1. 程式人生 > >hreadPoolExecutor使用和思考(上)-線程池大小設置與BlockingQueue的三種實現區別

hreadPoolExecutor使用和思考(上)-線程池大小設置與BlockingQueue的三種實現區別

滿足 interface 我會 很多 程序員 dong turn 插入 p s

閱讀更多

工作中多處接觸到了ThreadPoolExecutor。趁著現在還算空,學習總結一下。

前記:

  1. jdk官方文檔(javadoc)是學習的最好,最權威的參考。
  2. 文章分上中下。上篇中主要介紹ThreadPoolExecutor接受任務相關的兩方面入參的意義和區別,池大小參數corePoolSize和maximumPoolSize,BlockingQueue選型(SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue);中篇中主要聊聊與keepAliveTime這個參數相關的話題;下片中介紹一下一些比較少用的該類的API,及他的近親:
    ScheduledThreadPoolExecutor。
  3. 如果理解錯誤,請直接指出。

查看JDK幫助文檔,可以發現該類比較簡單,繼承自AbstractExecutorService,而AbstractExecutorService實現了ExecutorService接口。

ThreadPoolExecutor的完整構造方法的簽名是:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

先記著,後面慢慢解釋。

===============================神奇分割線==================================

其實對於ThreadPoolExecutor的構造函數網上有N多的解釋的,大多講得都很好,不過我想先換個方式,從Executors這個類入手。因為他的幾個構造工廠構造方法名字取得令人很容易了解有什麽特點。但是其實Executors類的底層實現便是ThreadPoolExecutor!

ThreadPoolExecutor是Executors類的底層實現。

在JDK幫助文檔中,有如此一段話:

強烈建議程序員使用較為方便的 Executors

工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和 Executors.newSingleThreadExecutor()(單個後臺線程),它們均為大多數使用場景預定義了設置。”

可以推斷出ThreadPoolExecutor與Executors類必然關系密切。

===============================神奇分割線==================================

OK,那就來看看源碼吧,從newFixedThreadPool開始。

ExecutorService newFixedThreadPool(int nThreads):固定大小線程池。

