1. 程式人生 > >ThreadPoolExecutor執行緒池解析與BlockingQueue的三種實現

ThreadPoolExecutor執行緒池解析與BlockingQueue的三種實現

目的

主要介紹ThreadPoolExecutor的用法,和較淺顯的認識,場景的使用方案等等,比較忙碌,如果有錯誤還請大家指出

ThreadPoolExecutor介紹

ThreadPoolExecutor的完整構造方法的簽名如下

ThreadPoolExecutor
(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)
  1. corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。
  2. workQueue任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。

    1. ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
    2. LinkedBlockingQueue:一個基於連結串列結構
      的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列
    3. SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
    4. PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列
  3. maximumPoolSize(執行緒池最大大小):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。

  4. ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒做些更有意義的事情,比如設定daemon和優先順序等等
  5. RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。以下是JDK1.5提供的四種策略。
    1. AbortPolicy:直接丟擲異常。
    2. CallerRunsPolicy:只用呼叫者所線上程來執行任務。
    3. DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
    4. DiscardPolicy:不處理,丟棄掉。
    5. 也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。
  6. keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。
  7. TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

根據上面的描述,我相信我們能夠在熟悉引數的情況下自定義自己的執行緒池,但是我們發現在jdk幫助文件裡面有這樣一句話

強烈建議程式設計師使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)、Executors.newFixedThreadPool(int)(固定大小執行緒池)和Executors.newSingleThreadExecutor()(單個後臺執行緒),它們均為大多數使用場景預定義了設定。

執行緒池的工作方式

  1. 如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。(什麼意思?如果當前執行的執行緒小於corePoolSize,則任務根本不會存放,新增到queue中
  2. 如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒
  3. 如果無法將請求加入佇列(佇列已滿),則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,如果超過,在這種情況下,新的任務將被拒絕。

那麼我們可以發現,佇列線上程池中是非常重要的角色,那麼Executors就是根據不同的佇列實現了功能不同的執行緒池,下面我們來看看

Executors包含的常用執行緒池

1.ExecutorService newFixedThreadPool(int nThreads):固定大小執行緒池。

public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS, 
                                      new LinkedBlockingQueue<Runnable>());  
    }  

我們可以發現,coresize和maxsize相同,超時時間為0,佇列用的LinkedBlockingQueue無界的FIFO佇列,這表示什麼,很明顯,這個執行緒池始終只有<size的執行緒在執行,同時超時時間為0,執行緒執行完後就關閉,而不會再等待超時時間,如果佇列裡面有執行緒任務的話就從佇列裡面取出執行緒,然後開啟一個新的執行緒開始執行

2.ExecutorService newCachedThreadPool():無界執行緒池

public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                      60L, TimeUnit.SECONDS,  
                                      new SynchronousQueue<Runnable>());  
    }  

SynchronousQueue佇列,一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作。所以,當我們提交第一個任務的時候,是加入不了佇列的,這就滿足了,一個執行緒池條件“當無法加入佇列的時候,且任務沒有達到maxsize時,我們將新開啟一個執行緒任務”。所以我們的maxsize是big big。時間是60s,當一個執行緒沒有任務執行會暫時儲存60s超時時間,如果沒有的新的任務的話,會從cache中remove掉。

3.Executors.newSingleThreadExecutor();大小為1的固定執行緒池,這個其實就是newFixedThreadPool(1).關注newFixedThreadPool的用法就行

排隊策略

排隊有三種通用策略:
1. 直接提交。工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
2. 無界佇列。使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列
3. 有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。

使用直接提交策略,即SynchronousQueue。

首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由於該Queue本身的特性,在某次新增元素後必須等待其他執行緒取走後才能繼續新增。在這裡不是核心執行緒便是新建立的執行緒,但是我們試想一樣下,下面的場景。

new ThreadPoolExecutor(  
            2, 3, 30, TimeUnit.SECONDS,   
            new SynchronousQueue<Runnable>(),   
            new RecorderThreadFactory("CookieRecorderPool"),   
            new ThreadPoolExecutor.CallerRunsPolicy());  

當核心執行緒已經有2個正在執行.
1. 此時繼續來了一個任務(A),根據前面介紹的“如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。”,所以A被新增到queue中。
2. 又來了一個任務(B),且核心2個執行緒還沒有忙完,OK,接下來首先嚐試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去
3. 此時便滿足了上面提到的“如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個執行緒來執行這個任務。
4. 暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個新增入queue中,後一個呢?queue中無法插入,而執行緒數達到了maximumPoolSize,所以只好執行異常策略了。

所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使用有界佇列)。對於使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。
什麼意思?如果你的任務A1,A2有內部關聯,A1需要先執行,那麼先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1麼有被執行前,A2不可能新增入queue中

使用無界佇列策略,即LinkedBlockingQueue

這個就拿newFixedThreadPool來說,根據前文提到的規則:如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。那麼當任務繼續增加,會發生什麼呢?
如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕。
這裡就很有意思了,可能會出現無法加入佇列嗎?不像SynchronousQueue那樣有其自身的特點,對於無界佇列來說,總是可以加入的(資源耗盡,當然另當別論)。換句說,永遠也不會觸發產生新的執行緒!corePoolSize大小的執行緒數會一直執行,忙完當前的,就從佇列中拿任務開始執行。所以要防止任務瘋長,比如任務執行的實行比較長,而新增任務的速度遠遠超過處理任務的時間,而且還不斷增加,如果任務記憶體大一些,不一會兒就爆了

有界佇列,使用ArrayBlockingQueue。

個是最為複雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。

new ThreadPoolExecutor(  
            2, 4, 30, TimeUnit.SECONDS,   
            new ArrayBlockingQueue<Runnable>(2),   
            new RecorderThreadFactory("CookieRecorderPool"),   
            new ThreadPoolExecutor.CallerRunsPolicy());  

假設,所有的任務都永遠無法執行完。對於首先來的A,B來說直接執行,接下來,如果來了C,D,他們會被放到queu中,如果接下來再來E,F,則增加執行緒執行E,F。但是如果再來任務,佇列無法再接受了,執行緒數也到達最大的限制了,所以就會使用拒絕策略來處理。

Summary

  1. ThreadPoolExecutor的使用還是很有技巧的。
  2. 使用無界queue可能會耗盡系統資源。
  3. 使用有界queue可能不能很好的滿足效能,需要調節執行緒數和queue大小
  4. 執行緒數自然也有開銷,所以需要根據不同應用進行調節。

通常來說對於靜態任務可以歸為:
1. 數量大,但是執行時間很短
2. 數量小,但是執行時間較長
3. 數量又大執行時間又長
4. 除了以上特點外,任務間還有些內在關係
5. CPU密集或者IO密集型任務

看完這篇問文章後,希望能夠可以選擇合適的型別。