面試又被問線程池原理?教你如何反擊
在阿裏巴巴Java開發手冊中有這麽兩段話,如下圖所示
可以看到提到的兩點,第一要求不能顯示的創建線程,也就是new Thread的這種形式,需要使用線程池對線程進行管理,第二不允許使用官方提供的四種線程池,而是需要通過自行創建的方式去創建線程池,更加理解線程池的允許規則
本文就基於JDK1.8的代碼,對線程池源碼進行解析,帶大家能夠更好的理解線程池的概念以及其運行規則,如有錯誤,請大家指出
一、ThreadPoolExecutor源碼
1.構造函數
先從構造函數看起:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize:核心線程的數量,默認不會被回收掉,但是如果設置了allowCoreTimeOut為true,那麽當核心線程閑置時,也會被回收。
- maximumPoolSize :最大線程數量,線程池能容納的最大容量,上限被CAPACITY限制(2^29-1)(後續代碼會看到)
- keepAliveTime:閑置線程被回收的時間限制,也就是閑置線程的存活時間
- unit :keepAliveTime的單位
- workQueue :用於存放任務的隊列
- threadFactory :創建線程的工廠類
- handler:當任務執行失敗時,使用handler通知調用者,代表拒絕的策略
有的朋友可能還不是很清晰,舉個例子,一個公司,核心線程就是代表公司的內部核心員工,最大線程數量就是員工的最大數量,可能包含非內部員工,因為有一些試點或者簡單的項目,需要一些外協人員來做,也就是非核心線程,那麽當這些項目做完了或者失敗了,公司為了節約用人成本,就遣散非核心員工,也就是閑置線程的存活時間。假如核心員工每個人都很忙,但是需求又一波接一波,那就任務排期,也就是任務隊列,當任務隊列都滿了時候,還要來需求?對不起,不接受,直接拒絕,這也就是handler對應的拒絕策略了,
2.線程池狀態
打開源碼類,可以看到如下幾個變量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
AtomicInteger是一個原子操作類,保證線程安全,采用低29位表示線程的最大數量,高3位表示5種線程池狀態,維護兩個參數,workCount和runState。workCount表示有效的線程數量,runState表示線程池的運行狀態。
- RUNNING:運行狀態,可以接受新任務並處理
- SHUTDOWN:關閉狀態,不會接受新的任務了,但是會處理隊列中還存在的任務
- STOP:停止狀態,不會接受新的任務,也不處理隊列任務,直接中斷
- TIDYING:表示所有任務已經終止了
- TERMINATED:表示terminated()方法已經執行完成
引用一張圖片幫助大家理解5個狀態
3.執行流程
execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn‘t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//如果當前線程數量小於核心線程數量,執行addWorker創建新線程執行command任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果當前是運行狀態,將任務放入阻塞隊列,double-check線程池狀態
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果再次check,發現線程池狀態不是運行狀態了,移除剛才添加進來的任務,並且拒絕改任務
if (! isRunning(recheck) && remove(command))
reject(command);
//處於運行狀態,但是沒有線程,創建線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//往線程池中創建新的線程失敗,則reject任務
else if (!addWorker(command, false))
reject(command);
}
這裏大概總結下execute方法的執行流程,其實大家看源碼方法註釋是一樣很好的學習方法
- 首先判斷當前線程數量是不是比核心線程數量少,如果是,直接創建核心線程執行任務,否則走第二步
- 如果當前線程數量等於核心線程數量了,那麽就任務排期,將任務放進任務隊列,放入成功後,再次check線程池狀態,這裏說明一下,在多線程的環境下,ctl.get()這個方法並不是一個原子操作,如果加入隊列後,線程池狀態改變了,不是RUNNING狀態,那麽這個任務將永遠不會被執行,所以需要再次check,如果不是RUNNING狀態,移除任務並拒絕任務,如果是RUNNING狀態並且當前沒有線程,則直接創建線程
- 走到這一步前提就是第二步中的添加隊列失敗了,也就是任務隊列滿了,那麽這個時候就考慮到創建非核心線程去執行任務,如果添加非核心線程也失敗,那就直接拒絕
這裏註意一點,當核心線程滿的時候,並不會去直接創建非核心線程去執行任務,而是先放進任務隊列,可以理解為需求任務首先是需要讓內部核心員工去完成的,任務隊列的優先級是高於非核心員工的,addWorker(),這裏的傳進去的boolean值,就代表著創建核心線程或者非核心線程
reject()
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
拒絕任務很簡單,reject方法會調用handler的rejectedExecution(command,this)方法,handler是RejectedExecutionHandler接口,默認實現是AbortPolicy,下面是AbortPolicy的實現:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
可以看到默認策略是直接拋出異常的,這只是默認使用的策略,可以通過實現接口實現自己的邏輯。
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 這裏return false的情況有以下幾種
//1.當前狀態是stop及以上 2.當前是SHUTDOWN狀態,但是firstTask不為空
//3.當前是SHUTDOWN狀態,但是隊列中為空
//從第一節我們知道,SHUTDOWN狀態是不執行進來的任務的,但是會繼續執行隊列中的任務
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這裏就主要流程分析下
- 2層循環,外部循環查詢線程池狀態,如果當前是stop及之上的狀態,直接return,如果是SHUTDOWN狀態,並且firstTask不為空或者隊列中是空的,直接return
- 內部循環查詢線程數量,通過傳遞進來的boolean值,分別和核心線程以及最大線程數量進行對比,如果成立,worker數量+1,並且跳出循環。
- 跳出循環就是實際執行任務了,Worker就將工作線程和任務封裝到了自己內部,我們可以將Worker看成就是一個工作線程,至於Worker是如何執行任務和從阻塞隊列中取任務
Worker()
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
......
}
可以看到,Worker內部維護,一個線程變量以及任務變量,啟動一個 Worker對象中包含的線程 thread, 就相當於要執行 runWorker()方法, 並將該 Worker對象作為該方法的參數.
runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//task不為空,執行當前任務,任務執行完後將task置位空,getTask方法接著不斷從隊列中取任務
while (task != null || (task = getTask()) != null) {
w.lock();
//再次check線程池狀態,如果是stop狀態,直接interrupt()中斷任務
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
通過while循環不斷的調用getTask方法,獲取任務task並進行執行,如果任務都執行完,跳出循環,線程結束並減少當前線程數量。
getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
這裏主要有兩個判斷需要說明下:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize
allowCoreThreadTimeOut :這個第一節有說過,如果核心線程設置了該屬性,也是需要進行回收的,wc > corePoolSize:當前線程是非核心線程也是需要回收的,滿足任何一個條件,timed 置位true
timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take()
如果上述的timed標誌位為true,調用poll方法獲取任務,同時設置超時時間,如果沒有任務,則超時返回null,跳出runWorker的循環,線程結束被回收,如果為false,調用take方法,此時如果沒有任務,不會返回null,而是會進入阻塞狀態,等待任務,不會被回收
二、總結
線程池中的細節比較多,大致做一下總結歸納
- 線程池的構造參數決定了線程池的運行策略,需要理解每個參數的含義,因為每個參數的不同很大程度上決定了這個線程池的運行規則,這也是為什麽阿裏巴巴開發手冊中提到通過自行創建的方式去創建線程池,而不是使用官方提供的4種線程池
- 線程池涉及多線程問題,狀態改變比較頻繁,在進行任務執行時,需要多次check線程池的狀態,保證任務被執行的準確性
- 任務隊列的優先級高於非核心線程,核心線程滿的時候,會先把任務放進任務隊列,其次開啟非核心線程進行執行
- 核心線程和非核心線程本質上並沒有什麽區別,在核心線程設置allowCoreThreadTimeOut 屬性為true時,最終也會因為超時而被銷毀
- 線程池總覽就是通過兩個變量來整體控制整個流程,線程池的狀態,線程池中線程數量,涉及的方法不是很多,但是循環比較多,需要理解每個循環的跳出條件以及對應的狀態。
大概分析就是這麽多,希望有能夠幫助到一些朋友更好的理解線程池的工作原理以及在使用中能夠更好的使用,如有疑問或者錯誤,歡迎一起討論
面試又被問線程池原理?教你如何反擊