徹底弄懂 Java 執行緒池原理
概述
這篇文章是我在閱讀原始碼時整理的一些筆記,對原始碼的關鍵點進行了比較詳細的註釋,然後加上一些自己對執行緒池機制的理解。最終目的是要弄清楚下面這些問題:
- 執行緒池有 execute() 和 submit() 方法,執行機制分別是什麼?
- 如何新建執行緒?
- 任務如何執行?
- 執行緒如何銷燬?超時機制如何實現?
首先需要介紹一下執行緒池的兩個重要成員:
ctl
AtomInteger 型別。高3位儲存執行緒池狀態,低29位儲存當前執行緒數量。workerCountOf(c) 返回當前執行緒數量。runStateOf(c) 返回當前執行緒池狀態。 執行緒池有如下狀態:
- RUNNING:接收新任務,處理佇列任務。
- SHUTDOWN:不接收新任務,但處理佇列任務。
- STOP:不接收新任務,也不處理佇列任務,並且中斷所有處理中的任務。
- TIDYING:所有任務都被終結,有效執行緒為0。會觸發terminated()方法。
- TERMINATED:當terminated()方法執行結束。
Worker
這個執行緒線上程池中的包裝類。一個 Worker 代表一個執行緒。執行緒池用一個 HashSet 管理這些執行緒。
需要注意的是,Worker 本身並不區分核心執行緒和非核心執行緒,核心執行緒只是概念模型上的叫法,特性是依靠對執行緒數量的判斷來實現的
- 繼承自 AQS,本身實現了一個最簡單的不公平的不可重入鎖。
- 構造方法傳入 Runnable,代表第一個執行的任務,可以為空。構造方法中新建一個執行緒。
- 實現了 Runnable 介面,在新建執行緒時傳入 this。因此執行緒啟動時,會執行 Worker 本身的 run 方法。
- run 方法呼叫了 ThreadPoolExecutor 的 runWorker 方法,負責實際執行任務。
submit() 方法的執行機制
submit 返回一個 Future 物件,我們可以呼叫其 get 方法獲取任務執行的結果。程式碼很簡單,就是將 Runnable 包裝成 FutureTask 而已。可以看到,最終還是呼叫 Execute 方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
複製程式碼
FutureTask 的程式碼就不貼了,簡述一下原理:
- FutureTask 實現了 RunnableFuture 介面,RunnableFuture 繼承自Runnable。執行任務時會呼叫 FutureTask 的 run 方法,run 方法中執行真正的任務程式碼,執行完後呼叫 set 方法設定結果。
- 如果任務執行完畢,get 方法會直接返回結果,如果沒有,get 方法會阻塞並等待結果。
- set 方法中設定結果後會取消阻塞,使 get 方法返回結果。
execute() 方法的執行機制
這個機制大家應該都很熟了,再簡述一遍:
- 工作執行緒數小於核心執行緒數時,直接新建核心執行緒執行任務;
- 大於核心執行緒數時,將任務新增進等待佇列;
- 佇列滿時,建立非核心執行緒執行任務;
- 工作執行緒數大於最大執行緒數時,拒絕任務
具體的程式碼分析如下:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //小於核心執行緒數
if (addWorker(command, true)) //啟動核心執行緒並執行任務
return;
c = ctl.get(); //執行失敗時重新獲取值
}
if (isRunning(c) && workQueue.offer(command)) { //檢查執行狀態並將任務新增到佇列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //重新檢查,防止狀態有變化。如果有,移出佇列並拒絕任務
reject(command);
else if (workerCountOf(recheck) == 0) //如果執行緒數為0,建立非核心執行緒,第一個引數為空時會從佇列中取任務執行
addWorker(null, false);
}
else if (!addWorker(command, false)) //新增到佇列失敗,說明佇列已滿,建立非核心執行緒執行任務
reject(command); //執行失敗說明達到最大執行緒數,拒絕任務
複製程式碼
新任務如何新增進佇列?
執行緒池使用 addWorker 方法新建執行緒,第一個引數代表要執行的任務,執行緒會將這個任務執行完畢後再從佇列取任務執行。第二引數是核心執行緒的標誌,它並不是 Worker 本身的屬性,在這裡只用來判斷工作執行緒數量是否超標。
這個方法可以分成兩部分,第一部分進行一些前置判斷,並使用迴圈 CAS 結構將執行緒數量加1。程式碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //這個語法不常用,用於給外層 for 迴圈命名。方便巢狀 for 迴圈中,break 和 continue 指定是外層還是內層迴圈
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// firstTask 不為空代表這個方法用於新增任務,為空代表新建執行緒。SHUTDOWN 狀態下不接受新任務,但處理佇列中的任務。這就是第二個判斷的邏輯。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 使用迴圈 CAS 自旋,增加執行緒數量直到成功為止
for (;;) {
int wc = workerCountOf(c);
//判斷是否超過執行緒容量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用 CAS 將執行緒數量加1
if (compareAndIncrementWorkerCount(c))
break retry;
//修改不成功說明執行緒數量有變化
//重新判斷執行緒池狀態,有變化時跳到外層迴圈重新獲取執行緒池狀態
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
//到這裡說明狀態沒有變化,重新嘗試增加執行緒數量
}
}
... ...
}
複製程式碼
第二部分負責新建並啟動執行緒,並將 Worker 新增至 Hashset 中。程式碼很簡單,沒什麼好註釋的,用了 ReentrantLock 確保執行緒安全。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; //這個引數是測試用的,不用管它
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); //新增失敗時移除 Worker 並將執行緒數量減 1
}
return workerStarted;
}
複製程式碼
任務如何執行?
在 addWorker 方法中,執行緒會被啟動。新建執行緒時,Worker 將自身傳入,所以執行緒啟動後會執行 Worker 的 run 方法,這個方法呼叫了 ThreadPoolExecutor 的 runWorker 方法執行任務,runWorker 中會迴圈取任務執行,執行邏輯如下:
- 如果 firstTask 不為空,先執行 firstTask,執行完畢後置空;
- firstTask 為空後呼叫 getTask() 從佇列中取任務執行;
- 一直執行到沒有任務後,退出 while 迴圈
- 呼叫 processWorkerExit() 方法,將 Worker 移除出 HashSet,此時執行緒執行完畢,也不再被引用,會自動銷燬。
具體程式碼分析如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
//task 為我們傳給 execute 的任務。task 為空時從佇列中取任務執行
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//這段邏輯非常繞。實際上它實現了以下邏輯:
//1.如果執行緒池已停止且執行緒未中斷,條件成立,中斷執行緒
//2.如果執行緒池未停止,執行緒為中斷狀態,將執行緒狀態重置,並重新進行1的判斷
//3.如果執行緒池未停止,執行緒不為中斷狀態,條件不成立
//Thread.interrupted() 會重置中斷狀態,保證
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
//beforeExecute 和 afterExecute 為空方法,交給子類實現
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //執行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//執行到這裡時說明執行緒執行完畢,此方法將執行緒從 HashSet 中移出。執行緒終止且沒有引用,會被自動回收。
processWorkerExit(w, completedAbruptly);
}
}
複製程式碼
執行緒如何銷燬?超時機制如何實現?
在 runWorker 方法中 getTask 方法返回 null 之後會導致執行緒執行完畢,被移除出 HashSet,從而被系統銷燬。 執行緒的超時機制也是在這個方法實現的,藉助於 BlockingQueue 的 poll 和 take 方法。
- poll 方法可以設定一個超時時間,當佇列為空時,在此時間內阻塞等待,超時後返回 null
- take 方法在佇列為空時直接丟擲異常
超時機制實現原理如下:
- 當 allowCoreThreadTimeOut 為 true,所有執行緒都會超時,全部呼叫 poll 方法,傳入 keepAliveTime 引數。
- 當 allowCoreThreadTimeOut 為 false 時,如果工作執行緒數量大於核心執行緒數,將此執行緒當作非核心執行緒處理,呼叫 poll 方法
- 當 allowCoreThreadTimeOut 為 false 且工作執行緒數量小於等於核心執行緒數時,將此執行緒當作核心執行緒處理,呼叫 take 方法,佇列為空時丟擲異常,進入下一次迴圈。如果佇列一直為空,核心執行緒會一直在此迴圈等待任務進行處理。
具體程式碼如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 允許核心執行緒超時或者執行緒數大於核心執行緒
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// timed && timedOut 這兩個引數結合起來控制超時機制
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 佇列為空時,poll 方法會阻塞等待,超過 keepAliveTime 時返回空值。take 方法會直接返回異常。
// 當 allowCoreThreadTimeOut 為 true 時,核心執行緒和非核心執行緒沒有區別,一律呼叫poll方法
// 當 allowCoreThreadTimeOut 為 false 時,執行緒數量超過核心執行緒數才會進入超時機制,如果不超過,則將當前執行緒當作核心執行緒處理,呼叫 take,丟擲異常後進入下一次迴圈。如果佇列為空,此處會一直迴圈。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製程式碼