執行緒池的原始碼分析
阿新 • • 發佈:2019-02-03
重要變數基本介紹
- corePoolSize:核心執行緒數
- maximumPoolSize:最大執行緒數(理論上最大值 1<<29 - 1)
- largestPoolSize:用於統計實際執行緒的最大併發量
- workQueue:執行緒的儲存佇列
- AtomicInteger:這個類是ThreadPoolExecutor的內部類,其成員變數value(int)引數又來儲存執行緒數大小,其中最高3位表示執行緒狀態,剩餘29位用來儲存執行緒數
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 1110 0000 0000 0000 0000 0000 0000 0000(是一個負數,具體負多少,可以自行計算一下)
private static final int RUNNING = -1 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
**
工作原理,我們以ThreadPoolExecutor分析:
外部入口execute(Runnable command):
public void execute (Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
獲取當前執行緒數和執行緒狀態的引數ctl.get(),ctl在構建ThreadPoolExecutor物件是生成。看如下程式碼
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
...
...
...
private static int ctlOf(int rs, int wc) { return rs | wc; }// RUNNING: 1110 0000 0000 0000 0000 0000 0000 0000
通過函式workerCountOf(c)獲取當前執行緒數與corePoolSize比較,當前執行緒數小於核心執行緒數時,將執行緒加入到工作佇列中:
private static int workerCountOf(int c) { return c & CAPACITY; }
//比如初始化第一次執行時:c = ctl.getValue(),c = 1110 0000 0000 0000 0000 0000 0000 0000
& 0001 1111 1111 1111 1111 1111 1111 1111,結果為0,基表示當前執行緒數為0
分析addWork(command, true)原始碼:
private boolean addWorker(Runnable firstTask, boolean core) { //core是否小於核心執行緒
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// private static int runStateOf(int c) { return c & ~CAPACITY; }
// 就是比較前三位
// 比如: 1110 0000 0000 0000 0000 0000 0000 0000
// & 1110 0000 0000 0000 0000 0000 0000 0000
// = 1110 0000 0000 0000 0000 0000 0000 0000
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && // 第一個條件執行緒池狀態大於0(SHUTDOWN=0,STOP=1,TIDYING=2,TERMINATED=3)
//第二個條件不太理解
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || //**在這裡可以看出最大執行緒數不能大於(1<<29) -1**
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//滿足條件,此函式會把ctl中的value值對應增加,新增成功退出迴圈
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//若此事執行緒狀態發生改變,則繼續迴圈
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)// 計算實際最大執行緒數
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //執行執行緒
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果新增失敗,講執行緒移除佇列,ctl.value值減少一位,並改變執行緒狀態
addWorkerFailed(w);
}
return workerStarted;
}
今天先寫到這裡,下次繼續。。。