1. 程式人生 > >當問起執行緒池(Java中的ThreadPoolExecutor類),你應該知道的基礎知識點

當問起執行緒池(Java中的ThreadPoolExecutor類),你應該知道的基礎知識點

執行緒池

執行緒池是一種多執行緒處理形式,處理過程中將任務新增到佇列,然後在建立執行緒後自動啟動這些任務。執行緒池執行緒都是後臺執行緒。每個執行緒都使用預設的堆疊大小,以預設的優先順序執行,並處於多執行緒單元中。如果某個執行緒在託管程式碼中空閒(如正在等待某個事件),則執行緒池將插入另一個輔助執行緒來使所有處理器保持繁忙。如果所有執行緒池執行緒都始終保持繁忙,但佇列中包含掛起的工作,則執行緒池將在一段時間後建立另一個輔助執行緒但執行緒的數目永遠不會超過最大值。超過最大值的執行緒可以排隊,但他們要等到其他執行緒完成後才啟動。

執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。可用執行緒數量應該取決於可用的併發處理器、處理器核心、記憶體、網路sockets等的數量。 例如,執行緒數一般取cpu數量+2

比較合適,執行緒數過多會導致額外的執行緒切換開銷。

總之,建立太多執行緒,將會浪費一定的資源,有些執行緒未被充分使用。銷燬太多執行緒,將導致之後浪費時間再次建立它們。建立執行緒太慢,將會導致長時間的等待,效能變差。銷燬執行緒太慢,導致其它執行緒資源飢餓。

任務排程以執行執行緒的常見方法是使用同步佇列,稱作任務佇列。池中的執行緒等待佇列中的任務,並把執行完的任務放入完成佇列中。

執行緒池模式一般分為兩種:HS/HA半同步/半非同步模式、L/F領導者與跟隨者模式。

  • 半同步/半非同步模式又稱為生產者消費者模式,是比較常見的實現方式,比較簡單。分為同步層、佇列層、非同步層三層。同步層的主執行緒處理工作任務並存入工作佇列,工作執行緒從工作佇列取出任務進行處理,如果工作佇列為空,則取不到任務的工作執行緒進入掛起狀態。由於執行緒間有資料通訊,因此不適於大資料量交換的場合。

  • 領導者跟隨者模式,線上程池中的執行緒可處在3種狀態之一:領導者leader、追隨者follower或工作者processor。任何時刻執行緒池只有一個領導者執行緒。事件到達時,領導者執行緒負責訊息分離,並從處於追隨者執行緒中選出一個來當繼任領導者,然後將自身設定為工作者狀態去處置該事件。處理完畢後工作者執行緒將自身的狀態置為追隨者。這一模式實現複雜,但避免了執行緒間交換任務資料,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了領導者跟隨者模式實現。

Java中的ThreadPoolExecutor類

ThreadPoolExecutor的繼承結構:

java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,因此如果要透徹地瞭解Java中的執行緒池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現原始碼。

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

在ThreadPoolExecutor類中提供了四個構造方法,但其實一共就7種引數型別:

下面解釋下一下構造器中各個引數的含義:

corePoolSize:核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;

maximumPoolSize:執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒;

keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;

unit:引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
 

workQueue:一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,常用的workQueue型別:

  1. SynchronousQueue:接收到任務直接提交給執行緒處理,而不保留它,沒有空閒就新建一個執行緒來處理這個任務!所以為了保證不出現<執行緒數達到了maximumPoolSize而不能新建執行緒>的錯誤,使用這個型別佇列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大
  2. LinkedBlockingQueue:接收到任務的時候,如果當前執行緒數小於核心執行緒數,則新建執行緒(核心執行緒)處理任務;如果當前執行緒數等於核心執行緒數,則進入佇列等待。
  3. ArrayBlockingQueue:可以限定佇列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建執行緒(核心執行緒)執行任務,如果達到了,則入隊等候,如果佇列已滿,則新建執行緒(非核心執行緒)執行任務,又如果匯流排程數到了maximumPoolSize,並且佇列也滿了,則發生錯誤
  4. DelayQueue:佇列內元素必須實現Delayed介面,這就意味著你傳進去的任務必須先實現Delayed介面。這個佇列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務


           threadFactory:執行緒工廠,主要用來建立執行緒;

handler:表示當拒絕處理任務時的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 

ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)。

ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務 。

當我們向執行緒池提交任務時,通常使用execute方法,接下來就先從該方法開始分析。(我們都知道執行緒池是維護了一批執行緒來處理使用者提交的任務,達到執行緒複用的目的,執行緒池維護的這批執行緒被封裝成了Worker。)

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        else if (!addWorker(command, false))
            reject(command);
    }

分三個步驟進行:

1。如果執行的執行緒小於corePoolSize,請嘗試以給定的命令作為新執行緒的第一個執行緒任務。對addWorker的呼叫原子地檢查runState和workerCount,從而防止可能新增的錯誤警報執行緒,當它不應該,返回false。

2。如果任務可以成功排隊,那麼我們仍然需要再次檢查是否應該新增執行緒(因為上次檢查後已有的已經死亡)或其他原因進入該方法後,池關閉。所以我們重新檢查狀態,如果需要,回滾排隊停止,或啟動一個新的執行緒,如果沒有。

3。如果無法對任務進行排隊,則嘗試新增新的任務執行緒。如果它失敗了,我們知道我們被關閉或飽和了,所以拒絕這個任務。