1. 程式人生 > >JAVA學習筆記(併發程式設計 - 捌)- 執行緒池

JAVA學習筆記(併發程式設計 - 捌)- 執行緒池

文章目錄

執行緒池

執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式建立執行緒。說明:使用執行緒池的好處是減少在建立和銷燬執行緒上所花的時間以及系統資源的開銷,解決?源不足的問題。如果不使用執行緒池,有可能造成系統建立大量同類執行緒而導致消耗完記憶體或“過度切換”的問題

執行緒池對於限制應用程式中同一時刻執行的執行緒數很有用。因為每啟動一個新執行緒都會有相應的效能開銷,每個執行緒都需要給棧分配一些記憶體等等。

我們可以把併發執行的任務傳遞給一個執行緒池,來替代為每個併發執行的任務都啟動一個新的執行緒。只要池裡有空閒的執行緒,任務就會分配給一個執行緒執行。線上程池的內部,任務被插入一個阻塞佇列(Blocking Queue ),執行緒池裡的執行緒會去取這個佇列裡的任務。當一個新任務插入佇列時,一個空閒執行緒就會成功的從佇列中取出任務並且執行它。

執行緒池經常應用在多執行緒伺服器上。每個通過網路到達伺服器的連線都被包裝成一個任務並且傳遞給執行緒池。執行緒池的執行緒會併發的處理連線上的請求。

簡單來說使用執行緒池有以下幾個目的:

  • 執行緒是稀缺資源,不能頻繁的建立。應當將其放入一個池子中,可以給其他任務進行復用,減少物件建立、消亡的開銷,效能好
  • 解耦作用;執行緒的創建於執行完全分開,方便維護。
  • 執行緒池可有效控制最大併發執行緒數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞
  • 執行緒池可提供定時執行、定期執行、單執行緒以及併發數控制等功能

直接new Thread的弊端:

  • 每次new Thread 新建物件,效能差
  • 執行緒缺乏統一管理,可能無限制的新建執行緒,相互競爭,常常會出現佔用過多的系統資源導致宕機或者發生OOM(out of memory 記憶體溢位),這種問題的原因不是因為單純的new一個Thread,而是可能因為程式的bug或者設計上的缺陷導致不斷new Thread造成的。
  • 缺少更多功能,如更多執行、定期執行、執行緒中斷等

執行緒池的好處

  • 重用存在的執行緒,減少物件建立、消亡的開銷,效能佳
  • 可有效控制最大併發執行緒數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞
  • 提供定時執行、定期執行、單執行緒、併發數控制等功能

執行緒池原理

談到執行緒池就會想到池化技術,其中最核心的思想就是把寶貴的資源放到一個池子中;每次使用都從裡面獲取,用完之後又放回池子供其他人使用,有點吃大鍋飯的意思。
執行緒池類圖:
在這裡插入圖片描述
在上邊的類圖中,最上層就是Executor框架,它是一個根據一組執行策略的呼叫排程執行和控制非同步任務的框架,目的是提供一種將任務提交與任務如何執行分離開的機制。它包含了三個executor介面:

  • Executor:執行新任務的簡單介面
  • ExecutorService:擴充套件了Executor,添加了用來管理執行器生命週期和任務生命週期的方法
  • ScheduledExecutorService:擴充套件了ExecutorService,支援Future和定期執行任務

在類圖中,我們最常使用的是ThreadPoolExecutor和Executors,這兩個類都可以建立執行緒池,其中ThreadPoolExecutor是可定製化的去建立執行緒池,而Executors則屬於是工具類,該類中已經封裝好了一些建立執行緒池的方法,直接呼叫相應的方法即可建立執行緒。

執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。
說明: Executors返回的執行緒池物件的弊端如下:

  1. FixedThreadPool和SingleThreadPool:允許的請求佇列長度為Integer.MAX-VALUE,可能會堆積大量的請求,從而導致00M。
  2. CachedThreadPool和ScheduledThreadPool:允許的建立執行緒數量為Integer.MAXVALUE,可能會建立大量的執行緒,從而導致oom2) CachedThreadPool和ScheduledThreadPool:允許的建立執行緒數量為Integer.MAXVALUE,可能會建立大量的執行緒,從而導致oom

可以說執行緒池體系裡最為核心的類是ThreadPoolExecutor,也是功能最強的,ThreadPoolExecutor共有四個建構函式,如下:

ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue <Runnable>)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue <Runnable>, ThreadFactory) 
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue <Runnable>, RejectedExecutionHandler)
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue <Runnable>, ThreadFactorv, RejectedExecutionHandler)

執行緒池引數

