Executor 之 執行緒池及定時器
1. Executor系列介面
Executor
用於解耦任務(Runnable
)提交者和執行者,它只有一個方法void execute(Runnable command)
,通過呼叫它向執行者提交任務,但無法知道執行的結果/進度,也無法拿到任務返回值。
ExecutorService
繼承Executor
,是一個更具體的介面。它額外提供了以下方法:
-
關閉執行者
shutdown(); // 拒絕接受任務,但繼續執行舊任務
shutdownNow(); // 拒絕接受任務,且返回所有未執行的舊任務
awaitTermination(long,TimeUnit); // 等待執行者完全關閉
-
查詢執行者當前狀態
isShutdown();
isTerminated();
-
submit
和invokeall
系列方法 接受Runnable/Callable
型別的引數,返回Future
,讓提交者可以瞭解任務執行的進度/拿到執行結果。Future
是提交者和執行者之間的通訊手段,它代表任務的執行情況: 其中,get
方法會阻塞直到任務被完成,常用於非同步轉同步的場景。
ScheduledExecutorService
是一個定時器,它在ExecutorService
的基礎上增加了若干定時/延遲/迴圈排程任務的方法:
// 延遲執行
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit);
// 固定頻率執行(不管單個任務執行時間,間隔到了就執行)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 固定延遲執行(任務1執行完和任務2開始執行之間的間隔是固定的)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
2. 執行緒池ThreadPoolExecutor
ThreadPoolExecutor
是 執行緒池 和 任務佇列(BlockingQueue) 實現的ExecutorService
,和所有的池一樣,它的主要功能有兩個:
- 降低頻繁建立/銷燬執行緒的開銷;
- 防止濫用執行緒資源
作為一個對外提供服務的容器,它很多方面的實現都值得參考。
2.1 服務能力的逐步降級 & 佔用資源的自動伸縮
提交任務時呼叫的是BlockingQueue#offer()
方法,如果不成功則立即返回false
,這意味著任務提交者不會因為佇列滿而被阻塞。
submit(...)
方法對傳入的Runnable
/Callable
包裝進一個FutureTask
,再呼叫execute(...)
處理,最後返回這個FutureTask
物件。
FutureTask
是一個實現了Runnable
和Future
的類,它本質上是一個任務,同時又是一個同步器,提供了具有阻塞/喚醒語義的查詢和執行方法。
execute(...)
處理策略如下:
- 輕負載 (當前執行緒數 <
corePoolSize
):建立一個執行緒(內部類Worker
),該任務作為其初始任務。 - 中負載 (當前執行緒數 =
corePoolSize
):任務入佇列(offer()
方法,客戶不會阻塞)。 - 重負載 (當前執行緒數 ∈ [
corePoolSize
,maximumPoolSize
],且任務佇列已滿):繼續建立執行緒。 - 超負載 (當前執行緒數 =
maximumPoolSize
,且任務佇列已滿): 拒絕新任務。
隨著負載增加,執行緒池的處理速度 逐漸下降;超負荷時拒絕新請求,保證容器不掛:
快 –> 部分快,部分慢 –> 慢 –> 慢,且拒絕新任務
Worker
執行緒的邏輯
Worker
不停地從任務佇列中拿任務執行,如果拿到 null,則退出。
Worker
從BlockingQueue
中取元素有兩種選擇:poll(timeout)
或 take()
,如佇列空二者均會阻塞,但前者有超時時間。選擇策略如下:
-
當前執行緒數 <=
corePoolSize
(輕負載):使用take()
,當佇列為空時執行緒一直阻塞而不退出,原因是執行緒池在大部分時間都會處在輕/中負載的狀態,避免此時執行緒的頻繁建立和銷燬。也可以更改這個預設行為。如果手動設定
allowCoreThreadTimeOut
,將用poll(timeout)
,超過指定時間還沒任務執行緒就退出。 -
當前執行緒數 >
corePoolSize
(重負載):使用poll(timeout)
,負載回落後多餘執行緒退出,避免不必要的資源佔用。
不管哪種情況,呼叫poll
的超時時間由引數keepAliveTime
指定。
Worker
通過以上行為保證了容器佔用的資源隨負載程度而自動彈性伸縮。
2.2 資源的懶載入和預載入
預設執行緒是懶載入的,即只有當新任務提交時才會建立Worker
。可以呼叫prestartAllCoreThreads()
或prestartCoreThread()
方法預啟動 corePoolSize 個或1個執行緒,一來避免任務佇列初始就有元素時沒有執行緒執行;二來避免接受任務時線上程建立上消耗時間。
2.3 拒絕服務的方式
當執行緒數和佇列容量都達到上限時,容器將拒絕新來的服務請求,這是RejectedExecutionHandler
介面負責的。
ThreadPoolExecutor
內部提供了以下 4 個實現:
CallerRunsPolicy
:由客戶執行緒自己執行任務;AbortPolicy
(預設):丟擲RejectedExecutionException
;DiscardPolicy
:忽略,什麼也不做;DiscardOldestPolicy
:移除任務佇列裡呆的最久的任務(poll()
),重新提交。
2.4 狀態管理
執行緒池的狀態圖如下:
初始狀態為 running;
進入 shutDown 和 stop 區別是:
- shutDown: 不接受新任務,但會繼續執行已有任務
- stop: 不接受新任務,也不執行已有任務;佇列中的剩餘任務被返回給客戶
當佇列為空且執行緒數為0時,進入 tidying 狀態。該狀態下會呼叫一個預留給子類實現的terminated
方法進行額外的資源清理工作。結束後進入 terminated 狀態。
2.4 任務佇列的選擇
ThreadPoolExecutor
的任務佇列是可配置的。
如果要求任務立即得到處理,可以使用SynchronousQueue
。
SynchronousQueue
是一個容量為0的阻塞佇列,當 offer 元素時,如果當前沒有執行緒阻塞在take / poll(timeout)
上就立即失敗;否則將元素交給其中的一個執行緒。因此它更像是執行緒之間傳遞物件的工具,而不是一個佇列。
使用SynchronousQueue
時,執行緒池將無法快取任務,每個任務都會立即建立一個執行緒來執行它,任務可以得到快速處理。這種策略下執行緒池最多隻能處理maximumPoolSize
個任務。
如果可以接受任務執行的延遲,但需要儘可能多的處理任務,可以使用無界的LinkedBlockingQueue
或有界的ArrayBlockingQueue
。若是前者,執行緒數將不超過corePoolSize
,引數maximumPoolSize
無效;若是後者,則需要線上程數和佇列長度間權衡:短佇列需要更多的執行緒,但這會帶來更多的資源佔用和 context switch;少執行緒則需要更長的佇列,但此時吞吐量較低。
2.5 定時器ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
是ScheduledExecutorService
定時器介面的實現,它繼承自ThreadPoolExecutor
,因此底層的工作原理和執行緒池是一樣的,但其任務佇列預設是DelayBlockingQueue
的變種(不可配置)。Worker
依次從中獲取最快到期的任務執行。對於需要重複執行的任務,第一次執行完畢後計算其下次執行時間,再重新入隊。
這個不太瞭解,以後看。
3. 工廠類 Executors
提供了建立執行緒池、定時器的工廠方法。
執行緒池:
-
newCachedThreadPool()
corePoolSize = 0, maximumPoolSize = int 最大值,keepAliveTime = 1分鐘,快取佇列為
SynchronousQueue
。每個任務都會重用已有執行緒或建立一個新執行緒執行,直到執行緒數達到上限;沒有任務時所有執行緒都會自然退出。
-
newFixedThreadPool
corePoolSize = n, maximumPoolSize = n,keepAliveTime = 0,快取佇列為無界的
LinkedBlockingQueue
。 -
newSingleThreadExecutor
corePoolSize = 1, maximumPoolSize = 1,keepAliveTime = 0,快取佇列為無界的
LinkedBlockingQueue
。效果和
newFixedThreadPool(1)
一樣。 -
newScheduledThreadPool 定時器
建立一個定時器ScheduledThreadPoolExecutor
,corePoolSize = n, maximumPoolSize = int 最大值,keepAliveTime = 0,快取佇列為DelayedWorkQueue
。