【Java多執行緒】執行緒池的工作原理詳解(下)
接著上篇文章,我接下來繼續介紹執行緒池的工作原理,如果你還沒有看上篇,我建議最好瀏覽一下:執行緒池的工作原理詳解(上)
Executors 工具類
1.定義
Executors是java執行緒池的工廠類,通過它可以快速初始化一個符合業務需求的執行緒池。
2.常用方法
(1) newFixedThreadPool方法 :建立固定大小的執行緒池
- 適用於為了滿足資源管理的需求,而需要限制當前執行緒數量的應用場景,即負載比較重的伺服器
FixedThreadPool方法原始碼如下:
public static ExecutorService newFixedThreadPool (int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
分析:從原始碼可以看出,其實是初始化了一個指定執行緒數的執行緒池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞佇列,如果執行緒池沒有可執行任務時,也不會釋放執行緒。
執行FixedThreadPool圖解:
分析:
1)如果當前執行的執行緒數少於corePoolSize,則建立新執行緒來執行任務。
2)線上程池完成預熱之後(當前執行的執行緒數等於corePoolSize),將任務加入LinkedBlockingQueue。
3)執行緒執行完1)中的任務後,會在迴圈中反覆從LinkedBlockingQueue獲取任務來執行。
FixedThreadPool使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為
Integer.MAX_VALUE)。使用無界佇列作為工作佇列會對執行緒池帶來如下影響。
①當執行緒池中的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池中
的執行緒數不會超過corePoolSize。
②由於①,使用無界佇列時maximumPoolSize將是一個無效引數。
③ 由於①和②,使用無界佇列時keepAliveTime將是一個無效引數。
④ 由於使用無界佇列,執行中的FixedThreadPool(未執行方法shutdown()或
shutdownNow())不會拒絕任務(不會呼叫RejectedExecutionHandler.rejectedExecution方法)。
(2)newCachedThreadPool() 方法 :快取執行緒池,執行緒池的數量不固定,可以根據需求自動的更改數量。
大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或是負載較輕的伺服器。
CachedThreadPool方法原始碼如下
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
分析:
初始化一個可以快取執行緒的執行緒池,預設快取60s,執行緒池的執行緒數可達到Integer.MAX_VALUE,即2147483647,內部使用不儲存元素的SynchronousQueue作為阻塞佇列;
newCachedThreadPool在沒有任務執行時,當執行緒的空閒時間超過keepAliveTime,會自動釋放執行緒資源,當提交新任務時,如果沒有空閒執行緒,則建立新執行緒執行任務,會導致一定的系統開銷;
執行CachedThreadPool圖解:
分析:
① :首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPool中有空閒執行緒正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那麼主執行緒執行offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,execute()方法執行完成;否則執行下面的 2)
② :當初始maximumPool為空,或者maximumPool中當前沒有空閒執行緒時,將沒有執行緒執行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,①將失敗。此時CachedThreadPool會建立一個新執行緒執行任務,execute()方法執行完成。
③ :在 ②中新建立的執行緒將任務執行完後,會執行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這個poll操作會讓空閒執行緒最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主執行緒提交了一個新任務(主執行緒執行①),那麼這個空閒執行緒將執行主執行緒提交的新任務;否則,這個空閒執行緒將終止。由於空閒60秒的空閒執行緒會被終止,因此長時間保持空閒的CachedThreadPool不會使用任何資源。
CachedThreadPool使用SynchronousQueue,把主執行緒提交的任務傳遞給空閒執行緒執行。
CachedThreadPool中任務傳遞示意圖如下:
(3)newSingleThreadExecutor() 方法 : 建立單個執行緒池。執行緒池中只有一個執行緒
適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個執行緒是活動的應用場景。
SingleThreadExecutor方法原始碼如下
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
分析:
初始化的執行緒池中只有一個執行緒,如果該執行緒異常結束,會重新建立一個新的執行緒繼續執行任務,唯一的執行緒可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞佇列。
執行SingleThreadExecutor圖解:
分析:
1)如果當前執行的執行緒數少於corePoolSize(即執行緒池中無執行的執行緒),則建立一個新執行緒來執行任務。
2)線上程池完成預熱之後(當前執行緒池中有一個執行的執行緒),將任務加入Linked-BlockingQueue。
3)執行緒執行完1中的任務後,會在一個無限迴圈中反覆從LinkedBlockingQueue獲取任務來執行。
(4)newScheduledThreadPool() 方法: 建立固定大小的執行緒,可以延遲或定時的執行任務。
適用於需要多個後臺執行緒執行週期任務,同時為了滿足資源 管理的需求而需要限制後臺執行緒的數量的應用場景
newScheduledThreadPool方法原始碼如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
- ScheduledThreadPoolExecutor原始碼
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- 繼承父類的構造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
分析:初始化的執行緒池可以在指定的時間內週期性的執行所提交的任務,在實際的業務場景中可以使用該執行緒池定期的同步資料。其實最底層還是通過ThreadPoolExecutor類實現的
執行ScheduledThreadPoolExecutor圖解
分析:
當呼叫ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith-
FixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue新增一個實現了
RunnableScheduledFutur介面的ScheduledFutureTask。執行緒池中的執行緒從DelayQueue中獲取ScheduledFutureTask,然後執行任務。
如何合理配置執行緒池
1.任務特性
任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
任務的優先順序:高、中和低。
任務的執行時間:長、中和短。
任務的依賴性:是否依賴其他系統資源,如資料庫連線。
2.處理策略
性質不同的任務可以用不同規模的執行緒池分開處理。
CPU密集型任務應配置儘可能小的 執行緒,如配置Ncpu+1個執行緒的執行緒池。由於IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒,如2*Ncpu。
混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務 和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於序列執行的吞吐量。
如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。
優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先執行。
3.注意
如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。執行時間不同的任務可以交給不同規模的執行緒池來處理,或者可以使用優先順序佇列,讓執行時間短的任務先執行。
依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,等待的時間越 長,則CPU空閒時間就越長,那麼執行緒數應該設定得儘量大,這樣才能更好地利用CPU。
建議使用有界佇列。有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,如果當時我們設定成無界佇列,那麼執行緒池的佇列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題。
執行緒池的監控
1.定義
如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現問題時,可以根據執行緒池的使用狀況快速定位問題。可以通過執行緒池提供的引數進行監控,
2.監控執行緒池使用的屬性
taskCount:執行緒池需要執行的任務數量。
completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於taskCount。
largestPoolSize:執行緒池裡曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否曾經滿過。如該數值等於執行緒池的最大大小,則表示執行緒池曾經滿過。
getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,執行緒池裡的執行緒不會自動銷燬,所以這個大小隻增不減。
getActiveCount:獲取活動的執行緒數。通過擴充套件執行緒池進行監控。可以通過繼承執行緒池來自定義執行緒池,重寫執行緒池的
注:
beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和執行緒池關閉前執行一些程式碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。
本人才疏學淺,若有錯,請指出,謝謝!
如果你有更好的建議,可以留言我們一起討論,共同進步!
衷心的感謝您能耐心的讀完本篇博文。