其中最多可傳入七個引數,這七個引數配合起來,構成了執行緒池強大的功能。引數說明:

  • corePoolSize:核心執行緒數量
  • maximumPoolSize:執行緒最大執行緒數
  • workQueue:阻塞佇列,儲存等待執行的任務,很重* 要,會對執行緒池執行過程產生重大影響
  • keepAliveTime:執行緒沒有任務執行時最多保持多久時間終止(當執行緒中的執行緒數量大於corePoolSize的時候,如果這時沒有新的任務提交核心執行緒外的執行緒不會立即銷燬,而是等待,直到等待的時間超過keepAliveTime)
  • unit:keepAliveTime的時間單位
  • threadFactory:執行緒工廠,用來建立執行緒,若不設定則使用預設的工廠來建立執行緒,這樣新創建出來的執行緒會具有相同的優先順序,並且是非守護的執行緒,同時也會設定好名稱
  • rejectHandler:當拒絕處理任務時(阻塞佇列滿)的策略(AbortPolicy預設策略直接丟擲異常、CallerRunsPolicy用呼叫者所在的執行緒執行任務、DiscardOldestPolicy丟棄佇列中最靠前的任務並執行當前任務、DiscardPolicy直接丟棄當前任務)

拒絕策略的實現類都在TreadPoolExecutor中:

ThreadPoolExecutor
	Worker
	CallerRunsPolicy
	AbortPolicy
	DiscardPolicy
	DiscardoldestPolicy

我們來說一下其中corePoolSize、maximumPoolSize、workQueue 這三個引數的關係:

如果執行的執行緒數量小於corePoolSize的時候,直接建立新執行緒來處理任務。即使執行緒池中的其他執行緒是空閒的。如果執行緒池中的執行緒數量大於corePoolSize且小於maximumPoolSize時,那麼只有當workQueue滿的時候才建立新的執行緒去處理任務。如果corePoolSize與maximumPoolSize是相同的,那麼建立的執行緒池大小是固定的。這時如果有新任務提交,且workQueue未滿時,就把請求放入workQueue中,等待空閒執行緒從workQueue取出任務進行處理。如果需要執行的執行緒數量大於maximumPoolSize時,並且此時workQueue也滿了,那麼就使用rejectHandler引數所指定的拒絕策略去進行處理。

然後我們來具體介紹一下 workQueue, 它是儲存待執行任務的一個阻塞佇列,當我們提交一個新的任務到執行緒池後,執行緒池會根據當前池中正在執行的執行緒數量來決定該任務的處理方式。處理方式總共有三種:

1、直接切換(SynchronusQueue)
2、×××佇列(LinkedBlockingQueue),若使用該佇列,那麼執行緒池中能夠建立的最大執行緒數為corePoolSize,這時maximumPoolSize就不會起作用了。當執行緒池中所有的核心執行緒都是執行狀態的時候,新的任務提交就會放入等待佇列中。
3、有界佇列(ArrayBlockingQueue),使用該佇列可以將執行緒池中的最大執行緒數量限制為maximumPoolSize引數所指定的值,這種方式能夠降低資源消耗,但是這種方式使得執行緒池對執行緒排程變的更困難。因為此時執行緒池與佇列容量都是有限的了,所以想讓執行緒池處理任務的吞吐率達到一個合理的範圍,又想使我們的執行緒排程相對簡單,並且還儘可能降低執行緒池對資源的消耗,那麼我們就需要合理的設定corePoolSize和maximumPoolSize這兩個引數的值
分配技巧: 如果想降低資源的消耗包括降低cpu使用率、作業系統資源的消耗、上下文切換的開銷等等,可以設定一個較大的佇列容量和較小的執行緒池容量,這樣會降低執行緒池處理任務的吞吐量。如果我們提交的任務經常發生阻塞,我們可以考慮呼叫相關方法調整maximumPoolSize引數的值。如果我們的佇列容量較小,通常需要把執行緒池的容量設定得大一些,這樣cpu的使用率相對來說會高一些。但是如果執行緒池的容量設定的過大,提高任務的數量過多的時候,併發量會增加,那麼執行緒之間的排程就是一個需要考慮的問題,這樣反而可能會降低處理任務的吞吐量。

執行緒池狀態

執行緒池有五種狀態,執行緒池狀態轉換過程圖如下:
在這裡插入圖片描述

  • running:執行狀態,能接受新提交的任務,也能處理阻塞佇列中的任務
  • shutdown:關閉狀態,不能處理新的任務,但卻可以繼續處理阻塞佇列中已儲存的任務。線上程池處於 RUNNING 狀態時,呼叫 shutdown()方法會使執行緒池進入到該狀態。(finalize() 方法在執行過程中也會呼叫shutdown()方法進入該狀態);
  • stop:停止狀態,不能接受新任務,也不處理佇列中的任務,會中斷正在處理任務的執行緒。線上程池處於 RUNNING 或 SHUTDOWN 狀態時,呼叫 shutdownNow() 方法會使執行緒池進入到該狀態;
  • tidying:如果所有的任務都已終止了,workerCount (有效執行緒數) 為0,執行緒池進入該狀態後會呼叫 terminated() 方法進入TERMINATED 狀態。
  • terminated:最終狀態,在terminated() 方法執行完後進入該狀態,預設terminated()方法中什麼也沒有做。

執行緒池常用方法

方法名 描述
execute() 提交任務,交給執行緒池執行
submit() 提交任務,能夠返回執行結果 execute+Future
shutdown() 關閉執行緒池,等待任務都執行完
shutdownNow() 立刻關閉執行緒池,不等待任務執行完
getTaskCount() 執行緒池已執行和未執行的任務總數
getCompleteTaskCount() 已完成的任務數量
getPoolSize() 執行緒池當前的執行緒數量
getActiveCount() 當前執行緒池中正在執行任務的執行緒數量