可以看到,corePoolSize和maximumPoolSize的大小是一樣的(實際上,後面會介紹,如果使用無界queue的話maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值表名什麽?-就是該實現不想keep alive!最後的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的

Java代碼 技術分享圖片
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

ExecutorService newSingleThreadExecutor():單線程。

可以看到,與fixedThreadPool很像,只不過fixedThreadPool中的入參直接退化為1

Java代碼 技術分享圖片
  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

ExecutorService newCachedThreadPool():無界線程池,可以進行自動線程回收。

這個實現就有意思了。首先是無界的線程池,所以我們可以發現maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對於該BlockingQueue有些陌生,簡單說:該QUEUE中,每個插入操作必須等待另一個

線程的對應移除操作。比如,我先添加一個元素,接下來如果繼續想嘗試添加則會阻塞,直到另一個線程取走一個元素,反之亦然。(想到什麽?就是緩沖區為1的生產者消費者模式^_^)

註意到介紹中的自動回收線程的特性嗎,為什麽呢?先不說,但註意到該實現中corePoolSize和maximumPoolSize的大小不同。

Java代碼 技術分享圖片
  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

===============================神奇分割線==================================

到此如果有很多疑問,那是必然了(除非你也很了解了)

先從BlockingQueue<Runnable> workQueue這個入參開始說起。在JDK中,其實已經說得很清楚了,一共有三種類型的queue。以下為引用:(我會稍微修改一下,並用紅色突出顯示)

所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:
  • 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。(什麽意思?如果當前運行的線程小於corePoolSize,則任務根本不會存放,添加到queue中,而是直接抄家夥(thread)開始運行
  • 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列而不添加新的線程
  • 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
先不著急舉例子,因為首先需要知道queue上的三種類型。
排隊有三種通用策略:
  1. 直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
  2. 無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
  3. 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

===============================神奇分割線==================================

到這裏,該了解的理論已經夠多了,可以調節的就是corePoolSize和maximumPoolSizes 這對參數還有就是BlockingQueue的選擇。

例子一:使用直接提交策略,也即SynchronousQueue。

首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由於該Queue本身的特性在某次添加元素後必須等待其他線程取走後才能繼續添加。在這裏不是核心線程便是新創建的線程,但是我們試想一樣下,下面的場景。

我們使用一下參數構造ThreadPoolExecutor:

Java代碼 技術分享圖片
  1. new ThreadPoolExecutor(
  2. 2, 3, 30, TimeUnit.SECONDS,
  3. new <span style="white-space: normal;">SynchronousQueue</span><Runnable>(),
  4. new RecorderThreadFactory("CookieRecorderPool"),
  5. new ThreadPoolExecutor.CallerRunsPolicy());

當核心線程已經有2個正在運行.

  1. 此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列而不添加新的線程。”,所以A被添加到queue中。
  2. 又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。
  3. 此時便滿足了上面提到的“如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個線程來運行這個任務。
  4. 暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,後一個呢?queue中無法插入,而線程數達到了maximumPoolSize,所以只好執行異常策略了。
所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使用有界隊列)。對於使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖
什麽意思?如果你的任務A1,A2有內部關聯,A1需要先運行,那麽先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1麽有被執行前,A2不可能添加入queue中
例子二:使用無界隊列策略,即LinkedBlockingQueue
這個就拿newFixedThreadPool來說,根據前文提到的規則: 寫道 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。 那麽當任務繼續增加,會發生什麽呢? 寫道 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。

OK,此時任務變加入隊列之中了,那什麽時候才會添加新線程呢?

寫道 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

這裏就很有意思了,可能會出現無法加入隊列嗎?不像SynchronousQueue那樣有其自身的特點,對於無界隊列來說,總是可以加入的(資源耗盡,當然另當別論)。換句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。所以要防止任務瘋長,比如任務運行的實行比較長,而添加任務的速度遠遠超過處理任務的時間,而且還不斷增加,如果任務內存大一些,不一會兒就爆了,呵呵。

可以仔細想想哈。

例子三:有界隊列,使用ArrayBlockingQueue。

這個是最為復雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。

舉例來說,請看如下構造方法:

Java代碼 技術分享圖片
  1. new ThreadPoolExecutor(
  2. 2, 4, 30, TimeUnit.SECONDS,
  3. new ArrayBlockingQueue<Runnable>(2),
  4. new RecorderThreadFactory("CookieRecorderPool"),
  5. new ThreadPoolExecutor.CallerRunsPolicy());

假設,所有的任務都永遠無法執行完。

對於首先來的A,B來說直接運行,接下來,如果來了C,D,他們會被放到queu中,如果接下來再來E,F,則增加線程運行E,F。但是如果再來任務,隊列無法再接受了,線程數也到達最大的限制了,所以就會使用拒絕策略來處理。

總結:

  1. ThreadPoolExecutor的使用還是很有技巧的。
  2. 使用無界queue可能會耗盡系統資源。
  3. 使用有界queue可能不能很好的滿足性能,需要調節線程數和queue大小
  4. 線程數自然也有開銷,所以需要根據不同應用進行調節。
通常來說對於靜態任務可以歸為:
  1. 數量大,但是執行時間很短
  2. 數量小,但是執行時間較長
  3. 數量又大執行時間又長
  4. 除了以上特點外,任務間還有些內在關系
看完這篇問文章後,希望能夠可以選擇合適的類型了技術分享圖片

45
1
踩 分享到: 技術分享圖片 技術分享圖片 ThreadPoolExecutor使用和思考(中)-keepAl ... | MySQL中當記錄更新時 timestamp類型自動更 ...
  • 2011-02-08 19:39
  • 瀏覽 154488
  • 評論(28)
  • 分類:編程語言
  • 查看更多
評論
28 樓 生亦何歡 2016-12-15 bravekingzhang 寫道樓主的基礎知識比較薄弱,SynchronousQueue就是生產者消費者?no!BlockingQueue的子類其實都可以去搞一搞生產者消費者。但是基於樓主舉例子到時不錯,我也不噴。很久以前的記憶了,只是現在忘記了,來這裏會議一下。
這時候你應該跟樓主一樣,寫一篇詳細的博客來解釋queue,而不是在別人的評論裏面說別人怎麽的怎麽的(真心蛋疼)。 27 樓 bravekingzhang 2015-11-30 ymwcwee 寫道ThreadPoolExecutor與Executors.newCachedThreadPool,哪個性能更好些?
恰好看到,就回答一下吧,現在人不註重基礎知識也是醉了,也不去話時間讀讀源碼。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

這是源碼,你看看返回的是個什麽。在讓你看看他們的關系
public interface ExecutorService extends Executor
public abstract class AbstractExecutorService implements ExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService

明白了吧。額 26 樓 bravekingzhang 2015-11-30 樓主的基礎知識比較薄弱,SynchronousQueue就是生產者消費者?no!BlockingQueue的子類其實都可以去搞一搞生產者消費者。但是基於樓主舉例子到時不錯,我也不噴。很久以前的記憶了,只是現在忘記了,來這裏會議一下。 25 樓 ymwcwee 2015-10-27 ThreadPoolExecutor與Executors.newCachedThreadPool,哪個性能更好些? 24 樓 xuyfiei 2015-03-13 最後的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的。LinkedBlockingQueue是有界的,你說的無界是在沒有設置capacity的情況下吧,(雖然挖墳是不對的 23 樓 concurrency 2014-10-30 "用SynchronousQueue。可能對於該BlockingQueue有些陌生,簡單說:該QUEUE中,每個插入操作必須等待另一個
線程的對應移除操作。比如,我先添加一個元素,接下來如果繼續想嘗試添加則會阻塞,直到另一個線程取走一個元素,反之亦然。(想到什麽?就是緩沖區為1的生產者消費者模式^_^)"
樓主這段寫的有問題哈,SynchronousQueue不能這樣理解,絕對不是緩沖區為1的生產者消費者模式。如果在第一個線程裏要添加一個元素,那麽添加成功的前提是必須要有另一個線程已經在等待取出這個元素了。否則第一個線程一直會被阻塞。 22 樓 lunwen 2014-09-11 是Executor而不是Executors,Executor是接口,底層實現之一是ThreadPoolExecutor 21 樓 Jnerd 2014-01-18 技術分享圖片 20 樓 onthewanying 2013-10-21 cmxp2008 寫道引用此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。”,所以A被添加到queue中。
又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。

這裏描述的場景不太對吧!
SynchronousQueue隊列是不會保存任何任務的。由於兩個core線程都在忙,沒有空閑線程等在SynchronousQueue隊列的出口取任務,此時A任務的offer(e)操作一定是返回false的。所以ThreadPoolExecutor會再創建一個線程來承接A任務。等到B任務進來時,如果前面3個線程仍然都在忙,那麽B任務就會因為當前線程數達到maximumPoolSize值,而被拒絕!
恩,我測試了下,確實如18樓所說,另外我發現當使用SynchronousQueue的時候,發現設置的corePoolSize沒有用,也就是說就算設置corePoolSize不為0,當任務結束後,該線程還是會被銷毀(通過調用ThreadPoolExecutor.getPoolSize()查看當前線程池線程數),大家可以測試下,明白原理的解釋下 19 樓 quitgame 2013-08-12 LinkedBlockingQueue為什麽會是無界隊列?LinkedBlockingQueue一樣可以指定capacity啊。

我理解LinkedBlockingQueue和ArrayBlockingQueue只是內部數據結構上的差異。 18 樓 cmxp2008 2013-06-04 引用此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。”,所以A被添加到queue中。
又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。

這裏描述的場景不太對吧!
SynchronousQueue隊列是不會保存任何任務的。由於兩個core線程都在忙,沒有空閑線程等在SynchronousQueue隊列的出口取任務,此時A任務的offer(e)操作一定是返回false的。所以ThreadPoolExecutor會再創建一個線程來承接A任務。等到B任務進來時,如果前面3個線程仍然都在忙,那麽B任務就會因為當前線程數達到maximumPoolSize值,而被拒絕! 17 樓 manxisuo 2013-04-28 贊 寫得很好!技術分享圖片 16 樓 qq346 2013-03-28 通俗易懂。。。有幫助 15 樓 xiawared 2013-01-25 不錯不錯。解釋的清晰,繼續啊。。 14 樓 gboystal 2013-01-11 引用什麽意思?如果你的任務A1,A2有內部關聯,A1需要先運行,那麽先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1麽有被執行前,A2不可能添加入queue中

弱弱地問一句,如果是順序執行的話為什麽還要用線程池來並發呢? 13 樓 gboystal 2013-01-11 引用什麽意思?如果你的任務A1,A2有內部關聯,A1需要先運行,那麽先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1麽有被執行前,A2不可能添加入queue中

弱弱地問一句,如果是順序執行的話為什麽還要用線程池來並發呢? 12 樓 xusemon 2012-12-14 引用Java代碼
1.new ThreadPoolExecutor(
2. 2, 3, 30, TimeUnit.SECONDS,
3. new <SPAN style="WHITE-SPACE: normal">SynchronousQueue</SPAN><Runnable>(),
4. new RecorderThreadFactory("CookieRecorderPool"),
5. new ThreadPoolExecutor.CallerRunsPolicy());

new ThreadPoolExecutor(
2, 3, 30, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new RecorderThreadFactory("CookieRecorderPool"),
new ThreadPoolExecutor.CallerRunsPolicy()); 當核心線程已經有2個正在運行.



1.此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。”,所以A被添加到queue中。
2.又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。
3.此時便滿足了上面提到的“如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個線程來運行這個任務。
4.暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,後一個呢?queue中無法插入,而線程數達到了maximumPoolSize,所以只好執行異常策略了。
想問下,3小點創建新的線程是執行任務A呢還是任務B。按下面的解釋引用對於使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。應該是執行任務A啦?
那麽ArraylistBlockingQueue和LinkedlistBlockingQueue是否也這樣,都是按FIFO的順序來執行的? 11 樓 lionfox 2012-07-30 理解理解了,謝謝 10 樓 promzaid 2012-07-17 我想知道當一個線程執行完畢之後,從blockingqueue中取出等待的線程,這段代碼在什麽地方,是定時讀取這個queue還是用什麽方法? 9 樓 yzhw 2012-05-09 引用
例子一:使用直接提交策略,也即SynchronousQueue。



首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由於該Queue本身的特性,在某次添加元素後必須等待其他線程取走後才能繼續添加。在這裏不是核心線程便是新創建的線程,但是我們試想一樣下,下面的場景。



我們使用一下參數構造ThreadPoolExecutor:

Java代碼 技術分享圖片
  1. new ThreadPoolExecutor(
  2. 2, 4, 30, TimeUnit.SECONDS,
  3. new ArrayBlockingQueue<Runnable>(2),
  4. new RecorderThreadFactory("CookieRecorderPool"),
  5. new ThreadPoolExecutor.CallerRunsPolicy());


當核心線程已經有2個正在運行.



此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。”,所以A被添加到queue中。
又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。
此時便滿足了上面提到的“如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個線程來運行這個任務。
暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,後一個呢?queue中無法插入,而線程數達到了maximumPoolSize,所以只好執行異常策略了。


這個是向隊列裏放一個任務再來第二個任務時創建線程呢,還是直接創建線程直接提交任務,不緩存隊列? (看解釋貌似是先緩存任務A再來任務B的時候才無法向隊列中添加)
? 上一頁 1 2 下一頁 ?
發表評論

技術分享圖片 您還沒有登錄,請您登錄後再發表評論

原文地址:http://dongxuan.iteye.com/blog/901689

hreadPoolExecutor使用和思考(上)-線程池大小設置與BlockingQueue的三種實現區別