1. 程式人生 > >ThreadPoolExecutor機制&執行緒池詳解

ThreadPoolExecutor機制&執行緒池詳解

一、概述
1.執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。每個 ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數;
2.ThreadPoolExecutor作為java.util.concurrent包對外提供的基礎實現,以內部執行緒池的形式對外提供管理任務執行、執行緒排程、執行緒池管理等等服務;
3.Executors方法提供的執行緒服務,都是通過引數設定來實現不同的執行緒池機制。如 Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)、Executors.newFixedThreadPool(int)(固定大小執行緒池)和 Executors.newSingleThreadExecutor()(單個後臺執行緒),它們均為大多數使用場景預定義了設定。

二、核心構造方法引數講解
corePoolSize:核心執行緒池大小;
maximumPoolSize:最大執行緒池大小;
keepAliveTime:執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間;
TimeUnit:keepAliveTime的時間單位;
workQueue:阻塞任務佇列;
threadFactory:新建執行緒工廠;
RejectedExecutionHandler:當提交任務數量超過maximumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler處理。

重點講解:
其中比較容易讓人誤解的是:corePoolSize、maximumPoolSize、workQueue之間的關係:
1.當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒;
2.當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行;
3.當workQueue已滿,且maximumPoolSize>CorePoolSize時,新提交的任務會建立新執行緒執行任務;
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理;
5.當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒;
6.當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉。

三、Executors提供的執行緒池配置方案
1.構造一個固定執行緒數目的執行緒池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界的LinkedBlockingQueue存放阻塞任務,因此多餘的任務將存在在阻塞佇列,不會由RejectedExecutionHandler處理:
    public static ExecutorService newFixedThreadPool(int nThreads) {  
            return new ThreadPoolExecutor(nThreads, nThreads,  
                                          0L, TimeUnit.MILLISECONDS,  
                                          new LinkedBlockingQueue<Runnable>());  
        }
2.構造一個緩衝功能的執行緒池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s以及一個無容量的阻塞佇列SynchronousQueue,因此任務提交之後,將會建立新的執行緒執行,執行緒空閒超過60s將會銷燬:
    public static ExecutorService newCachedThreadPool() {  
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                          60L, TimeUnit.SECONDS,  
                                          new SynchronousQueue<Runnable>());  
        }  
3.構造一個只支援一個執行緒的執行緒池,配置corePoolSize=maximumPoolSize=1,無界阻塞佇列LinkedBlockingQueue,保證任務由一個執行緒序列執行:
    public static ExecutorService newSingleThreadExecutor() {  
            return new FinalizableDelegatedExecutorService  
                (new ThreadPoolExecutor(1, 1,  
                                        0L, TimeUnit.MILLISECONDS,  
                                        new LinkedBlockingQueue<Runnable>()));  
        }  

四、自定義執行緒池的注意要點(相關程式碼見url)
1、用ThreadPoolExecutor自定義執行緒池,看執行緒是的用途,如果任務量不大,可以用無界佇列,如果任務量非常大,要用有界佇列,防止OOM;
2、如果任務量很大,還要求每個任務都處理成功,要對提交的任務進行阻塞提交,重寫拒絕機制,改為阻塞提交。保證不拋棄一個任務;
3、最大執行緒數一般設為2N+1最好,N是CPU核數;
4、核心執行緒數,看應用,如果是任務,一天跑一次,設定為0,合適,因為跑完就停掉了,如果是常用執行緒池,看任務量,是保留一個核心還是幾個核心執行緒數;
5、如果要獲取任務執行結果,用CompletionService,但是注意,獲取任務的結果的要重新開一個執行緒獲取,如果在主執行緒獲取,就要等任務都提交後才獲取,就會阻塞大量任務結果,佇列過大OOM,所以最好非同步開個執行緒獲取結果。

五、執行緒池狀態含義:
1.running:接收新任務並處理阻塞佇列裡的任務;
2.shutdown:拒絕新任務但是處理阻塞佇列裡的任務;
3.stop:拒絕新任務並且拋棄阻塞佇列裡的任務同時會中斷正在處理的任務;
4.tidying:所有任務都執行完(包括阻塞佇列裡的任務),當執行緒池裡活動執行緒為0,將呼叫terminated方法;
5.terminated:終止狀態,terminated方法呼叫完成以後的狀態。

六、按需構造
預設情況下,即使核心執行緒最初只是在新任務到達時才建立和啟動的,也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 對其進行動態重寫。如果構造帶有非空佇列的池,則可能希望預先啟動執行緒。

七、建立新執行緒
使用 ThreadFactory 建立新執行緒。如果沒有另外說明,則在同一個 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 建立執行緒,並且這些執行緒具有相同的 NORM_PRIORITY 優先順序和非守護程序狀態。通過提供不同的 ThreadFactory,可以改變執行緒的名稱、執行緒組、優先順序、守護程序狀態,等等。如果從 newThread 返回 null 時 ThreadFactory 未能建立執行緒,則執行程式將繼續執行,但不能執行任何任務。

八、排隊
所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此佇列與池大小進行互動:
1.如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。
2.如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。
3.如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕。

九、排隊的三種通用策略
1.直接提交:工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們(沒有容量)。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
2.無界佇列:使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
3.有界佇列:當使用有限的 maximumPoolSizes 時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。

十、被拒絕的任務
當 Executor 已經關閉,或者 Executor 將有限邊界用於最大執行緒和工作佇列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將呼叫其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預定義的處理程式策略:
1.在預設的 ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕將丟擲執行時 RejectedExecutionException。
2.在 ThreadPoolExecutor.CallerRunsPolicy 中,執行緒呼叫執行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
3.在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
4.在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。

十一 hook(鉤子)方法
ThreadPoolExecutor提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之後呼叫。它們可用於操縱執行環境;例如,重新初始化 ThreadLocal、蒐集統計資訊或新增日誌條目。此外,還可以重寫方法 terminated() 來執行 Executor 完全終止後需要完成的所有特殊處理。如果鉤子 (hook) 或回撥方法丟擲異常,則內部輔助執行緒將依次失敗並突然終止。

十二 佇列維護
方法 getQueue() 允許出於監控和除錯目的而訪問工作佇列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行儲存回收。

十三 終止
程式 AND 不再引用的池沒有剩餘執行緒會自動 shutdown。如果希望確保回收取消引用的池(即使使用者忘記呼叫 shutdown()),則必須安排未使用的執行緒最終終止:設定適當保持活動時間,使用 0 核心執行緒的下邊界和/或設定 allowCoreThreadTimeOut(boolean)。