Java併發核心基礎——執行緒池使用及底層實現機制詳解
阿新 • • 發佈:2019-01-09
Java執行緒池概述:
從使用入手:
java.util.concurrent.Executosr是執行緒池的靜態工廠,我們通常使用它方便地生產各種型別的執行緒池,主要的方法有三種:
1、newSingleThreadExecutor()——建立一個單執行緒的執行緒池
2、newFixedThreadPool(int n)——建立一個固定大小的執行緒池
3、newCachedThreadPool()——建立一個可快取的執行緒池
1、SingleThreadExecutor
特點:單執行緒序列工作,如果這個唯一的執行緒因為異常終止,則有一個新的執行緒來替代它。
2、FixedThreadPool
特點:固定大小的執行緒池,如果設定的所有執行緒都在執行,新任務會在任務佇列等待。
3、CachedThreadPool
特點:大小可伸縮的執行緒池。如果當前沒有可用執行緒,則建立一個執行緒。在執行結束後快取60s,如果不被呼叫則移除執行緒。呼叫execute()方法時可以重用快取中的執行緒。適用於很多短期非同步任務的環境,可以提高程式效能。
以CachedThreadPool為例:
其中,我們不光會使用excute()方法執行任務,還會使用submit()方法,submit方法主要適用於使用Callable的情況,區別主要有如下幾點: 1、接收引數不一樣,excute()方法需要一個Runnable型別引數,而submit方法需要一個Callable<T>型別引數。 2、返回值不同,excute()方法沒有返回值,submit方法會返回Future<T>型別返回值。 3、submit方法適合處理異常。執行的任務裡會丟擲checked或者unchecked exception,而你又希望外面的呼叫者能夠感知這些exception並做出及時的處理,那麼就需要用到submit,通過捕獲Future.get丟擲的異常。
使用執行緒池的好處:
1、降低資源消耗。重複利用已建立執行緒,降低執行緒建立與銷燬的資源消耗。
2、提高響應效率。任務到達時,不需等待建立執行緒就能立即執行。
3、提高執行緒可管理性。
4、防止伺服器過載。記憶體溢位、CPU耗盡。
執行緒池應用範圍:
1、需要大量的執行緒來完成任務,並且完成所需時間較短。如Web伺服器完成網頁請求。
2、對效能有苛刻要求。如伺服器實時響應。
3、突發性大量請求,且不至於在伺服器產生大量執行緒。
介紹執行緒池實現機制之前:
關於執行緒池一些比較重要的類或介面:
(1)ExecutorService是真正的執行緒池介面,所以我們在通過Excutors建立各種執行緒時,一般採用如下程式碼:
ExecutorService threadPool = Executors.newXXX();
先宣告一個執行緒池介面,執行時動態繫結具體的執行緒池物件,典型的介面的用法(雖然ExecutorService名字起的不像個介面……)。
從這段程式碼還可以看出,(2)Executors是靜態工廠的功能(雖然名字也不像個工廠……),生產各種型別執行緒池。
需要注意的是,要區分(3)Executor與Executors,Executor是執行緒池的頂級介面,但它只是一個執行執行緒的工具,真正的執行緒池介面是ExecutorService。
(4)AbstractExecutorService實現了ExecutorService介面,實現了其中大部分的方法(有沒有實現的,所以被宣告為Abstract)。
(5)ThreadPoolExecutor,繼承了AbstractExecutorService,是ExecutorService的預設實現,也是一會將要介紹的重點。
這五個類或介面實現了從執行緒池頂層介面到底層實現的整個架構。
其它的帶有Schedule關鍵字的類或介面與實現週期性重複性工作相關,不是本篇考慮的重點。類圖如下:
除了繼承Thread、實現Runnable、Callable三種建立執行緒方式外的第四種建立方式: 實現java.util.concurrent.ThreadFactory介面,實現newThread(Runnable r)方法。 這種方式應用於這樣一種場景:我們需要一個執行緒池,並且對於執行緒池中的執行緒物件、賦予統一的名字、優先順序,以及一些其他統一操作,使用這樣的工廠方式就是優秀程式設計師應該使用的最好的方法。 執行緒池工作機制及原理: 先看看Excutors建立各種執行緒池的原始碼: 1、newSingleThreadExecutor()
測試結果:public class MyCachedThreadPool { public static void main(String[] args){ //建立Runnable物件,實現run()方法 Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " running!"); try { //為了體現出任務競爭資源,讓執行緒休眠1000ms Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end!"); } }; //建立執行緒池,加入任務,執行任務 ExecutorService myThreadPool = Executors.newCachedThreadPool(); myThreadPool.execute(runnable); myThreadPool.execute(runnable); myThreadPool.execute(runnable); myThreadPool.execute(runnable); myThreadPool.execute(runnable); //關閉執行緒池 myThreadPool.shutdown(); } }
其中,我們不光會使用excute()方法執行任務,還會使用submit()方法,submit方法主要適用於使用Callable的情況,區別主要有如下幾點: 1、接收引數不一樣,excute()方法需要一個Runnable型別引數,而submit方法需要一個Callable<T>型別引數。 2、返回值不同,excute()方法沒有返回值,submit方法會返回Future<T>型別返回值。 3、submit方法適合處理異常。執行的任務裡會丟擲checked或者unchecked exception,而你又希望外面的呼叫者能夠感知這些exception並做出及時的處理,那麼就需要用到submit,通過捕獲Future.get丟擲的異常。
除了繼承Thread、實現Runnable、Callable三種建立執行緒方式外的第四種建立方式: 實現java.util.concurrent.ThreadFactory介面,實現newThread(Runnable r)方法。 這種方式應用於這樣一種場景:我們需要一個執行緒池,並且對於執行緒池中的執行緒物件、賦予統一的名字、優先順序,以及一些其他統一操作,使用這樣的工廠方式就是優秀程式設計師應該使用的最好的方法。 執行緒池工作機制及原理: 先看看Excutors建立各種執行緒池的原始碼: 1、newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
2、newFixedThreadPool(int n)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3、newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
發現所有實現都是建立了一個ThreadPoolExecutor例項(ThreadPoolExecutor是ExecutorServiec預設實現類,加深印象),
其中ThreadPoolExecutor的建構函式中有幾個引數,現在介紹這些引數,是理解執行緒池工作原理的重要方式:
1、第一個引數:int corePoolSIze,核心池大小,也就是執行緒池中會維持不被釋放的執行緒數量。我們可以看到FixedThreadPool中這個引數值就是設定的執行緒數量,而SingleThreadExcutor中就是1,newCachedThreadPool中就是0,不會維持,只會快取60L。但需要注意的是,線上程池剛建立時,裡面並沒有建好的執行緒,只有當有任務來的時候才會建立(除非呼叫方法prestartAllCoreThreads()與prestartCoreThread()方法),在corePoolSize數量範圍的執行緒在完成任務後不會被回收。
2、第二個引數:int maximumPoolSize,執行緒池的最大執行緒數,代表著執行緒池中能建立多少執行緒池。超出corePoolSize,小於maximumPoolSize的執行緒會在執行任務結束後被釋放。此配置在CatchedThreadPool中有效。
3、第三個引數:long keepAliveTime,剛剛說到的會被釋放的執行緒快取的時間。我們可以看到,正如我們所說的,在CachedThreadPool()構造過程中,會被設定快取時間為60s(時間單位由第四個引數控制)。
4、第四個引數:TimeUnit unit,設定第三個引數keepAliveTime的時間單位。
5、第五個引數:儲存等待執行任務的阻塞佇列,有多種選擇,分別介紹:
SynchronousQueue——直接提交策略,適用於CachedThreadPool。它將任務直接提交給執行緒而不保持它們。如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求最大的 maximumPoolSize 以避免拒絕新提交的任務(正如CachedThreadPool這個引數的值為Integer.MAX_VALUE)。當任務以超過佇列所能處理的量、連續到達時,此策略允許執行緒具有增長的可能性。吞吐量較高。
LinkedBlockingQueue——無界佇列,適用於FixedThreadPool與SingleThreadExcutor。基於連結串列的阻塞佇列,建立的執行緒數不會超過corePoolSizes(maximumPoolSize值與其一致),當執行緒正忙時,任務進入佇列等待。按照FIFO原則對元素進行排序,吞吐量高於ArrayBlockingQueue。
ArrayListBlockingQueue——有界佇列,有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。
ThreadPoolExcutor的構造方式不僅有這一種,總共有四種,還可以在最後加入一個引數以控制執行緒池任務超額處理策略:
當用來快取待處理任務的佇列已滿時,又加入了新的任務,那麼這時候就該考慮如何處理這個任務。
可以通過實現RejectedExceptionHandler介面,實現rejectedException(ThreadPoolExecutor e, Runnable r)方法自定義操作。但通常我們使用JDK提供了4種處理策略,在ThreadPoolExecutor構造時以引數傳入:
ThreadPoolExcutor.AbortPolicy()——直接丟擲異常,預設操作
ThreadPoolExcutor.CallerRunsPolicy()——只用呼叫者所線上程來執行任務
ThreadPoolExcutor.DiscardOldersPolicy()——丟棄佇列裡最近的一個任務,並執行當前任務
ThreadPoolExcutor.DiscardPolicy()——不處理,直接丟掉
關於excute()方法,它的執行實際上分了三步:
1、當少量的執行緒在執行,執行緒的數量還沒有達到corePoolSize,那麼啟用新的執行緒來執行新任務。
2、如果執行緒數量已經達到了corePoolSize,那麼嘗試把任務快取起來,然後再次檢查執行緒池的狀態,看這個時候是否能新增一個額外的執行緒,來執行這個任務。如果這個檢查到執行緒池關閉了,就拒絕任務。
3、如果我們沒法快取這個任務,那麼我們就嘗試去新增執行緒去執行這個任務,如果失敗,可能任務已被取消或者任務佇列已經飽和,就拒絕掉這個任務。
可以參考原始碼理解:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
其中,執行緒池會把每個執行緒封裝成一個Worker物件,由addWorker(Runnable firstTask, boolean core)方法控制,firstTask代表執行緒池首要執行的任務,core代表是否使用corePoolSize引數作為執行緒池最大標記。
參考資料:
《Java併發程式設計從入門到精通》
http://www.cnblogs.com/wanqieddy/p/3853863.html