1. 程式人生 > >拜託,不要再問我執行緒池啦!

拜託,不要再問我執行緒池啦!

Java提供了幾種便捷的方法建立執行緒池,通過這些內建的api就能夠很輕鬆的建立執行緒池。在`java.util.concurrent`包中的`Executors`類,其中的靜態方法就是用來建立執行緒池的: * newFixedThreadPool():建立一個固定執行緒數量的執行緒池,而且執行緒池中的任務全部執行完成後,空閒的執行緒也不會被關閉。 * newSingleThreadExecutor():建立一個只有一個執行緒的執行緒池,空閒時也不會被關閉。 * newCachedThreadPool():建立一個可快取的執行緒池,執行緒的數量為`Integer.MAX_VALUE`,空閒執行緒會臨時快取下來,執行緒會等待`60s`還是沒有任務加入的話就會被關閉。 `Executors`類中還有一些建立執行緒池的方法(jdk8新加的),但是現在這個觸極到我的知識盲區了~~ ![](https://arch-digest.oss-cn-shanghai.aliyuncs.com/thread-pool/14301602992990_.pic_hd.jpg) 上面那幾個方法,其實都是建立了一個`ThreadPoolExecutor`物件作為返回值,要搞清楚執行緒池的原理主要還是要分析`ThreadPoolExecutor`這個類。 `ThreadPoolExecutor`的構造方法: ``` public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... } ``` `ThreadPoolExecutor`的構造方法包含以下幾個引數: * corePoolSize: 核心執行緒數量,常駐執行緒池中的執行緒,即時執行緒池中沒有任務可執行,也不會被關閉。 * maximumPoolSize:最大執行緒數量 * keepAliveTime:空閒執行緒存活時間 * unit: 空閒執行緒存活時間的單位 * workQueue:工作佇列,執行緒池一下忙不過來,那新來的任務就需要排隊,排除中的任務就會放在workQueue中 * threadFactory:執行緒工廠,建立執行緒用的 * handler:`RejectedExecutionHandler`例項用於線上程池中沒有空閒執行緒能夠執行任務,並且`workQueue`中也容不下任務時拒絕任務時的策略。 `ThreadPoolExecutor`中的執行緒統稱為工作執行緒,但有一個小概念是`核心執行緒`,核心執行緒由引數`corePoolSize`指定,如`corePoolSize`設定5,那執行緒池中就會有5條執行緒常駐執行緒池中,不會被回收掉,但是也會有例外,如果`allowCoreThreadTimeOut`為`true`空閒一段時間後,也會被關閉。 ### 執行緒的狀態和工作執行緒數量 執行緒中的狀態和工作執行緒和數量都是由`ctl`表示,是一個`AtomicInteger`型別的屬性: ``` private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); ``` ctl的高四位為執行緒的狀態,其他位數為工作執行緒的數量,所以執行緒中最大的工作執行緒數量為`(2^29)-1`。 執行緒池中的狀態有五種: * RUNNING:接收新的任務和處理佇列中的任務 * SHUTDOWN:不能新增任務,但是會繼續處理已經新增的任務 * STOP:不能新增任務,不會繼續處理已經新增任務 * TIDYING:所有的任務已經被終止,工作執行緒為0 * TERMINATED:terminated()方法執行完成 狀態碼的定義如下: ``` private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; ``` ### 建立執行緒池 如果有面試官問:如何正確的建立執行緒池?千萬不要說使用`Executors`建立執行緒,雖然`Executors`能很方便的建立執行緒池,但是他提供的靜態建立方法會有一些坑。 **主要的原因是:`maximumPoolSize`和`workQueue`這兩個引數** `Executors`靜態方法在建立執行緒池時,如果`maximumPoolSize`設定為`Integer.MAX_VALUE`,這樣會導致執行緒池可以一直要以接收執行任務,可能導致cpu負載過高。 `workQueue`是一個阻塞佇列的例項,用於放置正在等待執行的任務。如果在建立執行緒種時`workQueue`例項沒有指定任務的容量,那麼等待佇列中可以一直新增任務,極有可能導致`oom`。 所以建立執行緒,**最好是根據執行緒池的用途,然後自己建立執行緒**。 ### 新增任務 呼叫執行緒池的`execute`並不是立即執行任務,執行緒池內部用經過一頓操作,如:判斷核心執行緒數、是否需要新增到等待佇列中。 下來的程式碼是`execute`的原始碼,程式碼很簡潔只有2個`if`語句: ``` public void execute(Runnable command) { int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } ``` 1. 第一個if,如果當前執行緒池中的工作執行緒數量小於`corePoolSize`,直接建立一個工作執行緒執行任務 2. 第二個if,當執行緒池處於執行狀態,呼叫`workQueue.offer(command)`方法將任務新增到`workQueue`,否則呼叫`addWorker(command, false)`嘗試去新增一個工作執行緒。 整理了一張圖,把執行緒池分為三部分`Core Worker`、`Worker`、`workQueue`: ![](https://arch-digest.oss-cn-shanghai.aliyuncs.com/thread-pool/14311602993000_.pic.jpg) 換一種說法,在呼叫`execute`方法時,任務首先會放在`Core Worker`內,然後才是`workQueue`,最後才會考慮`Worker`。 這樣做的原因可以保證`Core Worker`中的任務執行完成後,能立即從`workQueue`獲取下一個任務,而不需要啟動別的工作執行緒,用最少的工作執行緒辦更多的事。 ### 建立工作執行緒 在`execute`方法中,有三個地方呼叫了`addWorker`。`addWorker`方法可以分為二部分: 1. 增加工作執行緒數量 2. 啟動工作執行緒 `addWorker`的方法簽名如下: ``` private boolean addWorker(Runnable firstTask, boolean core) ``` * **firstTask**:第一個執行的任務,可以為空。如果為空任務會從`workQueue`中獲取。 * **core**: 是否是核心工作執行緒 #### 增加工作執行緒數量 ``` retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); .... for (;;) { int wc = workerCountOf(c); if (wc >
= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } ``` 上面程式碼省略了一部分程式碼,主要程式碼都在`for`迴圈中,利用`CAS`鎖,安全的完成執行緒池狀態的檢查與增加工作執行緒的數量。其中的`compareAndIncrementWorkerCount(c)`呼叫就是將工作執行緒數量+1。 #### 啟動工作執行緒 增加工作執行緒的數量後,緊接著就會啟動工作執行緒: ``` boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s >
largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } ``` 啟動工作執行緒的流程: * 建立一個`Worker`例項, `Worker`構造方法會使用`ThreadFactory`建立一個執行緒 ``` w = new Worker(firstTask); final Thread t = w.thread; ``` 就不說`Worker`類的實現了,直接給出構造方法來細品: ``` Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } ``` * 如果執行緒池狀態是在執行中,或者已經關閉,但工作執行緒要從`workQueue`中獲取任務,才能新增工作執行緒 ``` if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s >
largestPoolSize) largestPoolSize = s; workerAdded = true; } ``` **注意:**:當執行緒池處於`SHUTDOWN`狀態時,它不能接收新的任務,但是可以繼續執行未完成的任務。任務是否從`workQueue`中獲取,是根據`firstTask`判斷,每個`Worker`例項都有一個`firstTask `屬性,如果這個值為`null`,工作執行緒啟動的時候就會從`workQueue`中獲取任務,否則會執行`firstTask `。 * 啟動執行緒 呼叫執行緒的`start`方法,啟動執行緒。 ``` if (workerAdded) { t.start(); workerStarted = true; } ``` ### 執行任務 回過頭來看一個`Worker`類的定義: ``` private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } ... } ``` `Worker`類實現了`Runnable`介面,同時在構造方法中會將`this`傳遞給執行緒,到這裡你就知道了`Worker`例項中有`run`方法,它會線上程啟動後執行: ``` public void run() { runWorker(this); } ``` `run`方法內部接著呼叫`runWorker`方法執行任務,在這裡才是真正的開始執行任務了: ``` final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } ``` * 獲取任務 首先將`firstTask`傳遞給`task`臨時變數: ``` Runnable task = w.firstTask; ``` 然後迴圈檢查`task`或者從`workQueue`中獲取任務: ``` while (task != null || (task = getTask()) != null) { ... } ``` `getTask()`稍後再做分析。 * 執行任務 去掉一些狀態檢查、異常捕獲、和勾子方法呼叫後,保留最重要的呼叫`task.run()`: ``` while (task != null || (task = getTask()) != null) { ... task.run(); ... } ``` `task`其實就是通過呼叫`execute`方法傳遞進來的`Runnable`例項,也就是你的任務。只不過它可能儲存在`Worker.firstTask`中,或者在`workQueue`中,儲存在哪裡在前面的`任務新增順序`中已經說明。 ### 從workQueue中獲取任務 試想一下如果每個任務執行完成,就關閉掉一個執行緒那有多浪費資源,這樣使用執行緒池也沒有多大的意義。所以執行緒的主要的功能就是執行緒複用,一旦任務執行完成直接去獲取下一個任務,或者掛起執行緒等待下一個提交的任務,然後等待一段時間後還是沒有任務提交,然後才考慮是否關閉部分空閒的執行緒。 `runWorker`中會迴圈的獲取任務: ``` while (task != null || (task = getTask()) != null) { ... task.run(); ... } ``` 上面的程式碼`getTask()`就是從`workQueue`中獲取任務: ``` private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { ... int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` 獲取任務的時候會有兩種方式: 1. 超時等待獲取任務 2. 一直等待任務,直到有新任務 如果`allowCoreThreadTimeOut`為`true`,`corePoolSize`指定的核心執行緒數量會被忽略,直接使用 ` workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)` 獲取任務,否則的話會根據當前工作執行緒的數量,如果`wc > corePoolSize`為`false`則當前會被認為是核心執行緒,呼叫`workQueue.take()`一直等待任務。 ### 工作執行緒的關閉 還是在`runWorker`方法中: ``` final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { task.run(); } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } ``` * completedAbruptly變數:標記當前工作執行緒是正常執行完成,還是異常完成的。completedAbruptly為`false`可以確定執行緒池中沒有可執行的任務了。 上面程式碼是簡潔後的程式碼,一個`while`迴圈保證不間斷的獲取任務,沒有任務可以執行(task為null)退出迴圈,最後再才會呼叫`processWorkerExit`方法: ``` private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } ``` `processWorkerExit`接收一個`Worker`例項與`completedAbruptly`變數。processWorkerExit的大致工作流程: * 判斷當前工作執行緒是否異常完成,如果是直接減少工作執行緒的數量,簡單的說就是校正一下工作執行緒的數量。 * 增加完成的任務數量,將`Worker`從`workers`中移除 * tryTerminate() 檢查執行緒池狀態,因為執行緒池可以延遲關閉,如果你呼叫`shutdown`方法後不會立即關閉,要等待所有的任務執行完成,所以這裡呼叫tryTerminate()方法,嘗試去呼叫`terminated`方法。 #### 工作執行緒完成策略 如果某個工作執行緒完成,執行緒池內部會判斷是否需要重新啟動一個: ``` //判斷執行緒池狀態 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { //獲取最小工作執行緒數量 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果最小工作執行緒數量為0,但是workQueue中還有任務,那重置最小工作執行緒數量1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果當前工作執行緒數數量大於或等於最小工作執行緒數量,則不需要啟動新的工作執行緒 if (workerCountOf(c) >= min) return; // replacement not needed } //啟動一個新的工作執行緒 addWorker(null, false); } ``` 工作執行緒完成後有兩種處理策略: 1. 對於異常完成的工作執行緒,直接啟動一個新的替換 2. 對於正常完成的工作執行緒,判斷當前工作執行緒是否足夠,如果足夠則不需要新啟動工作執行緒 **注意:**這裡的完成,表示工作執行緒的任務執行完成,`workQueue`中也沒有任務可以獲取了。 ### 執行緒池的關閉 關閉執行緒池有可以通過`shutdown`方法: ``` public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } ``` `shutdown`方法,第一步就是先改變執行緒池的狀態,呼叫`advanceRunState(SHUTDOWN)`方法,將執行緒池當前狀態更改為`SHUTDOWN`,advanceRunState程式碼如下: ``` private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } ``` 然後立即呼叫`interruptIdleWorkers()`方法,`interruptIdleWorkers()`內部會呼叫它的過載方法`interruptIdleWorkers(boolean onlyOne)`同時onlyOne引數傳遞的`false`來關閉空閒的執行緒: ``` private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } ``` 以上程式碼會遍歷`workers`中的`Worker`例項,然後呼叫執行緒的`interrupt()`方法。 #### 什麼樣的執行緒才是空閒工作執行緒? 前面提到過在`getTask()`中,執行緒從`workQueue`中獲取任務時會阻塞,**被阻塞的執行緒就是空閒的**。 再次回到`getTask()`的程式碼中: ``` private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } ... int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` 再次分析`getTask()`中的程式碼中有一段捕獲`InterruptedException`的程式碼塊,interruptIdleWorkers方法中斷執行緒後,`getTask()`會捕獲中斷異常,因為外面是一個`for`迴圈,隨後程式碼走到判斷執行緒池狀態的地方: ``` if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } ``` 上面的程式碼的會判斷當前執行緒池狀態,如果狀態大於`STOP`或者狀態等於`SHUTDOWN`並且`workQueue`為空時則返回`null`,`getTask()`返回空那麼在`runWorker`中迴圈就會退出,當前工作執行緒的任務就完成了,可以退出了: ``` final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { task.run(); } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } ``` #### shutdownNow 除了shutdown方法能關閉執行緒池,還有`shutdownNow`也可以關閉執行緒池。它兩的區別在於: * `shutdownNow`會清空`workQueue`中的任務 * `shutdownNow`還會中止當前正在執行的任務 * `shutdownNow`會使執行緒進入`STOP`狀態,而`shutdown()`是`SHUTDOWN`狀態 ``` public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } ``` 上面程式碼基本流程: * advanceRunState(STOP): 使執行緒池進行`STOP`狀態,與`shutdown()`中的一致 ,只是使用的狀態碼是`STOP` * interruptWorkers(): 與`shutdown()`中的一致 * drainQueue(): 清空佇列 #### 任務是中止執行還是繼續執行? 呼叫shutdownNow()後執行緒池處於`STOP`狀態,緊接著所有的工作執行緒都會被呼叫`interrupt`方法,如果此時`runWorker`還在執行會發生什麼? 在`runWorker`有一段程式碼,就是工作執行緒中止的重要程式碼: ``` final void runWorker(Worker w) { ... while (task != null || (task = getTask()) != null) { if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); task.run(); } ... } ``` 重點關注: ``` if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); ``` 這個if看起來有點難理解,理解下來大致意思是:如果執行緒池狀態大於等於`STOP`,立即中斷執行緒,否則清除執行緒的中斷標記,也就是說當執行緒池狀態為`RUNNING`和`SHUTDOWN`時,執行緒的中斷標記會被清除(執行緒的中斷程式碼在`interruptWorkers`方法中),可以繼續執行任務。 以上程式碼執行完成後,緊接著就會呼叫`task.run()`方法,**這裡面我們自己就可以根據執行緒的中斷標記來判斷任務是否被中斷。** ### 總結 個人水平有限,文中如有錯誤,謝謝大家指正。 本文從執行緒池的原始碼入手,分析執行緒池的建立、新增任務、執行任務等流程,整個分析下來基本上大多數公司關於執行緒池面試的問題都可以回答得上來,當然還有一些小細節如:`Worker`類是繼承`AQS`的,為什麼這麼做其實原始碼中都有一些苗頭,`Worker`在執行時會鎖住執行的程式碼塊,而`shutdown`在關閉空閒的`Worker`時,首先就要去獲取`Worker`的同步鎖才能繼續操作,這樣才能安全的關閉工作執行緒。 > > 歡迎關注我的公眾號:架構文摘,獲得獨家整理120G的免費學習資源助力你的架構師學習之路! > > **公眾號後臺回覆`arch028`獲取資料:** ![](https://img2020.cnblogs.com/blog/585087/202010/585087-20201019131329397-20237905