Java高併發程式設計(十二):Executor框架
Java中的執行緒既是工作單元,也是執行單元。工作單元包括Runnable和Callable,而執行單元是由Executor框架支援。
1. Executor框架簡介
1.1 Executor框架的兩級排程模型
在HotSpot VM的執行緒模型中,Java執行緒(java.lang.Thread)被一對一對映為本地作業系統執行緒。Java執行緒啟動時會建立一個本地作業系統執行緒;當該Java執行緒終止時,這個作業系統執行緒也會被回收。作業系統會排程所有執行緒並將他們分配給可用的CPU。
兩級排程模型指的是:
- 在上層,使用者將應用分解為若干個任務,然後用 Executor框架 將這些任務對映為固定數量的執行緒。
- 在底層,作業系統核心將這些執行緒對映到處理器上。
1.2 Executor框架的結構和成員
如上圖所示,Executor框架由 3大部分組成:
- 任務。即執行任務需要實現的介面:Runnable 介面 或 Callable介面。
- 任務的執行。任務執行機制核心介面Executor、和繼承自Executor的ExecutorService介面(2個關鍵類實現:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor)。
- 非同步計算的結果。即 Future介面 和 它的實現類 FutureTask類。
下面是這些類或者介面的簡介:
- Executor是一個介面,它是Executor框架的基礎,它將任務的提交與任務的執行分離開來。
- ThreadPoolExecutor是執行緒池的核心實現類,用來執行被提交的任務。
- ScheduledThreadPoolExecutor是一個實現類,可以在給定的延遲後執行命令,或者定期執行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大。
- Future介面和實現Future介面的FutureTask類,代表非同步計算的結果。
- Runnable介面和Callable介面的實現類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。
下面是Executor框架使用示意圖:
使用流程:
- 主執行緒主要建立 Runnable 和 Callable 的任務物件。呼叫
Executors.callable(Runnable)
可以把Runnable物件包裝成Future物件。 - 再建立一個ExecutorService的一個子類物件。
- 呼叫
ExecutorService.submit(Runnable / Callable<T>)
或ExecutorService.execute(Runnable)
,把任務提交給ExecutorService。 - 呼叫
submit()
會返回一個FutureTask物件,主執行緒可呼叫FutureTask.get()
使主執行緒等待任務執行完成,也可以呼叫FutureTask.cancel(boolean)
取消任務執行。
1.3 Executor框架成員
1)Executor介面
Executor介面僅僅只有一個execute(Runnable)
方法,ExecutorService介面是繼承該介面的一個子介面。
2)ThreadPoolExecutor
執行緒池的核心實現類,用來執行被提交的任務。
3)ScheduledThreadPoolExecutor
是ThreadPoolExecutor的一個子類,只是ThreadPoolExecutor的一個簡單封裝。可以在給定的延遲後執行命令,或者定期指定命令,比 工具類Timer更加靈活、功能更強大。
4)Runnable 和 Callable介面
都是任務,可以提交給ExecutorService執行。
Callable 可以返回結果 且 可丟擲異常,Runnable 則不可以。呼叫Executors.callable(Runnable)
可以把Runnable物件包裝成Future物件。
5)Future介面
Future介面 和 FutureTask實現類 是用來表示非同步計算的結果的。JDK 1.8為止,呼叫submit()
會返回一個FutureTask物件。
6)Executors工具類
Executors工廠類可以,
- 建立3三種ThreadPoolExecutor:SinglePoolExecutor、FixedThreadPool和CachedThreadPool。
- 還可以建立ScheduledThreadPoolExecutor、SingleThreadScheduledExecutor。
3.3 各類執行緒池 和 FutureTask詳解
1)3種ThreadPoolExecutor型別的執行緒池
1.1) FixedThreadPool
目的是為了建立固定執行緒數量的執行緒池,適用於負載比較重的伺服器,使用了無界佇列LinkedBlockingQueue。
使用Executors工廠類建立的原始碼為:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可知:
- corePoolSize 和 maximumPoolSize 都被設定為 指定引數 nThreads。
- keepAliveTime 為0,說明多餘的空閒執行緒會被立即終止。
- 使用了無界佇列LinkedBlockingQueue,會帶來如下影響:
- 由於無界佇列,maximumPoolSize 和 keepAliveTime 都是無效引數;
- 由於無界佇列,執行緒池不會拒絕任務(即不會呼叫飽和策略),且執行緒池的執行緒池數量不會超過corePoolSize 。
其執行流程如下:
- 如果 執行中的執行緒數量 < corePoolSize,則建立新執行緒執行任務。
- 執行緒池完成預熱之後,將任務將入 LinkedBlockingQueue。
- 執行緒執行完 1 的任務後,會迴圈反覆從 LinkedBlockingQueue 獲取任務來執行。
3.3 各類執行緒池 和 FutureTask詳解
1)3種ThreadPoolExecutor型別的執行緒池
1.1) FixedThreadPool
目的是為了建立固定執行緒數量的執行緒池,適用於負載比較重的伺服器,使用了無界佇列LinkedBlockingQueue。
使用Executors工廠類建立的原始碼為:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可知:
- corePoolSize 和 maximumPoolSize 都被設定為 指定引數 nThreads。
- keepAliveTime 為0,說明多餘的空閒執行緒會被立即終止。
- 使用了無界佇列LinkedBlockingQueue,會帶來如下影響:
- 由於無界佇列,maximumPoolSize 和 keepAliveTime 都是無效引數;
- 由於無界佇列,執行緒池不會拒絕任務(即不會呼叫飽和策略),且執行緒池的執行緒池數量不會超過corePoolSize 。
其執行流程如下:
- 如果 執行中的執行緒數量 < corePoolSize,則建立新執行緒執行任務。
- 執行緒池完成預熱之後,將任務將入 LinkedBlockingQueue。
- 執行緒執行完 1 的任務後,會迴圈反覆從 LinkedBlockingQueue 獲取任務來執行。
1.2) SingleThreadExecutor
使用了無界佇列LinkedBlockingQueue,目的是建立使用單個執行緒的Executor,適用於需要保證各個任務順序執行且不會有多個執行緒活動的應用場景。
使用Executors工廠類建立的原始碼為:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
使用無界佇列LinkedBlockingQueue帶來的影響同上。
其執行流程如下:
流程說明略。
1.3) CachedThreadPool
CachedThreadPool 是一個會根據需求建立新執行緒的執行緒池,是一個執行緒數量無界的執行緒池。適用於 執行很多的短任務的小程式 或者 負載較輕的伺服器。使用的是無容量的無界佇列SynchronousQueue。
使用Executors工廠類建立的原始碼為:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
說明:
- corePoolSize被設定為0,maximumPoolSize為最大整數值。說明執行緒池的允許執行緒數量是無限的。
- keepAliveTime被設定為60s,說明空閒執行緒超過60s就會被停止。
- SynchronousQueue是沒有容量的工作佇列,且maximumPool是無界的,說明 主執行緒提交任務速度 高於 執行緒處理任務的速度(來一個任務就使用 建立的新執行緒 或 空閒執行緒 來執行,不將任務儲存在任務佇列中);由於執行緒數量不限制,極端情況下,可能會導致太多執行緒耗盡CPU資源。
其執行流程如下:
流程說明:
- 先執行
SynchronousQueue.offer(Runnable)
。如果 maximumPool中有空閒執行緒 執行了poll()
,那麼與主執行緒執行的offer()
配對成功,主執行緒則把任務直接提交給空閒執行緒執行,execute()
執行完成。否則,執行步驟2. - 當 初始maximumPool為空 或者 maximumPool 沒有空閒執行緒時,這時沒有執行緒執行
poll()
,這種情況下CachedThreadPool 會建立一個新執行緒執行任務,execute()
執行完成。 - 在 步驟2 中新建立的執行緒執行完任務後,會執行
poll()
,會讓空閒執行緒最多在 SynchronousQueue 中等待60s,- 如果60s內主執行緒提交了任務,空閒執行緒會執行提交的任務。
- 否則,空閒執行緒被終止。
2)ScheduledThreadPoolExecutor詳解
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,主要用來給 給定延遲之後執行任務,或者定期執行任務。適用於 需要多個後臺執行週期任務 且 又需要限制執行緒數量 的應用場景。
使用了無界佇列 DelayQueue。
功能與Timer類似,但區別有:
- ScheduledThreadPoolExecutor功能更強大。
- Timer類不管啟動多少定時器,只會啟動單個執行緒,而ScheduledThreadPoolExecutor可以指定多條執行緒。
- Timer執行週期任務依賴於系統時間,而ScheduledThreadPoolExecutor基於時間的延遲,因此不會因系統時間的改變而改變。
使用Executors工廠類建立的原始碼為:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
說明:其中corePoolSize是指定的後臺執行緒的基本數量(但是它的maximumPool是無界的)。
ScheduledThreadPoolExecutor的建構函式如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newSingleThreadScheduledExecutor的建構函式如下:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
2.1)任務的傳遞
ScheduledThreadPoolExecutor 的任務是直接增加到 DelayQueue佇列中,然後由coolPool的執行緒以一定方式獲取任務執行,是一次任務傳遞,而不是直接交付給執行緒執行,其流程如下:
為了實現週期性的執行任務,對ThreadPoolExecutor做了3個方面的修改:
- 使用DelayQueue最為任務佇列。
- 獲取任務的方式不同。
- 執行週期任務後,增加了額外的處理。
2.2)任務執行步驟
主執行緒呼叫 scheduleAtFixedRate()
或 scheduleWithFixedDelay()
提交任務(一個ScheduledFutureTask物件)直接到DelayQueue佇列中。
ScheduledFutureTask主要由3個成員變數:
成員變數 | 描述 |
---|---|
long time | 任務被執行的時間 |
long sequenceNumber | 任務新增到執行緒池的序號 |
long period | 任務執行的間隔週期 |
DelayQueue 封裝了一個PriorityQueue,這個優先佇列會對任務列表的任務排序,排序規則如下:
- time小的排在前面(即時間早的任務優先被執行)。
- 若time相同,sequenceNumber小的排在前面(time相同,先提交的任務先執行)。
執行緒執行某個任務的步驟:
DelayQueue.take()
,執行緒1 從DelayQueue獲取已到期的任務。到期任務指的是 time大於等於當前時間的任務。- 執行緒1執行這個任務。
- 執行緒1修改任務的time變數為下次執行的時間。(根據period)
- 執行緒1把這個任務放回DelayQueue中。
DelayQueue.add()
。
3)FutureTask詳解
FutureTask 實現了2個介面:Future 、 Runnable。因此,
- FutureTask 可以交給Executor框架執行。
- 也可以呼叫
FutureTask.run()
直接啟動執行緒執行。
呼叫FutureTask.run()
有3種狀態:未啟動、已啟動 和 已完成,其狀態遷移圖如下:
在3種狀態下,呼叫get()
和cancel()
方法的執行示意圖如下:
4)FutureTask實現
FutureTask的實現基於AbstractQueuedSynchronizer(以下簡稱為AQS)。java.util.concurrent中的很多可阻塞類(比如ReentrantLock)都是基於AQS來實現的。AQS是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒執行緒,以及維護被阻塞執行緒的佇列。
AQS被作為“模板方法模式”的基礎類提供給FutureTask的內部子類Sync,這個內部子類只
需要實現狀態檢查和狀態更新的方法即可,這些方法將控制FutureTask的獲取和釋放操作。具體來說,Sync實現了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通過這兩個方法來檢查和更新同步狀態。
FutureTask的設計示意圖如圖所示。
FutureTask.get()方法會呼叫AQS.acquireSharedInterruptibly(int arg)方法,這個方法的執行過程如下。
- 呼叫AQS.acquireSharedInterruptibly(int arg)方法,這個方法首先會回撥在子類Sync中實現的tryAcquireShared()方法來判斷acquire操作是否可以成功。acquire操作可以成功的條件為:state為執行完成狀態RAN或已取消狀態CANCELLED,且runner不為null。
- 如果成功則get()方法立即返回。如果失敗則到執行緒等待佇列中去等待其他執行緒執行
release操作。- 當其他執行緒執行release操作(比如FutureTask.run()或FutureTask.cancel(…))喚醒當前執行緒後,當前執行緒再次執行tryAcquireShared()將返回正值1,當前執行緒將離開執行緒等待佇列並喚醒它的後繼執行緒(這裡會產生級聯喚醒的效果,後面會介紹)。
- 最後返回計算的結果或丟擲異常。
FutureTask.run()的執行過程如下。
- 執行在建構函式中指定的任務(Callable.call())。
- 以原子方式來更新同步狀態(呼叫AQS.compareAndSetState(int expect,int update),設定state為執行完成狀態RAN)。如果這個原子操作成功,就設定代表計算結果的變數result的值為Callable.call()的返回值,然後呼叫AQS.releaseShared(int arg)。
- AQS.releaseShared(int arg)首先會回撥在子類Sync中實現tryReleaseShared(arg)來執行release操作(設定執行任務的執行緒runner為null,然會返回true);AQS.releaseShared(int arg),然後喚醒執行緒同步等待佇列中的第一個執行緒。
- 呼叫FutureTask.done()。
當執行FutureTask.get()方法時,如果FutureTask不是處於執行完成狀態RAN或已取消狀態CANCELLED,當前執行執行緒將到AQS的執行緒等待佇列中等待(見下圖的執行緒A、B、C和D)。當某個執行緒執行FutureTask.run()方法或FutureTask.cancel(…)方法時,會喚醒執行緒等待佇列的第一個執行緒(見圖所示的執行緒E喚醒執行緒A)。
假設開始時FutureTask處於未啟動狀態或已啟動狀態,等待佇列中已經有3個執行緒(A、B和
C)在等待。此時,執行緒D執行get()方法將導致執行緒D也到等待佇列中去等待。
當執行緒E(執行任務的執行緒Runner)執行run()方法時,會喚醒佇列中的第一個執行緒A。執行緒A被喚醒後,首先把自己從佇列中刪除,然後喚醒它的後繼執行緒B,最後執行緒A從get()方法返回。執行緒B、C和D重複A執行緒的處理流程。最終,在佇列中等待的所有執行緒都被級聯喚醒並從get()方法返回。