使用Executors建立執行緒池
上文中我們提到了可以使用Executors工具類方便的建立執行緒,該類中提供了四種建立執行緒池的方法,如下:

方法名 描述
newCachedThreadPool 建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒
newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待
newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行
newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行

newCachedThreadPool使用示例:

package com.mmall.concurrency.example.threadPool;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class ThreadPoolExample1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
 // 若需使用ThreadPoolExecutor裡的方法,則需要進行強轉
// ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        for (int i = 0; i <10 ; i++) {
            final int index=i;
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
        executorService.shutdown();
    }
}
//19:13:20.558 [pool-1-thread-9] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:8
//19:13:20.556 [pool-1-thread-4] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:3
//19:13:20.559 [pool-1-thread-10] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:9
//19:13:20.558 [pool-1-thread-8] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:7
//19:13:20.558 [pool-1-thread-7] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:6
//19:13:20.555 [pool-1-thread-3] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:2
//19:13:20.558 [pool-1-thread-2] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:1
//19:13:20.558 [pool-1-thread-6] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:5
//19:13:20.558 [pool-1-thread-5] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:4
//19:13:20.557 [pool-1-thread-1] INFO com.mmall.concurrency.example.threadPool.ThreadPoolExample1 - task:0

newFixedThreadPool使用示例:

@Slf4j
public class ThreadPoolExample2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> log.info("task: {}", index));
        }
        executorService.shutdown();
    }
}

newSingleThreadExecutor使用示例:

@Slf4j
public class ThreadPoolExample3 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> log.info("task: {}", index));
        }
        executorService.shutdown();
    }
}

newScheduledThreadPool使用示例:

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

        // 延遲3秒執行
        executorService.schedule(() -> log.info("Scheduled run"), 3, TimeUnit.SECONDS);

        // 以指定的速率執行任務,這裡是每隔3秒執行一次任務
        executorService.scheduleAtFixedRate(() -> log.info("Scheduled run"), 1, 3, TimeUnit.SECONDS);

        // 以指定的延遲執行任務,這裡是延遲3秒執行一次任務,使用起來和scheduleAtFixedRate基本一樣
        executorService.scheduleWithFixedDelay(() -> log.info("Scheduled run"), 1, 3, TimeUnit.SECONDS);

        executorService.shutdown();
    }
}

關於延遲執行任務的操作,在Java中還可以使用Timer類進行實現,如下:

@Slf4j
public class ThreadPoolExample4 {
    public static void main(String[] args) {
        Timer timer = new Timer();
        // 每隔3秒執行一次任務
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("timer task run");
            }
        }, new Date(), 3000);
    }
}

雖然可行,但是並不建議這麼使用,在多執行緒並行處理定時任務時,Timer執行多個TimeTask的話,只要其中之一沒有捕獲丟擲的異常,其它任務便會自動終止執行,使用ScheduledExecutorService則沒有這個問題。

使用ThreadPoolExecutor建立執行緒池

之前我們提到了,不建議使用Executors來建立執行緒池,而是使用ThreadPoolExecutor進行建立。實際上Executors裡建立的也就是ThreadPoolExecutor的例項,具體的看一下Executors類的原始碼就知道了。

接下來用一個例子演示一下如何通過ThreadPoolExecutor來建立執行緒池,這裡使用7個引數的建構函式,示例程式碼如下:

package org.zero.concurrency.demo.example.threadpool;

import lombok.extern.slf4j.Slf4j;
import org.springframework.lang.NonNull;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;


@Slf4j
public class ThreadPoolExample6 {
    public static void main(String[] args) {
        // 使用ArrayBlockingQueue作為其等待佇列
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(5);
        // 使用自定義的ThreadFactory,目的是設定有意義的的執行緒名字,方便出錯時回溯
        ThreadFactory namedThreadFactory = new MyThreadFactory("test-thread");

        // 建立執行緒池
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                3, 5, 1, TimeUnit.MINUTES, blockingQueue, namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());

        // 執行任務
        poolExecutor.execute(() -> log.info("thread run"));

        // 關閉執行緒池
        poolExecutor.shutdown();
    }

    private static class MyThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        private MyThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix + "-";
        }

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            if (t.isDaemon()) {
                t.setDaemon(true);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

執行緒池的建立先介紹到這,其實大部分的建立方式可以參考Executors類的原始碼,所以這裡就不贅述了。

執行緒池的合理配置:

  • CPU密集型任務,就需要儘量壓榨CPU,參考值可以設定為NCPU+1,即CPU核心數量+1
  • IO密集型任務,參考值可以設定為2*NCPU,即CPU核心數量的2倍

最後需要說一句,執行緒池雖好但並非放之四海皆準,我們應當結合實際業務場景去考慮是否使用執行緒池。例如當執行緒池內需要執行的任務很小,小到執行任務的時間和任務排程的時間很接近,這時若使用執行緒池反而會更慢,因為任務排程和任務管理是需要耗時的。