Java中執行緒池ThreadPoolExecutor原理探究
一、 前言
執行緒池主要解決兩個問題:一方面當執行大量非同步任務時候執行緒池能夠提供較好的效能,,這是因為使用執行緒池可以使每個任務的呼叫開銷減少(因為執行緒池執行緒是可以複用的)。另一方面執行緒池提供了一種資源限制和管理的手段,比如當執行一系列任務時候對執行緒的管理,每個ThreadPoolExecutor也保留了一些基本的統計資料,比如當前執行緒池完成的任務數目。
另外,執行緒池提供許多可調引數和可擴充套件性鉤子。程式設計師可以使用更方便
工廠方法比如newCachedThreadPool(無限執行緒池,執行緒自動回收),newFixedThreadPool(固定大小的執行緒池)newSingleThreadExecutor(單個執行緒),當然使用者還可以自定義。
二、 類圖結構
ClassDiagram1.jpgExecutors其實是個工具類,裡面提供了好多靜態方法,根據使用者選擇返回不同的執行緒池例項。
ThreadPoolExecutor繼承了AbstractExecutorService,成員變數ctl是個Integer的原子變數用來記錄執行緒池狀態 和 執行緒池執行緒個數,類似於ReentrantReadWriteLock使用一個變數存放兩種資訊。
Integer型別是32位二進位制標示,其中高3位用來表示執行緒池狀態,後面 29位用來記錄執行緒池執行緒個數。
//用來標記執行緒池狀態(高3位),執行緒個數(低29位)
//預設是RUNNING狀態,執行緒個數為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//執行緒個數掩碼位數
private static final int COUNT_BITS = Integer.SIZE - 3;
//執行緒最大個數(低29位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//(高3位):11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//(高3位):00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//(高3位):01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//(高3位):01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 獲取高三位 執行狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取低29位 執行緒個數
private static int workerCountOf(int c) { return c & CAPACITY; }
//計算ctl新值,執行緒狀態 與 執行緒個數
private static int ctlOf(int rs, int wc) { return rs | wc; }
<strong> 執行緒池狀態含義:</strong>
- RUNNING:接受新任務並且處理阻塞佇列裡的任務
- SHUTDOWN:拒絕新任務但是處理阻塞佇列裡的任務
- STOP:拒絕新任務並且拋棄阻塞佇列裡的任務同時會中斷正在處理的任務
- TIDYING:所有任務都執行完(包含阻塞佇列裡面任務)當前執行緒池活動執行緒為0,將要呼叫terminated方法
- TERMINATED:終止狀態。terminated方法呼叫完成以後的狀態
<strong> 執行緒池狀態轉換:</strong>
- RUNNING -> SHUTDOWN
顯式呼叫shutdown()方法,或者隱式呼叫了finalize(),它裡面呼叫了shutdown()方法。 - RUNNING or SHUTDOWN)-> STOP
顯式 shutdownNow()方法 - SHUTDOWN -> TIDYING
當執行緒池和任務佇列都為空的時候 - STOP -> TIDYING
當執行緒池為空的時候 - TIDYING -> TERMINATED
當 terminated() hook 方法執行完成時候
<strong> 執行緒池引數:</strong>
- corePoolSize:執行緒池核心執行緒個數
- workQueue:用於儲存等待執行的任務的阻塞佇列。
比如基於陣列的有界ArrayBlockingQueue、,基於連結串列的無界LinkedBlockingQueue,最多隻有一個元素的同步佇列SynchronousQueue,優先順序佇列PriorityBlockingQueue,具體可參考 https://www.atatech.org/articles/81568 - maximunPoolSize:執行緒池最大執行緒數量。
- ThreadFactory:建立執行緒的工廠
- RejectedExecutionHandler:飽和策略,當佇列滿了並且執行緒個數達到maximunPoolSize後採取的策略,比如AbortPolicy(丟擲異常),CallerRunsPolicy(使用呼叫者所線上程來執行任務),DiscardOldestPolicy(呼叫poll丟棄一個任務,執行當前任務),DiscardPolicy(默默丟棄,不丟擲異常)
- keeyAliveTime:存活時間。如果當前執行緒池中的執行緒數量比基本數量要多,並且是閒置狀態的話,這些閒置的執行緒能存活的最大時間
- TimeUnit,存活時間的時間單位
<strong> 執行緒池型別:</strong>
- newFixedThreadPool
建立一個核心執行緒個數和最大執行緒個數都為nThreads的執行緒池,並且阻塞佇列長度為Integer.MAX_VALUE,keeyAliveTime=0說明只要執行緒個數比核心執行緒個數多並且當前空閒則回收。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//使用自定義執行緒建立工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- newSingleThreadExecutor
建立一個核心執行緒個數和最大執行緒個數都為1的執行緒池,並且阻塞佇列長度為Integer.MAX_VALUE,keeyAliveTime=0說明只要執行緒個數比核心執行緒個數多並且當前空閒則回收。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//使用自己的執行緒工廠
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- newCachedThreadPool
建立一個按需建立執行緒的執行緒池,初始執行緒個數為0,最多執行緒個數為Integer.MAX_VALUE,並且阻塞佇列為同步佇列,keeyAliveTime=60說明只要當前執行緒60s內空閒則回收。這個特殊在於加入到同步佇列的任務會被馬上被執行,同步佇列裡面最多隻有一個任務,並且存在後馬上會拿出執行。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//使用自定義的執行緒工廠
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- newSingleThreadScheduledExecutor
建立一個最小執行緒個數corePoolSize為1,最大為Integer.MAX_VALUE,阻塞佇列為DelayedWorkQueue的執行緒池。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
- newScheduledThreadPool
建立一個最小執行緒個數corePoolSize,最大為Integer.MAX_VALUE,阻塞佇列為DelayedWorkQueue的執行緒池。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
其中Worker繼承AQS和Runnable是具體承載任務的物件,Worker繼承了AQS自己實現了簡單的不可重入獨佔鎖,其中status=0標示鎖未被獲取狀態也就是未被鎖住的狀態,state=1標示鎖已經被獲取的狀態也就是鎖住的狀態。
DefaultThreadFactory是執行緒工廠,newThread方法是對執行緒的一個分組包裹,其中poolNumber是個靜態的原子變數,用來統計執行緒工廠的個數,threadNumber用來記錄每個執行緒工廠建立了多少執行緒。
三、 原始碼分析
3.1 新增任務到執行緒池exectue方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取當前執行緒池的狀態+執行緒個數變數
int c = ctl.get();
//當前執行緒池執行緒個數是否小於corePoolSize,小於則開啟新執行緒執行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果執行緒池處於RUNNING狀態,則新增任務到阻塞佇列
if (isRunning(c) && workQueue.offer(command)) {
//二次檢查
int recheck = ctl.get();
//如果當前執行緒池狀態不是RUNNING則從佇列刪除任務,並執行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
//否者如果當前執行緒池執行緒空,則新增一個執行緒
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果佇列滿了,則新增執行緒,新增失敗則執行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
- 如果當前執行緒池執行緒個數小於corePoolSize則開啟新執行緒
- 否則新增任務到任務佇列
- 如果任務佇列滿了,則嘗試新開啟執行緒執行任務,如果執行緒個數>maximumPoolSize則執行拒絕策略。
重點看addWorkder方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查佇列是否只在必要時為空.(1)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//迴圈cas增加執行緒個數
for (;;) {
int wc = workerCountOf(c);
//如果執行緒個數超限則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas增加執行緒個數,同時只有一個執行緒成功
if (compareAndIncrementWorkerCount(c))
break retry;
//cas失敗了,則看執行緒池狀態是否變化了,變化則跳到外層迴圈重試重新獲取執行緒池狀態,否者內層迴圈重新cas。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//到這裡說明cas成功了,(2)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//建立worker
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加獨佔鎖,為了workers同步,因為可能多個執行緒呼叫了執行緒池的execute方法。
mainLock.lock();
try {
//重新檢查執行緒池狀態,為了避免在獲取鎖前呼叫了shutdown介面(3)
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//新增任務
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//新增成功則啟動任務
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
程式碼比較長,主要分兩部分,第一部分雙重迴圈目的是通過cas增加執行緒池執行緒個數,第二部分主要是併發安全的把任務新增到workers裡面,並且啟動任務執行。
先看第一部分的(1)
rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())
展開!運算後等價於
s >= SHUTDOWN &&
(rs != SHUTDOWN ||
firstTask != null ||
workQueue.isEmpty())
也就是說下面幾種情況下會返回false:
- 當前執行緒池狀態為STOP,TIDYING,TERMINATED
- 當前執行緒池狀態為SHUTDOWN並且已經有了第一個任務
- 當前執行緒池狀態為SHUTDOWN並且任務佇列為空
內層迴圈作用是使用cas增加執行緒個數,如果執行緒個數超限則返回false,否者進行cas,cas成功則退出雙迴圈,否者cas失敗了,要看當前執行緒池的狀態是否變化了,如果變了,則重新進入外層迴圈重新獲取執行緒池狀態,否者進入內層迴圈繼續進行cas嘗試。
到了第二部分說明CAS成功了,也就是說執行緒個數加一了,但是現在任務還沒開始執行,這裡使用全域性的獨佔鎖來控制workers裡面新增任務,其實也可以使用併發安全的set,但是效能沒有獨佔鎖好(這個從註釋中知道的)。這裡需要注意的是要在獲取鎖後重新檢查執行緒池的狀態,這是因為其他執行緒可可能在本方法獲取鎖前改變了執行緒池的狀態,比如呼叫了shutdown方法。新增成功則啟動任務執行。
3.2 工作執行緒Worker的執行
先看下建構函式:
Worker(Runnable firstTask) {
setState(-1); // 在呼叫runWorker前禁止中斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//建立一個執行緒
}
這裡新增一個新狀態-1是為了避免當前執行緒worker執行緒被中斷,比如呼叫了執行緒池的shutdownNow,如果當前worker狀態>=0則會設定該執行緒的中斷標誌。這裡設定了-1所以條件不滿足就不會中斷該執行緒了。執行runWorker時候會呼叫unlock方法,該方法吧status變為了0,所以這時候呼叫shutdownNow會中斷worker執行緒。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // status設定為0,允許中斷
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果執行緒池當前狀態至少是stop,則設定中斷標誌;
// 如果執行緒池當前狀態是RUNNININ,則重置中斷標誌,重置後需要重新
//檢查下執行緒池狀態,因為當重置中斷標誌時候,可能呼叫了執行緒池的shutdown方法
//改變了執行緒池狀態。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任務執行前幹一些事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//執行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任務執行完畢後幹一些事情
afterExecute(task, thrown);
}
} finally {
task = null;
//統計當前worker完成了多少個任務
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//執行清了工作
processWorkerExit(w, completedAbruptly);
}
}
如果當前task為空,則直接執行,否者呼叫getTask從任務佇列獲取一個任務執行,如果任務佇列為空,則worker退出。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果當前執行緒池狀態>=STOP 或者執行緒池狀態為shutdown並且工作佇列為空則,減少工作執行緒個數
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//根據timed選擇呼叫poll還是阻塞的take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//統計整個執行緒池完成的任務個數
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//嘗試設定執行緒池狀態為TERMINATED,如果當前是shutdonw狀態並且工作佇列為空
//或者當前是stop狀態當前執行緒池裡面沒有活動執行緒
tryTerminate();
//如果當前執行緒個數小於核心個數,則增加
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
3.3 shutdown操作
呼叫shutdown後,執行緒池就不會在接受新的任務了,但是工作佇列裡面的任務還是要執行的,但是該方法立刻返回的,並不等待佇列任務完成在返回。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//許可權檢查
checkShutdownAccess();
//設定當前執行緒池狀態為SHUTDOWN,如果已經是SHUTDOWN則直接返回
advanceRunState(SHUTDOWN);
//設定中斷標誌
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//嘗試狀態變為TERMINATED
tryTerminate();
}
如果當前狀態>=targetState則直接返回,否者設定當前狀態為targetState
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
設定所有執行緒的中斷標誌,主要這裡首先加了全域性鎖,同時只有一個執行緒可以呼叫shutdown時候設定中斷標誌,然後嘗試獲取worker自己的鎖,獲取成功則設定中斷標示
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
3.4 shutdownNow操作
呼叫shutdown後,執行緒池就不會在接受新的任務了,並且丟棄工作佇列裡面裡面的任務,正在執行的任務會被中斷,但是該方法立刻返回的,並不等待啟用的任務執行完成在返回。返回佇列裡面的任務列表。
呼叫佇列的drainTo一次當前佇列的元素到taskList,
可能失敗,如果呼叫drainTo後佇列海不為空,則迴圈刪除,並新增到taskList
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//許可權檢查
advanceRunState(STOP);// 設定執行緒池狀態為stop
interruptWorkers();//中斷執行緒
tasks = drainQueue();//移動佇列任務到tasks
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
呼叫佇列的drainTo一次當前佇列的元素到taskList,
可能失敗,如果呼叫drainTo後佇列海不為空,則迴圈刪除,並新增到taskList
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
3.5 awaitTermination操作
等待執行緒池狀態變為TERMINATED則返回,或者時間超時。由於整個過程獨佔鎖,所以一般呼叫shutdown或者shutdownNow後使用。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
四、總結
執行緒池巧妙的使用一個Integer型別原子變數來記錄執行緒池狀態和執行緒池執行緒個數,設計時候考慮到未來(2^29)-1個執行緒可能不夠用,到時只需要把原子變數變為Long型別,然後掩碼位數變下就可以了,但是為啥現在不一勞永逸的定義為Long那,主要是考慮到使用int型別操作時候速度上比Long型別快些。
通過執行緒池狀態來控制任務的執行,每個worker執行緒可以處理多個任務,執行緒池通過執行緒的複用減少了執行緒建立和銷燬的開銷,通過使用任務佇列避免了執行緒的阻塞從而避免了執行緒排程和執行緒上下文切換的開銷。
另外需要注意的是呼叫shutdown方法作用僅僅是修改執行緒池狀態讓現在任務失敗並中斷當前執行緒,這個中斷並不是讓正在執行的執行緒終止,而是僅僅設定下執行緒的中斷標誌,如果執行緒內沒有使用中斷標誌做一些事情,那麼這個對執行緒沒有影響。
歡迎關注微信公眾號:‘技術原始積累’ 獲取更多技術乾貨__
image.png