Java 執行緒池ThreadPoolExecutor(基於jdk1.8)(二)
- private boolean addWorker(Runnable firstTask, boolean core)
首先分析一下引數,firstTask就是指我們使用者傳入的需要執行的認為,core引數,當為真時,表示當前執行緒數小於corePoolSize,為假就是執行緒數大於corePoolSize。該方法主要有兩塊大的邏輯,第一部分是通過CAS操作去跟新當前的執行緒數量:
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
兩個大的for無限迴圈體,外邊的迴圈就是不斷檢測當前執行緒池的狀態,一旦發現不處於Running狀態了,就立刻返回false了,因為通過前邊文章分析我們知道,當執行緒池呼叫了shutdown方法後,就不再接收心的任務了。
再看內層的迴圈體,主要就是通過CAS操作去更新執行緒池數量,
if (compareAndIncrementWorkerCount(c))如果更新成功,那麼就會跳出外層迴圈,繼續後邊的邏輯。如果更新失敗,需要再次檢查當前執行緒池的狀態,因為CAS操作是沒有進行同步操作的,所以有可能此時執行緒池的狀態已經變了,需要再次檢測。發現狀態發生變化,那麼需要跳出內層迴圈直接進行下一次的外層迴圈,重新檢查執行緒池狀態。如果執行緒池的狀態沒有發生變化,則繼續下一次的內層迴圈,重新嘗試更新執行緒池數量直至更新成功。
執行緒池數量更新成功了,就進入下一個邏輯塊,新增執行緒:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
第5行首先新建了一個worker,這個類內部封裝了我們的runnable物件還有執行緒物件(隨後會進行分析),第9行進行加鎖,線上程池操作中,所有涉及到執行緒池狀態變化的程式碼都需要進行同步操作,前一部分用的是CAS,這一部分用到了可重入鎖。
在判斷執行緒池是處於running狀態後,在20行workers.add(w),把工作執行緒新增到一個Set集合中,workers就是一個HashSet用於存放當前執行緒池中存活的執行緒。後邊就很簡單,記錄了一下最大的執行緒數。
關鍵看30行,t.start()啟動了執行緒。這個t就是我們新建的worker中的執行緒。這樣一下來我們線上程數小於corePoolSize的時候,發起的任務就直接通過新建執行緒開始運行了。
為了瞭解發出的任務到底是怎麼執行的,還得繼續分析Worker這個類。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
...//省略了重寫AbstractQueuedSynchronizer類中的方法
}
這個worker還是繼承了Runnable介面,內部有三個成員變數,註釋中已經寫得很明白都要什麼作用。我們關注一下run方法,其實就呼叫了一個runWorker(this),把自己作為引數傳遞進去。那麼繼續跟進這個方法(請保持清醒,因為已經有好幾層呼叫了。。):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
主體是一個while迴圈,判斷當前task是否為空,不為空就直接執行此任務,關鍵程式碼在第19行task.run(),直接呼叫Runnable的run方法,那就是在當前執行緒執行它,那麼問題來了,現在當前執行緒到底是哪個執行緒了?是不是有點亂了,趕緊來個流程圖,我們觀察一下函式呼叫:
第一二步都是在使用者執行緒(呼叫execute方法時的那個執行緒),後邊三步之後已經切換到新建的worker執行緒中去了,所以終端使用者傳遞給execute的那個Runnable物件是在新的執行緒中執行的。
ok!回到上邊程式碼的while迴圈,或條件判斷的第二個條件回去呼叫getTask方法。看名字都知道,肯定是去BlockingQueue(任務快取佇列)中去取下一個需要執行的任務了。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
第7行的boolean timed = allowCoreThreadTimeOut || wc > corePoolSize,主要是判斷當前執行緒是否需要進行銷燬,allowCoreThreadTimeOut 為真,就是說不管當前的執行緒數有沒有超過corePoolSize,都需要根據keepAliveTime去判斷執行緒的存活期。如果為假那麼就看第二個語句,就是當前執行緒數大於corePoolSize就需要判斷執行緒的存活期,如果不大於,那麼就不需要判斷存活期了。
接下來主要關注一下16-25行的try程式碼塊,需要判斷執行緒的存活期那麼r就為workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)返回的值,也就是說嘗試從快取佇列取任務,時間限制為keepAliveTime,如果在這段時間內取到了任務,就返回,如果沒取到那麼會丟擲異常,那麼就表明這個執行緒已經在keepAliveTime這個時間段裡沒有執行了,那麼該執行緒就該被銷燬了。
time為false的話,就表明不需要判斷執行緒的存活期,直接呼叫workQueue.take(),由於執行緒池採用的阻塞佇列,那麼該執行緒就會一直阻塞,直到從佇列中取到新的任務,然後返回。
到此我們的ThreadPoolExecutor的大致流程就分析完畢了,其中還有很多細節就沒法完全覆蓋到了,如果有理解不當的地方,歡迎大家指出,共同學習交流!