原始碼分析—ThreadPoolExecutor執行緒池三大問題及改進方案
前言
在一次聚會中,我和一個騰訊大佬聊起了池化技術,提及到java的執行緒池實現問題,我說這個我懂啊,然後巴拉巴拉說了一大堆,然後騰訊大佬問我說,那你知道執行緒池有什麼缺陷嗎?我頓時啞口無言,甘拜下風,所以這次我再回來思考一下執行緒池的實現原理
原始碼分析
ThreadPoolExecutor構造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //校驗幾個引數不能小於零,否則丟擲異常 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- corePoolSize:核心執行緒數
- maximumPoolSize:最大執行緒數,執行緒池允許建立的最大執行緒數
- workQueue:任務佇列,BlockingQueue 介面的某個實現(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)
- keepAliveTime:空閒執行緒的保活時間,如果某執行緒的空閒時間超過這個值都沒有任務給它做,那麼可以被關閉了。注意這個值並不會對所有執行緒起作用,如果執行緒池中的執行緒數少於等於核心執行緒數 corePoolSize,那麼這些執行緒不會因為空閒太長時間而被關閉,當然,也可以通過呼叫 allowCoreThreadTimeOut(true)使核心執行緒數內的執行緒也可以被回收
- threadFactory:用於生成執行緒,一般我們使用Executors.defaultThreadFactory()
- handler:當執行緒池已經滿了,但是又有新的任務提交的時候,該採取什麼策略由這個來指定
預設的幾個屬性:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 這裡 COUNT_BITS 設定為 29(32-3),意味著前三位用於存放執行緒狀態,後29位用於存放執行緒數 private static final int COUNT_BITS = Integer.SIZE - 3; //最大執行緒數是 2^29-1=536870911 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; // 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl //將CAPACITY取費後和c進行取與運算,可以得到高3位的值,即執行緒池的狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } //將c和CAPACITY取與運算,可以得到低29位的值,即執行緒池的個數 private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
採用一個 32 位的整數來存放執行緒池的狀態和當前池中的執行緒數,其中高 3 位用於存放執行緒池狀態,低 29 位表示執行緒數(CAPACITY)
- RUNNING:這個沒什麼好說的,這是最正常的狀態:接受新的任務,處理等待佇列中的任務
- SHUTDOWN:不接受新的任務提交,但是會繼續處理等待佇列中的任務
- STOP:不接受新的任務提交,不再處理等待佇列中的任務,中斷正在執行任務的執行緒
- TIDYING:所有的任務都銷燬了,workCount 為 0。執行緒池的狀態在轉換為 TIDYING 狀態時,會執行鉤子方法 terminated()
- TERMINATED:terminated() 方法結束後,執行緒池的狀態就會變成這個
execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果當前的執行緒數小於corePoolSize
if (workerCountOf(c) < corePoolSize) {
//呼叫addWorker新建一個執行緒
if (addWorker(command, true))
return;
c = ctl.get();
}
// 到這裡說明,要麼當前執行緒數大於等於核心執行緒數,要麼剛剛 addWorker 失敗了
//校驗當前執行緒狀態是RUNNING,並將command入隊
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果不是執行狀態,那麼移除佇列,並執行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果執行緒池還是 RUNNING 的,並且執行緒數為 0,那麼開啟新的執行緒
//防止任務提交到佇列中了,但是執行緒都關閉了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//到這裡說明佇列已經滿了,所以新建一個執行緒,如果新建的執行緒數已經超過了maximumPoolSize,那麼執行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
用一張圖來概括一下上面的內容:
- 如果執行緒池中的執行緒數少於 coreThreadCount 時,處理新的任務時會建立新的執行緒;
- 如果執行緒數大於 coreThreadCount 則把任務丟到一個佇列裡面,由當前空閒的執行緒執行;
- 當佇列中的任務堆積滿了的時候,則繼續建立執行緒,直到達到 maxThreadCount;
- 當執行緒數達到 maxTheadCount 時還有新的任務提交,那麼我們就不得不將它們丟棄了。
我們下面看一下addWorker是如何建立執行緒的:
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//獲取當前執行緒池狀態
int rs = runStateOf(c);
//1
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//2. 校驗傳入的執行緒數是否超過了容量大小, 或者是否超過了corePoolSize或maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//到了這裡說明執行緒數沒有超,那麼就用CAS將執行緒池的個數加1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//3 說明有其他的執行緒搶先更新了狀態,繼續下一輪的迴圈,跳到外層迴圈
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//建立一個執行緒
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//4 如果執行緒是沒有問題的話,那麼將worker加入到佇列中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// largestPoolSize 用於記錄 workers 中的個數的最大值
// 因為 workers 是不斷增加減少的,通過這個值可以知道執行緒池的大小曾經達到的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果worker入隊成功,那麼啟動執行緒
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果worker啟動失敗,那麼就回滾woker執行緒建立的狀態
if (! workerStarted)
addWorkerFailed(w);
}
// 返回執行緒是否啟動成功
return workerStarted;
}
- 這裡主要是列舉了幾個條件不能建立新的worker的情況
- 執行緒池狀態大於 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED
- firstTask != null
- workQueue.isEmpty()
如果執行緒池處於 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那麼是允許建立 worker 的
- 如果傳入的core引數是true代表使用核心執行緒數 corePoolSize 作為建立執行緒的界限,也就說建立這個執行緒的時候,如果執行緒池中的執行緒總數已經達到 corePoolSize,那麼不能響應這次建立執行緒的請求;如果是false,代表使用最大執行緒數 maximumPoolSize 作為界限
- 如果CAS失敗並不是因為有其他執行緒在嘈雜哦導致的,那麼就直接在裡層迴圈繼續下一次的迴圈就好了,如果是因為其他執行緒的操作,導致執行緒池的狀態發生了變更,如有其他執行緒關閉了這個執行緒池,那麼需要回到外層的for迴圈
- 如果是 小於 SHUTTDOWN 那就是 RUNNING,則繼續往下繼續,或者狀態是SHUTDOWN但是傳入的firstTask為空,代表繼續處理佇列中的任務
addWorkerFailed
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
addWorkerFailed的處理就是將workers集合裡面的worker移除,然後count減1,
worker物件
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
....
}
Worker是繼承AQS物件的,在建立Worker物件的時候會傳入一個Runnable物件,並設定AQS的state狀態為-1,並從執行緒工廠中新建一個執行緒
呼叫thread.start方法會呼叫到Worker的run方法中
public void run() {
runWorker(this);
}
Worker的run方法會呼叫到ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task為空,那麼就從workQueue裡面獲取task
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果執行緒池狀態大於等於 STOP,那麼意味著該執行緒也要中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 這是一個鉤子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 這是一個鉤子方法
afterExecute(task, thrown);
}
} finally {
// 置空 task,準備 getTask 獲取下一個任務
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//異常情況或getTask獲取不到任務時會執行關閉
processWorkerExit(w, completedAbruptly);
}
}
傳入一個Worker首先去校驗firstTask是不是null,如果是那麼就呼叫getTask方法從workQueue佇列裡面獲取,然後判斷一下當前的執行緒是否需要中斷,如需要的話執行鉤子方法,然後呼叫task的run方法執行task;
如果while迴圈裡面getTask獲取不到任務的話,就結束迴圈呼叫processWorkerExit方法執行關閉;
如果是異常原因導致的while迴圈退出,那麼會呼叫processWorkerExit並傳入為true
getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//要麼狀態大於STOP,要麼狀態等於SHUTDOWN並且佇列是空的,那麼執行緒數減一後返回null
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 允許核心執行緒數內的執行緒回收,或當前執行緒數超過了核心執行緒數,那麼有可能發生超時關閉
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//校驗執行緒數是否超了,或者是否超時
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 到 workQueue 中獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
這個方法返回null有如下幾種情況:
- 當前狀態是SHUTDOWN並且workQueue佇列為空
- 當前狀態是STOP及以上
- 池中有大於 maximumPoolSize 個 workers 存在(通過呼叫 setMaximumPoolSize 進行設定)
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果是異常原因中斷,那麼需要將執行執行緒數減一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//設定完成任務數
completedTaskCount += w.completedTasks;
//將worker從集合裡移除
workers.remove(w);
} finally {
mainLock.unlock();
}
//判斷當前的執行緒池是否處於SHUTDOWN狀態,判斷是否要終止執行緒
tryTerminate();
int c = ctl.get();
//如果是RUNNING或SHUTDOWN則會進入這個方法
if (runStateLessThan(c, STOP)) {
//如不是以外中斷則會往下走
if (!completedAbruptly) {
//判斷是否保留最少核心執行緒數
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//如果當前執行的Worker數比當前所需要的Worker數少的話,那麼就會呼叫addWorker,新增新的Worker
addWorker(null, false);
}
}
- 判斷是否是意外退出的,如果是意外退出的話,那麼就需要把WorkerCount--
- 加完鎖後,同步將completedTaskCount進行增加,表示總共完成的任務數,並且從WorkerSet中將對應的Worker移除
- 呼叫tryTemiate,進行判斷當前的執行緒池是否處於SHUTDOWN狀態,判斷是否要終止執行緒
- 判斷當前的執行緒池狀態,如果當前執行緒池狀態比STOP大的話,就不處理
- 判斷是否是意外退出,如果不是意外退出的話,那麼就會判斷最少要保留的核心執行緒數,如果allowCoreThreadTimeOut被設定為true的話,那麼說明核心執行緒在設定的KeepAliveTime之後,也會被銷燬。
- 如果最少保留的Worker數為0的話,那麼就會判斷當前的任務佇列是否為空,如果任務佇列不為空的話而且執行緒池沒有停止,那麼說明至少還需要1個執行緒繼續將任務完成
- 判斷當前的Worker是否大於min,也就是說當前的Worker總數大於最少需要的Worker數的話,那麼就直接返回,因為剩下的Worker會繼續從WorkQueue中獲取任務執行
- 如果當前執行的Worker數比當前所需要的Worker數少的話,那麼就會呼叫addWorker,新增新的Worker,也就是新開啟執行緒繼續處理任務
執行緒池的三大問題
這個任務處理流程看似簡單,實際上有很多坑,你在使用的時候一定要注意。
- JDK 實現的這個執行緒池優先把任務放入佇列暫存起來,而不是建立更多的執行緒,它比較適用於執行 CPU 密集型的任務,也就是需要執行大量 CPU 運算的任務。所以噹噹前執行緒數超過核心執行緒數時,執行緒池不會增加執行緒,而是放在佇列裡等待核心執行緒空閒下來。
但是,我們平時開發的 Web 系統通常都有大量的 IO 操作,比方說查詢資料庫、查詢快取等等。任務在執行 IO 操作的時候 CPU 就空閒了下來,這時如果增加執行任務的執行緒數而不是把任務暫存在佇列中,就可以在單位時間內執行更多的任務,大大提高了任務執行的吞吐量。所以你看 Tomcat 使用的執行緒池就不是 JDK 原生的執行緒池,而是做了一些改造,當執行緒數超過 coreThreadCount 之後會優先建立執行緒,直到執行緒數到達 maxThreadCount,這樣就比較適合於 Web 系統大量 IO 操作的場景了,你在實際運用過程中也可以參考借鑑。
- 執行緒池中使用的佇列的堆積量也是我們需要監控的重要指標,對於實時性要求比較高的任務來說,這個指標尤為關鍵。
我在實際專案中就曾經遇到過任務被丟給執行緒池之後,長時間都沒有被執行的詭異問題。最初,我認為這是程式碼的 Bug 導致的,後來經過排查發現,是因為執行緒池的 coreThreadCount 和 maxThreadCount 設定的比較小,導致任務線上程池裡面大量的堆積,在調大了這兩個引數之後問題就解決了。跳出這個坑之後,我就把重要執行緒池的佇列任務堆積量,作為一個重要的監控指標放到了系統監控大屏上。
- 如果你使用執行緒池請一定記住不要使用無界佇列(即沒有設定固定大小的佇列)。也許你會覺得使用了無界佇列後,任務就永遠不會被丟棄,只要任務對實時性要求不高,反正早晚有消費完的一天。但是,大量的任務堆積會佔用大量的記憶體空間,一旦記憶體空間被佔滿就會頻繁地觸發 Full GC,造成服務不可用,我之前排查過的一次 GC 引起的宕機,起因就是系統中的一個執行緒池使用了無界佇列。
執行緒池的改造方案
我們這裡直接學習Tomcat是如何優化執行緒池的,在我們平時的使用中如果使用LinkedBlockingQueue的話,預設是使用Integer.MAX_VALUE,即無界佇列(這種情況下如果沒有配置佇列的capacity的話,佇列始終不會滿,那麼始終無法進入開啟新執行緒到達maxThreads個數的地步,則此時配置maxThreads其實是沒有意義的)。
而在Tomcat中使用的是TaskQueue,TaskQueue的佇列capacity為maxQueueSize,預設也是Integer.MAX_VALUE。但是,其重寫offer方法,當其執行緒池大小小於maximumPoolSize的時候,返回false,即在一定程度改寫了佇列滿的邏輯,修復了使用LinkedBlockingQueue預設的capacity為Integer.MAX_VALUE的時候,maxThreads失效的"bug"。從而可以繼續增長執行緒到maxThreads,超過之後,繼續放入佇列。
所以綜上,Tomcat的執行緒池使用了自己擴充套件的taskQueue,修改了offer的邏輯,以做到最小的改動實現了執行緒池的改造。
我們再來回顧一下ThreadPoolExecutor的execute方法是怎麼寫的:
ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//這裡,如果使用workQueue的offer成功的話,那麼就不會建立新的執行緒,如果失敗的話,就會走到else if方法進行建立新的執行緒
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
TaskQueue
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private ThreadPoolExecutor parent = null;
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//當其執行緒池大小小於maximumPoolSize的時候,返回false
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
}
我們從這裡可以看到
- 如果當前執行緒數已達到MaximumPoolSize,那麼就放入到佇列裡去
- 如果當前執行緒池的數量大於正在執行的執行緒數,說明有空閒的執行緒,那麼就將任務放入到佇列中去
- 若當其執行緒池大小小於maximumPoolSize的時候,返回false