很久前(2020-10-23),就有想法學習執行緒池並輸出部落格,但是寫著寫著感覺看不懂了,就不了了之了。現在重拾起,重新寫一下(學習一下)。
執行緒池的優點也是老生常談的東西了
- 減少執行緒建立的開銷(任務數大於執行緒數時)
- 統一管理一系列的執行緒(資源)
在講ThreadPoolExecutor前,我們先看看它的父類都有些啥。
Executor,執行提交的Runnable任務的物件,將任務提交與何時執行分離開。
execute方法是Executor介面的唯一方法。
// 任務會在未來某時執行,可能執行在一個新執行緒中、執行緒池或呼叫該任務的執行緒中。
void execute(Runnable command);
ExecutorService是一個Executor,提供了管理終止的方法和返回Future來跟蹤非同步任務的方法(sumbit)。
終止的兩個方法
- shutdown(), 正在執行的任務繼續執行,不接受新任務
- shutdownNow(), 正在執行的任務也要被終止
AbstractExecutorService,實現了ExecutorService的sumbit、invokeAny,invokeAll
介紹
執行緒池主要元素
底層變數
ctl
我們講講先ctl(The main pool control state), 其包含兩個資訊
- 執行緒池的狀態(最高三位)
- 執行緒池的workerCount,有效的執行緒數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
執行緒池的狀態轉化圖
看一下每個狀態的含義
- RUNNING, 接受新的任務並且處理阻塞佇列的任務
- SHUTDOWN, 拒絕新任務,但是處理阻塞佇列的任務
- STOP, 拒絕新任務,並且拋棄阻塞佇列中的任務,還要中斷正在執行的任務
- TIDYING,所有任務執行完(包括阻塞佇列中的任務)後, 當前執行緒池活動執行緒為0, 將要呼叫terminated方法
- TERMINATED, 終止狀態。呼叫terminated方法後的狀態
workers
工作執行緒都新增到這個集合中。可以想象成一個集中管理的平臺,可以通過workers獲取活躍的執行緒數,中斷所有執行緒等操作。
private final HashSet<Worker> workers = new HashSet<Worker>();
可修改變數
構造器中的引數
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime, // 最大等待任務的時間,超過則終止超過corePoolSize的執行緒
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 阻塞佇列
ThreadFactory threadFactory, // executor使用threadFactory建立一個執行緒
RejectedExecutionHandler handler) // 拒絕策略
corePoolSize、maximumPoolSize,workQueue三者的關係:
- 當執行緒數小於corePoolSize,任務進入,即使有其他執行緒空閒,也會建立一個新的執行緒
- 大於corePoolSize且小於maximumPoolSize,workQueue未滿,將任務加入到workQueue中;只有workQueue滿了,才會新建一個執行緒
- 若workQueue已滿,且任務大於maximumPoolSize,將會採取拒絕策略(handler)
拒絕策略:
- AbortPolicy, 直接丟擲RejectedExecutionException
- CallerRunsPolicy, 使用呼叫者所線上程來執行任務
- DiscardPolicy, 默默丟棄
- DiscardOldestPolicy, 丟棄頭部的一個任務,重試
allowCoreThreadTimeOut
控制空閒時,core threads是否被清除。
探索原始碼
最重要的方法就是execute
提交的任務將在未來某個時候執行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 獲取workCount與runState
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
根據上面的註釋,我們將execute分為三個部分來講解
- 當正在執行的執行緒數小於corePoolSize
- 當大於corePoolSize時,需要入隊
- 佇列已滿
當正在執行的執行緒數小於corePoolSize
// execute第一部分程式碼
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
addWorker, 建立工作執行緒。
當然它不會直接就新增一個新的工作執行緒,會檢測runState與workCount,來避免不必要的新增。檢查沒問題的話,新建執行緒,將其加入到wokers,並將執行緒啟動。
// firstTask,當執行緒啟動時,第一個任務
// core,為true就是corePoolSize作為邊界,反之就是maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
上面的程式碼很長,我們將它分為兩部分
// addWorker()第一部分程式碼
// 這部分主要是通過CAS增加workerCount
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 下面的條件我們將它轉化成
// if (rs >= SHUTDOWN &&
// (rs != SHUTDOWN ||
// firstTask != null ||
// workQueue.isEmpty()))
// 結合線程池狀態分析!
// 情況1. 當前的執行緒池狀態為STOP、TIDYING,TERMINATED
// 情況2. 當前執行緒池的狀態為SHUTDOWN且firstTask不為空,只有RUNNING狀態才可以接受新任務
// 情況3. 當前執行緒池的狀態為SHUTDOWN且firstTask為空且佇列為空。
// 這幾種情況,沒有必要新建worker(執行緒)。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加workerCount成功,繼續第二部分操作
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 這裡的執行緒池狀態被改變了,繼續外部迴圈,再次檢查執行緒池狀態
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
經過上面的程式碼,我們成功通過CAS使workerCount + 1,下面我們就會新建worker並新增到workers中,並啟動通過threadFactory建立的執行緒。
// addWorker()第二部分程式碼
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 第一種情況,rs為RUNNING
// 第二種情況是rs為SHUTDOWN,firstTask為null, 但是workQueue(阻塞佇列)不為null,建立執行緒進行處理
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 這裡是該執行緒已經被啟動了,我覺得的原因是threadFactory建立了兩個相同的thread,不知道還有其他原因沒。
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 上面的執行緒建立可能失敗,或者執行緒工廠返回null
// 或者執行緒啟動時,丟擲OutOfMemoryError
if (! workerStarted)
// 回滾狀態
addWorkerFailed(w);
}
return workerStarted;
看完了addWorker的步驟,程式碼中有個Worker類,看似是執行緒但又不完全是執行緒,我們去看看它的結構。
Worker
這個類的主要作用是,維護執行緒執行任務的中斷控制狀態和記錄每個執行緒完成的任務數。
整體結構
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** 每個執行緒的任務完成數 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 在建立執行緒時,將任務傳入到threadFactory中
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// 將執行委託給外部方法runWorker,下面會詳見。
// 這裡是執行任務的核心程式碼
runWorker(this);
}
// 實現AQS的獨佔模式的方法,該鎖不能重入。
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 執行緒執行之後才可以被中斷
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {}
}
}
}
我們來看看runWorker的實現。這個類主要的工作就是,不停地從阻塞佇列中獲取任務並執行,若firstTask不為空,就直接執行它。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// getTask控制阻塞等待任務或者是否超時就清除空閒的執行緒
// getTask非常之重要,後面會講到
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 第二種情況,重新檢測執行緒池狀態,因為此時可能其他執行緒會呼叫shutdownNow
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 執行前
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執行firstTask的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執行後
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 處理worker退出
processWorkerExit(w, completedAbruptly);
}
}
啟動一個執行緒,大致執行的方法流程
getTask,我們來看看它是怎樣阻塞或定時等待任務的。
Performs blocking or timed wait for a task, depending on current configuration settings, or returns null if this worker must exit because of any of:
- There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize).
- The pool is stopped.
- The pool is shutdown and the queue is empty.
- This worker timed out waiting for a task, and timed-out workers are subject to termination (that is, allowCoreThreadTimeOut || workerCount > corePoolSize) both before and after the timed wait, and if the queue is non-empty, this worker is not the last thread in the pool. (超時等待任務的worker,在定時等待前後都會被終止(情況有,allowCoreThreadTimeOut || wc > corePoolSize)
Returns:
task, or null if the worker must exit, in which case workerCount is decremented(worker退出時,workerCount會減一)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢測是否有必要返回新任務,注意每個狀態的含義就明白了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 檢測worker是否需要被淘汰
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 下面的程式碼結合上面的timed變數,超時後,當大於corePoolSize時,返回null
// 或者當allowCoreThreadTimeOut = true時,超時後,返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 這裡r為null的話,只能是timed = true的情況;take(),一直會阻塞直到有任務返回
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
我們來看看當getTask返回null時,執行緒池是如何處理worker退出的
根據runWorker的程式碼,getTask為null,迴圈體正常退出,此時completedAbruptly = false;
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 1. 有異常退出的話, workerCount將會減一
// 2. 正常退出的話,因為在getTask中已經減一,所以這裡不用理會
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 將worker完成的任務數加到completedTaskCount
// 從workers中移除當前worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 檢測執行緒池是否有資格設定狀態為TERMINATED
tryTerminate();
int c = ctl.get();
// 此時的狀態是RUNNING或SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 1. 非正常退出的,addWorker()
// 2. 正常退出的, workerCount小於最小的執行緒數,就addWorker()
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
getTask是保證存在的執行緒不被銷燬的核心,getTask則利用阻塞佇列的take方法,一直阻塞直到獲取到任務為止。
當大於corePoolSize時,需要入隊
// execute第二部分程式碼
// 執行緒池狀態是RUNNING(只有RUNNING才可以接受新任務)
// 此時,workerCount >= corePoolSize, 將任務入隊
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 此時執行緒池可能被shutdown了。
// 需要清除剛新增的任務,若任務還沒有被執行,就可以讓它不被執行
if (! isRunning(recheck) && remove(command))
reject(command);
// 若此時沒有worker,新建一個worker去處理佇列中的任務
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
佇列已滿
// execute第三部分程式碼
// addWorker第二個引數false表明,以maximumPoolSize為界限
else if (!addWorker(command, false))
// workerCount > maximumPoolSize 就對任務執行拒絕策略
reject(command);
我們就講完了執行方法execute(),有興趣的同學可以去看看關閉方法shutdown()以及shutdownNow(),看看他們的區別。當然也可以去研究一下其他方法的原始碼。
探究一些小問題
- runWorker為啥這樣拋錯
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
...
}
We separately handle RuntimeException, Error (both of which the specs guarantee that we trap) and arbitrary Throwables. Because we cannot rethrow Throwables within Runnable.run, we wrap them within Errors on the way out (to the thread's UncaughtExceptionHandler). Any thrown exception also conservatively causes thread to die.
大致意思就是,分別處理RuntimeException、Error和任何的Throwable。因為不能在 Runnable.run 中重新丟擲 Throwables,所以將它們包裝在 Errors中(到執行緒的 UncaughtExceptionHandler).
在Runnable.run不能丟擲Throwables的原因是,Runnable中的run並沒有定義丟擲任何異常,繼承它的子類,拋錯的範圍不能超過父類
UncaughtExceptionHandler可以處理“逃逸的異常”,可以去了解一下。
建立執行緒池最好手動建立,引數根據系統自定義
圖中的設定執行緒數的策略只是初步設定,下一篇我們去研究具體的執行緒數調優為什麼建立執行緒開銷大
啟動一個執行緒時,將涉及大量的工作
- 必須為執行緒堆疊分配和初始化一大塊記憶體。
- 需要建立/註冊native thread在host OS中
- 需要建立、初始化描述符並將其新增到 JVM 內部資料結構中。
雖然啟動一個執行緒的時間不長,耗費的資源也不大,但有個東西叫"積少成多"。就像
Doug Lea寫的原始碼一樣,有些地方的細節優化,看似沒必要,但是請求一多起來,那些細節就是"點睛之筆"了。
當我們有大量需要執行緒時且每個任務都是獨立的,儘量考慮使用執行緒池
總結
執行緒池的總體流程圖
執行緒池新建執行緒,如何保證可以不斷地獲取任務,就是通過阻塞佇列(BlockingQueue)的take方法,阻塞自己直到有任務才返回。
本篇部落格也到這裡就結束了,學習執行緒池以及輸出部落格,中間也拖了很久,最後送給大家以及自己最近看到的一句話
往往最難的事和自己最應該做的事是同一件事
參考
- Why is creating a Thread said to be expensive? 建立執行緒為何開銷較大
- Java執行緒池實現原理及其在美團業務中的實踐 講了執行緒池的原理以及在美團的一些實際運用
- 10問10答:你真的瞭解執行緒池嗎? 一些使用執行緒池的建議