ThreadPoolExecutor程式碼解析
派生體系
java.util.concurrent
ThreadPoolExecutor
AbstractExecutorService
ExecutorService
Executor
這個類是Executor框的核心實現,它的名字向我們表明,它是使用thread pool實現的。這個thread pool主要解決了兩個問題:
- 執行大量的單個非同步任務,一般情況下,它能提升整體的效能。
- 執行由多個任務組成的任務集合,通過Future列表返回每個任務的執行結果。
設計原理
重要概念
為了能夠在更多上下文環境中使用,ThreadPool定義了一些概念,這些概念都直接或間接對應著可調節引數,如果不瞭解這些概念含義,很難正確地使用這些引數。下面來看一下這些概念及其含義:
當前,核心,最小,最大執行緒數(poolSize, corePoolSize, minimumPoolSize, maximumPoolSize)
poolSize: 當前處於執行和空閒狀態的匯流排程數。
corePoolSize: 核心執行緒數, 當poolSize<=corePoolSize時,存在的執行緒稱為coreThread。
minimumPoolSize: 最小執行緒數,當minimumPoolsize = allowCoreThreadTimeOut ? 0 : corePoolSize ,
maximunPoolSize: 最大執行緒數。
ThreadPool在執行過程中會自動的調節執行緒數量(poolSize), 一般來說,poolSize處於[corePoolSize maximumPoolSize]區間之內。
當用戶呼叫execute提交一個任務時,如果poolSize<corePoolSize, 會建立一個新執行緒處理這個任務。如果如果poolSize處於[corePoolSize maximumPoolSize]區間內,只有佇列滿是才會建立新執行緒。無論如何,poolSize不會大於maximumPoolSize。
預設情況下,ThreadPool沒有收到任何任務時pooSize = 0, 只有當ThreadPool開始收到任務之後才會建立執行緒。但是可以通過覆蓋prestartCoreThread或prestartAllCoreThreads方法改變這種行為,提前建立執行緒。
執行緒工廠--ThreadFactory
ThreadPool使用實現了ThreadFactory介面的實現建立新執行緒。Executors工廠類提供了defaultThreadFactory方法,該方法返回一個預設的ThreadFactory例項。使用這個例項建立的執行緒具有相同的優先順序,是非後臺執行緒,命名上使用相同的字首。如果不滿意這這些行為,可以自己實現一個ThreadFactory交給ThreadPool使用。
保持存活時間(keepAliveTime)
如果poolSize > corePoolSize, 當一個執行緒的空閒時間大於keepAliveTime, 它會被終止掉。預設情況下當poolSize <= corePoolSize時,keepAliveTime不會有影響,如果呼叫 allowCoreThreadTimeOut(true), 可以讓keepAliveTime在這個時間也起作用。
任務排隊(queuing)
任何BlockingQueue的例項都可以用於儲存排隊的任務。不同的BlockingQueue實現決定了不同的排隊策略:
SynchronousQueue: 同步佇列,當提交一個任務時,要求ThreadPool中當前有至少一個空閒執行緒,或者可以建立新的執行緒(poolSize < maximumPoolSize)立即執行這個任務,否則ThreadPool會拒絕這個任務。
LinkedBlockingQueue: 無限制的佇列(只受限於能夠使用的記憶體), 不會處於full狀態, offer方法不會返回false,這意味這ThreadPool的pooSize<=corePoolSize, 不會建立大於corePoolSize的執行緒數。
ArrayBlockingQueue: 有限制的佇列, 受限於它的capacity。當poolSize == corePoolSize且佇列沒滿時, 新提交的任務會追加到佇列中排隊執行。 當poolSize在[corePoolSize maximumPooSize)區間同時隊被填列滿時,將會建立新的執行緒。直到poolSize == maximumPoolSize位置。 如果佇列被填滿同時pooSize == maximumPoolSize,新的任務會被拒絕。
拒絕任務(rejected tasks)
當ThreadPool遇到以下兩種情況時會觸發拒絕任務策略:
- 正常情況下BlockingQueue被填滿,同時poolSize == maximumPoolSize。
- 被關閉
ThreadPool使用RejectedExecutionHandler處理丟棄動作,預設定義了4中丟棄策略:
ThreadPoolExecutor.AbortPolicy: 丟擲RejectedExecutionException異常。
ThreadPoolExecutor.CallerRunsPolicy: 自己執行這個被拋棄的任務。
ThreadPoolExecutor.DiscardPolicy: 悄無聲息的丟棄掉這人任務。
狀態
ThreadPool定義了5狀態
RUNNING: 接受新提交的任務,執行佇列中發任務。
SHUTDOWN: 不接受新提交的任務,但仍然會執行佇列中的人。
STOP: 不接受新提交的任務,不執行佇列中的任務,而且會打斷正在執行中的任務。
TIDYING: 所有的任務都終止了,並且執行緒數為0,當所有的執行緒都過渡到TIDYING狀態後會呼叫treminated方法。
TERMINATED: treminated方法呼叫已經完成。
狀態之間的轉換關係
RUNNING --> SHUTDOWN
呼叫shutdown()
(RUNNING或SHUTDOWN) -- > STOP
呼叫shutdownNow()
SHUTDOWN --> TIDYING
佇列為空,同時執行緒數為0
TIDYING --> TREMINATED
treminated()執行完成。
向ThreadPool提交任務: execute
ThreadPoolExecutor例項建立之後,在沒有呼叫execute提交任務之前,ThreadPool中是沒有執行緒的,執行緒的建立是依賴exeute來驅動的。可以說,exeute是ThreadPoolExecutor執行的觸發器,所有我選擇先從exeute方法開始分析程式碼。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 如果執行緒數小於 corePoolSize, 建立一個新執行緒。
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 如果處於RUNNGIN狀態把任務放到佇列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) / /再次檢查執行緒狀態,如果不是RUNNING狀態,把任務從佇列刪除,然後拒絕這個任務
reject(command);
else if (workerCountOf(recheck) == 0) // 如果執行緒數為0,建立一個新執行緒
addWorker(null, false);
}
/* 如果執行到這裡說明當前不是出於RUNNING狀態,或處於RUNNING狀態但佇列已經被填滿
*嘗試建立新的執行緒執行這個任務,如果失敗,拒絕這任務
*/
else if (!addWorker(command, false))
reject(command);
}
以上就是exeute程式碼,它很簡單,但其中ctl成員變數比較費解。ctl是AtomicInteger型別,它被用來打包儲存ThreadPoolExecutor的狀態和執行緒數。
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
它初始化時,把狀態設定成RUNNING,下面來看看它的結構
高位 ---- > 低位
運算狀態(run state) |
執行緒數(workerCount) |
31 -- 29 |
28 -- 0 |
狀態位
RUNNING
1 |
1 |
1 |
SHUTDOWN
0 |
0 |
0 |
STOP
0 |
0 |
1 |
TIDYING
0 |
1 |
0 |
TREMINATED
0 |
1 |
1 |
知道了這些資料的儲存方式,把他們取出來,只需要一些簡單的位運算就可以了。
狀態的大小關係 RUNNING < SHUTDOWN < STOP < TIDYING < TREMINATED,
runStateOf(clt.get()) < SHUTDOWN RUNNING狀態
runStateOf(clt.get()) >= SHUTDOWN 非RUNNING狀態
這個大小關係要記住,這樣理解程式碼會更快。
建立新執行緒
ThreadPool把執行緒封裝成Worker對物件,新增worker就是新增執行緒,addWorker方法做的事情就是新增執行緒。
private boolean addWorker(Runnable firstTask, boolean core) {
/* 這段程式碼的作用是確保滿足一下條件的任意一個時才建立新執行緒
*1. 處於RUNNING 狀態, 可以接受新任務,可以繼續執行佇列中的任務
*2. 處於SHUTDOWN狀態, 佇列不是空,且當前沒有提交新任務
*/
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && // 非RUNNINGG狀態
! (rs == SHUTDOWN &&
firstTask == null && // 當前提交的新任務
! workQueue.isEmpty())) // 佇列不是空
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 如果當前呼叫建立的是core執行緒, 確保當前執行緒數 <corePoolSize, 否則確保當前執行緒數< maximumPoolSize
return false;
if (compareAndIncrementWorkerCount(c)) // 原子操作,增加執行緒數
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 執行到這裡表示已經通過檢查可以建立新執行緒,並且執行緒數已經加1
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 再次檢查,確保當前仍然滿足允許建立執行緒的條件
if (t.isAlive()) // 確保Thread還沒有呼叫start()
throw new IllegalThreadStateException();
workers.add(w); // 把worker執行緒放進HashSet中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 啟動新執行緒
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
執行緒的主迴圈
Worker實現了Runnable介面
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
構造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
建立執行緒時把Worker例項本身當做執行緒的Runnable產生,所以當執行緒啟動後,將會呼叫Worker的run方法。
public void run() {
runWorker(this);
}
執行緒的主迴圈就在runWorker方法中實現
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 如果firstTask!=null, 先執行firstTask
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 如果沒有firstTask, 從佇列中取出一個task, 如果沒有取到,退出執行緒
w.lock();
// 如果處於狀態>=STOP(前面已經講過狀態直接的大小關係), 確保執行緒處於interrupted狀態
// 否則清除執行緒的interrupted狀態
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 執行任務前呼叫的方法,預設什麼都沒幹,使用者可以根據需要覆蓋它
Throwable thrown = null;
try {
task.run(); // 執行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 任務完成之後呼叫的方法, 預設什麼都沒幹,使用者可以根據需要覆蓋它
}
} finally {
task = null; // 把當前task置空,這樣才能呼叫getTask從佇列裡取出任務
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 正常退出執行緒 completedAbruptly是true, 異常導致的執行緒退出為false
processWorkerExit(w, completedAbruptly);
}
}
從佇列中得到排隊的任務
在runWorker主迴圈中,除了第一次的任務從worker的firsTask(在它不是null的情況下)取之外, 後面每次都是呼叫getTask從佇列中取出一個任務。
下面是getTask的程式碼分析
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 得到當前狀態
// 如果當前狀態 > SHUTDOWN 退出執行緒
// 如果當前狀態 == SHUTDOWN 且 佇列為空,退出執行緒
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); //減少當前執行緒數
return null;
}
int wc = workerCountOf(c); // 得到當前的執行緒數
// 執行緒是否允許超時的條件: 設定允許coreThread超時,或者當前執行緒數 > corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 執行緒退出需要同時滿足以下兩個條件條件:
// 1. 當前執行緒數>maximumPooSize 或 允許超時同時檢查到已經超時
// 2. 當前執行緒數>1 或 佇列為空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // 減少當前執行緒數, 這個方法確保多執行緒環境下不會過多地結束執行緒。
return null;
continue;
}
try {
// 取出一個任務。如果允許超時,呼叫poll,否則呼叫take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; // 已經超時,執行到這裡表明poll超時返回
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask的功能除了取出一個任務以外,它還負責在條件滿足的情況下正常地結束一個執行緒
執行緒結束
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果執行緒是由於異常原因結束的,這裡要糾正執行緒數
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // 把執行緒從HashSet中刪除
} finally {
mainLock.unlock();
}
tryTerminate(); // 嘗試終止整個ThreadPool
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 如果當前狀態<STOP
if (!completedAbruptly) { // 如果不是異常結束
// 計算最小執行緒數min
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min) // 如果當前執行緒數>=min直接返回
return; // replacement not needed
}
// 建立新執行緒, 條件:
// 當前執行緒正常結束
// 當前執行緒異常結束,但當前執行緒數小於最小執行緒數
addWorker(null, false);
}
}
上面的程式碼實現了執行緒的生命週期的管理,執行緒只有在ThreadPoolExecutor的狀態處於RUNNGIN或SHUTDOWN時才可以存在。下面是這兩種狀態下執行緒的生存狀態:
RUNNING:
允許coreThread超時: 執行緒空閒(意味著佇列為空)時間超過 keepAliveTime, 執行緒會被結束, 直到執行緒數為0。
不允許coreThread超時: 執行緒空閒時間超過 keepAliveTime, 執行緒會被結束,直到執行緒數為corePoolSize。
SHUDOWN:
當執行緒把已經在佇列裡的所有任務執行完畢後,所有執行緒都會進入退出流程,最終退出。
整個ThreadPoolExecutor的狀態變遷
前面已經講過,ThreadPool的狀態和執行緒數被打包方進一個32整數中:
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
初始化把狀態設定成RUNNING, 執行緒為0
呼叫shutdown時把狀態從RUNNING置為SHUTDOWN, 隨後過渡到TIDYING->TREMINATED。
當呼叫shutdownNow時把狀態從(RUNNING 或 SHUTDOWN) 設定為STOP, 隨後過渡到TIDYING->TREMINATED。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 只有當前狀態<SHUTDOWN時才執行狀態設定的動作
interruptIdleWorkers(); // 打斷所有空閒的的執行緒,讓這些執行緒有機會自己結束
onShutdown(); // 回撥方法,預設什麼都沒做,子類可以覆蓋
} finally {
mainLock.unlock();
}
tryTerminate(); // 嘗試執行ThreadPool的結束操作
}
shutdownNow和shutdown的操作大致一樣,不同的是它把狀態設定成STOP,還會返回佇列中沒有來得及執行的任務list。
tryTerminate方法作用是嘗試結束整個ThreadPool, 它不一定會執行真正的結束動作。它在三個地方被呼叫, worker執行緒結束時,shudown中,shutdownNow中。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//滿足以下三個條件中的任何一個就立即返回
//1. 處於RUNNGING狀態
//2. 狀態>= TIDYING
//3. 處於SHUTDOWN狀態,且佇列不是空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果處於STOP狀態,且執行緒數不為0,通知一個處於空閒的執行緒結束自己
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 執行到這裡表示目前狀態>=SHUTDOWN,執行緒數已經是0
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 總有一個執行緒會執行到這裡,把狀態置為 TIDYING
try {
terminated(); // 呼叫回撥方面,預設什麼都沒幹,子類可以覆蓋
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 把狀態置為TREMINATED, 自此整個ThreadPool才算終結
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminate之所以要在三個地方呼叫,是為了保證當呼叫shutdown或shutdownNow之後,總有一個執行緒會完成最後的終結工作。
引數設定
分析完前面程式碼後,再來使用它,它的引數怎麼設定自然就瞭然於心。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
public void allowCoreThreadTimeOut(boolean value)
public void setCorePoolSize(int corePoolSize)
public void setKeepAliveTime(long time, TimeUnit unit)
public void setMaximumPoolSize(int maximumPoolSize)
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
public void setThreadFactory(ThreadFactory threadFactory)