深入理解Java執行緒池(1):ThreadPoolExecutor整體流程梳理,建立worker相關方法
執行緒池作為一個執行緒的容器,主要的作用就是防止頻繁建立執行緒,節省時間資源和cpu資源。雖然一定程度上佔用了記憶體,但實際情況下利遠遠大於弊。
構造方法
public ThreadPoolExecutor(
int corePoolSize, //核心執行緒數量
int maximumPoolSize, //最大執行緒數量
long keepAliveTime, //最大存活時間
TimeUnit unit, //時間單位
BlockingQueue<Runnable> workQueue, //執行緒佇列
ThreadFactory threadFactory, //執行緒工廠
RejectedExecutionHandler handler) //超過最大執行緒後,處理方式
上邊的是最核心的構造方法,執行緒池的運作方式其實從構造方法就可以略知一二。我先簡述一下執行緒池的運作方法,以及設定的引數的作用。
- 建立執行緒池後,其實執行緒池中執行緒數量為0;
- 當有一個任務被提交後,會建立一個worker執行緒,worker會被放入執行緒池中並開始執行這個任務,任務完成後會等待我們設定的任務佇列(workQueue)中的任務。
- 這樣不停的提交任務,worker的數量也會不斷增多,直到增大到核心執行緒數量(corePoolSize),此時不會再增加執行緒池中的worker執行緒,多出的任務會放入佇列中,等到核心執行緒池中的worker空閒下來再執行放入佇列中的任務。如果佇列是無限佇列可能會出現建立過多執行緒撐爆記憶體的現象。
- 當佇列滿員後,會繼續往執行緒池中增加worker,直到達到執行緒池最大執行緒數量(maximumPoolSize)。此時,會根據設定的RejectedExecutionHandler實現類(handler)執行不同的拒絕策略,例如:丟擲異常,或者使用當前執行緒執行。若最大執行緒數量過大同樣會出現worker數量過多撐爆記憶體的現象。
- 當執行緒池的worker數量超過核心執行緒數量時,有worker的任務執行完畢後獲取佇列中的任務超過指定時間(keepAliveTime和unit確定的時間),這個worker就會被消滅。
- 執行緒工廠(threadFactory)就是建立執行緒的地方,可使用Guava的ThreadFactoryBuilder().setNameFormat(“demo-thread-%d”).build()建立threadFactory。也可以使用自帶的DefaultThreadFactory。
以上就是執行緒池執行的大概流程。佇列數量過大或者最大執行緒數量過大,會讓伺服器掛掉,由於一個服務幹爆整個伺服器實屬不智,應當謹慎使用Executors中的建立方式。
全域性變數
下邊先看看ThreadPoolExecutor中的全域性變數,構造方法中涉及的就不在贅述
//使用了執行緒安全的AtomicInteger類,保證了ctl的不會因多執行緒而出現問題
//ctl變數值的前 29 位表示工作執行緒數量 workerCount, 剩餘高位來表示執行緒池狀態runState。
//初始化時預設是running狀態,worker數量為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//running狀態;接受新任務,並處理佇列任務,是正常執行狀態
private static final int RUNNING = -1 << COUNT_BITS;
//shutdown狀態;不接受新任務,但會處理佇列任務
private static final int SHUTDOWN = 0 << COUNT_BITS;
//stop狀態;不接受新任務,不會處理佇列任務,而且會中斷正在處理過程中的任務
private static final int STOP = 1 << COUNT_BITS;
//tidying狀態;所有的任務已結束,workerCount為0,執行緒過渡到TIDYING狀態,將會執行terminated()鉤子方法(此方法為空)
private static final int TIDYING = 2 << COUNT_BITS;
//terminated狀態;terminated()方法已經完成後變更成此狀態。
private static final int TERMINATED = 3 << COUNT_BITS;
//儲存worker的容器,不是執行緒安全,所以其操作通常需要加鎖
private final HashSet<Worker> workers = new HashSet<Worker>();
//全域性鎖,會保證關鍵操作不會因併發而混亂,如對workers的操作。
private final ReentrantLock mainLock = new ReentrantLock();
其他全域性變數可能並非關鍵變數,如果遇到會在程式碼裡說明。 下面開始對原始碼進行分析
execute方法
這是執行緒池的最核心方法,加入Runnable任務,使用執行緒池中的worker執行。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//計算 workerCount 和 runState 時通過掩碼計算。
int c = ctl.get();
//若worker數量小於核心執行緒池中執行緒數,增加worker,若成功則直接返回。失敗繼續執行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判斷若是running狀態,嘗試向佇列中新增任務。執行到此處,核心執行緒池已滿,嘗試向佇列中新增任務
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//由於沒有加鎖,需要重新判斷現在是否處於執行狀態,若不是需要移除佇列中任務,移除後執行插入失敗操作reject
if (! isRunning(recheck) && remove(command))
reject(command);
//如果正常需要檢視工作的worker數量,若為0,則加入一個空閒Worker。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//執行到此處說明,核心執行緒已經飽和,阻塞佇列已經滿員,嘗試再次增加worker環節佇列壓力
else if (!addWorker(command, false)) 。
reject(command);
}
增加空任務worker的邏輯比較奇怪,可能是狀態突然變更為shutdown,佇列中的任務已經被加入,shutdown操作意外殺死了所有worker執行緒,不得不執行這個操作。某種程度上保證了不會出現插入任務卻沒有worker的情況。根據addWorker方法中判斷是否新增worker的邏輯推斷得出,尚未完全明白為何會發生這種情況。
addWorker
addWorker方法會嘗試向執行緒池中新增執行緒。 呼叫此方法的方法往往不會對執行緒池狀態做出詳細判斷,所以addWorker方法需要對當前狀態做出明確判斷,根據不同狀態執行不同操作。 入參firstTask表示worker的第一個任務,可能為null,表示只增加worker,不設定初始任務,worker執行的任務從佇列中獲取;入參core表示是否加入核心執行緒池
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/* 判斷何時不新增worker直接返回。
* 若執行緒池狀態非running時,一下三個條件都滿足才繼續執行後邊的程式碼,否則其他非running狀態都是值節返回失敗
* 1 執行緒池是shutdown狀態 2 建立的worker的任務為空 3 任務佇列中有任務未完成
* 由此可以推斷出execute加入無任務worker的原因。
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判斷執行緒池數量,大於執行緒池能儲存最大數量,直接返回
//判斷入參core,如果true執行緒池數量小於核心執行緒數,如果false小於設定最大執行緒數
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas增加worker數量,成功跳出,跳過所有迴圈;cas保證了併發增加worker不會發生異常
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
//執行到此處說明ctl改變,若是state改變則跳出到大迴圈,從新判斷;若是worker數量改變則在小迴圈中繼續執行
if (runStateOf(c) != rs)
continue retry;
}
}
//worker是否啟動
boolean workerStarted = false;
//woker是否被新增入worker容器
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加上鎖機制保證不會發生衝突
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//最後的判斷,和mainLock鎖一起保證不會發生併發衝突
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//在這裡開始真正增加worker。
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動worker執行緒,worker的run方法會呼叫runWorker方法,不停的領取任務
t.start();
workerStarted = true;
}
}
} finally {
//worker啟動失敗,此時呼叫addWorkerFailed方法。注意,此時ctl中worker數量已經+1,但worker容器中不一定新增。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorkerFailed
這個方法被呼叫時,worker因不明原因啟動或新增失敗。執行緒池總worker數量加一了。所以需要嘗試把worker容器中的啟動失敗的worker剔除,worker數量減一。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate(); //如有需要嘗試關閉執行緒池
} finally {
mainLock.unlock();
}
}