1. 程式人生 > >面試官:執行緒池如何按照core、max、queue的執行循序去執行?(內附詳細解析)

面試官:執行緒池如何按照core、max、queue的執行循序去執行?(內附詳細解析)

### 前言 **這是一個真實的面試題。** 前幾天一個朋友在群裡分享了他剛剛面試候選者時問的問題:**"執行緒池如何按照core、max、queue的執行循序去執行?"**。 我們都知道執行緒池中程式碼執行順序是:**corePool->workQueue->maxPool**,原始碼我都看過,你現在問題讓我改原始碼?? 一時間群裡炸開了鍋,小夥伴們紛紛打聽他所在的公司,然後拉黑避坑。**(手動狗頭,大家一起調侃٩(๑❛ᴗ❛๑)۶)** **關於執行緒池他一共問了這麼幾個問題:** - 執行緒池如何按照core、max、queue的順序去執行? - 子執行緒丟擲的異常,主執行緒能感知到麼? - 執行緒池發生了異常改怎樣處理? 全是一些有意思的問題,我之前也寫過一篇很詳細的圖文教程:[【萬字圖文-原創】 | 學會Java中的執行緒池,這一篇也許就夠了!][1] ,不瞭解的小夥伴可以再回顧下~ 但是針對這幾個問題,可能大家一時間也有點懵。今天的文章我們以原始碼為基礎來分析下該如何回答這三個問題。**(之前沒閱讀過原始碼也沒關係,所有的分析都會貼出原始碼及圖解)** ### 執行緒池如何按照core、max、queue的順序執行? #### 問題思考 對於這個問題,很多小夥伴肯定會疑惑:**"別人原始碼中寫好的執行流程你為啥要改?這面試官腦子有病吧......"** 這裡來思考一下現實工作場景中是否有這種需求?之前也看到過一份簡歷也寫到過這個問題: ![場景描述.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072705573-1473063398.png) 一個執行緒池執行的任務屬於`IO`密集型,`CPU`大多屬於閒置狀態,系統資源未充分利用。如果一瞬間來了大量請求,如果執行緒池數量大於`coreSize`時,多餘的請求都會放入到等待佇列中。等待著`corePool`中的執行緒執行完成後再來執行等待佇列中的任務。 **試想一下,這種場景我們該如何優化?** 我們可以修改執行緒池的執行順序為**corePool->maxPool->workQueue**。 這樣就能夠充分利用`CPU`資源,提交的任務會被優先執行。當執行緒池中執行緒數量大於`maxSize`時才會將任務放入等待佇列中。 你就說巧不巧?面試官的這個問題顯然是經過認真思考來提問的,這是一個很有意思的溫恩提,下面就一起看看如何解決吧。 #### 執行緒池執行流程 我們都知道執行緒池執行流程是先`corePool`再`workQueue`,最後才是`maxPool`的一個執行流程。 ![執行流程.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072705858-681238152.png) #### 執行緒池核心引數 在回顧下`ThreadPoolExecutor.execute()`原始碼前我們先回顧下執行緒池中的幾個重要引數: ![執行緒池核心引數.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072706060-1065893697.png) 我們來看下這幾個引數的定義: `corePoolSize`: 執行緒池中核心執行緒數量 `maximumPoolSize`: 執行緒池中最大執行緒數量 `keepAliveTime`: 非核心的空閒執行緒等待新任務的時間 `unit`: 時間單位。配合`allowCoreThreadTimeOut`也會清理核心執行緒池中的執行緒。 `workQueue`: 基於`Blocking`的任務佇列,最好選用有界佇列,指定佇列長度 `threadFactory`: 執行緒工廠,最好自定義執行緒工廠,可以自定義每個執行緒的名稱 `handler`: 拒絕策略,預設是`AbortPolicy` #### ThreadPoolExecutor.execute()原始碼分析 我們可以看下`execute()`如下: ![execute執行原始碼.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072706289-1610433293.png) 接著來分析下執行過程: 1. 第一步:`workerCountOf(c)`時間計算當前執行緒池中執行緒的個數,當執行緒個數小於核心執行緒數 2. 第二步:執行緒池執行緒數量大於核心執行緒數,此時提交的任務會放入`workQueue`中,使用`offer()`進行操作 3. 第三步:`workQueue.offer()`執行失敗,新提交的任務會直接執行,`addWorker()`會判斷如果當前執行緒池數量大於最大執行緒數,則執行拒絕策略 好了,到了這裡我們都已經很清楚了,關鍵在於第二步和第三步如何交換順序執行呢? #### 解決思路 仔細想一想,如果修改`workQueue.offer()`的實現不就可以達到目的了?我們先來畫圖來看一下: ![問題思路.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072706519-331395459.png) 現在的問題就在於,如果當前執行緒池中`coreSize < workCount < maxSize`時,一定會先執行`offer()`操作。 我們如果修改`offer`的實現是否可以完成執行順序的更換呢?這裡也是畫圖來展示一下: ![解決方式.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072706730-1807526275.png) #### Dubbo中EagerThreadPool解決方案 湊巧`Dubbo`中也有類似的實現,在`Dubbo`的`EagerThreadPool`自定義了一個`BlockingQueue`,在`offer()`方法中,如果當前執行緒池數量小於最大執行緒池時,直接返回`false`,這裡就達到了調節執行緒池執行順序的目的。 ![dubbo中解決方案.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072706935-1372798478.png) **原始碼直達**:https://github.com/apache/dubbo/blob/master/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java 看到這裡一切都真相大白了,解決思路以及方案都很簡單,學會了沒有? 這個問題背後還隱藏了一些場景的優化、原始碼的擴充套件等等知識,果然是一個值得思考的好問題。 ### 子執行緒丟擲的異常,主執行緒能感知到麼? #### 問題思考 這個問題其實也很容易回答,也僅僅是一個面試題而已,實際工作中子執行緒的異常不應該由主執行緒來捕獲。 **針對這個問題,希望大家清楚的是:** 我們要明確執行緒程式碼的邊界,非同步化過程中,子執行緒丟擲的異常應該由子執行緒自己去處理,而不是需要主執行緒感知來協助處理。 #### 解決方案 解決方案很簡單,在虛擬機器中,當一個執行緒如果沒有顯式處理異常而丟擲時會將該異常事件報告給該執行緒物件的 `java.lang.Thread.UncaughtExceptionHandler` 進行處理,如果執行緒沒有設定 `UncaughtExceptionHandler`,則預設會把異常棧資訊輸出到終端而使程式直接崩潰。 所以如果我們想線上程意外崩潰時做一些處理就可以通過實現 `UncaughtExceptionHandler` 來滿足需求。 我們使用執行緒池設定`ThreadFactory`時可以指定`UncaughtExceptionHandler`,這樣就可以捕獲到子執行緒丟擲的異常了。 #### 程式碼示例 具體程式碼如下: ```java /** * 測試子執行緒異常問題 * * @author wangmeng * @date 2020/6/13 18:08 */ public class ThreadPoolExceptionTest { public static void main(String[] args) throws InterruptedException { MyHandler myHandler = new MyHandler(); ExecutorService execute = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setUncaughtExceptionHandler(myHandler).build()); TimeUnit.SECONDS.sleep(5); for (int i = 0; i < 10; i++) { execute.execute(new MyRunner()); } } private static class MyRunner implements Runnable { @Override public void run() { int count = 0; while (true) { count++; System.out.println("我要開始生產Bug了============"); if (count == 10) { System.out.println(1 / 0); } if (count == 20) { System.out.println("這裡是不會執行到的=========="); break; } } } } } class MyHandler implements Thread.UncaughtExceptionHandler { private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class); @Override public void uncaughtException(Thread t, Throwable e) { LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage()); } } ``` 執行結果: ![執行結果.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072707185-630328833.png) #### UncaughtExceptionHandler 解析 我們來看下`Thread`中的內部介面`UncaughtExceptionHandler`: ```java public class Thread { ...... /** * 當一個執行緒因未捕獲的異常而即將終止時虛擬機器將使用 Thread.getUncaughtExceptionHandler() * 獲取已經設定的 UncaughtExceptionHandler 例項,並通過呼叫其 uncaughtException(...) 方 * 法而傳遞相關異常資訊。 * 如果一個執行緒沒有明確設定其 UncaughtExceptionHandler,則將其 ThreadGroup 物件作為其 * handler,如果 ThreadGroup 物件對異常沒有什麼特殊的要求,則 ThreadGroup 會將呼叫轉發給 * 預設的未捕獲異常處理器(即 Thread 類中定義的靜態未捕獲異常處理器物件)。 * * @see #setDefaultUncaughtExceptionHandler * @see #setUncaughtExceptionHandler * @see ThreadGroup#uncaughtException */ @FunctionalInterface public interface UncaughtExceptionHandler { /** * 未捕獲異常崩潰時回撥此方法 */ void uncaughtException(Thread t, Throwable e); } /** * 靜態方法,用於設定一個預設的全域性異常處理器。 */ public static void setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh) { defaultUncaughtExceptionHandler = eh; } /** * 針對某個 Thread 物件的方法,用於對特定的執行緒進行未捕獲的異常處理。 */ public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) { checkAccess(); uncaughtExceptionHandler = eh; } /** * 當 Thread 崩潰時會呼叫該方法獲取當前執行緒的 handler,獲取不到就會呼叫 group(handler 型別)。 * group 是 Thread 類的 ThreadGroup 型別屬性,在 Thread 構造中例項化。 */ public UncaughtExceptionHandler getUncaughtExceptionHandler() { return uncaughtExceptionHandler != null ? uncaughtExceptionHandler : group; } /** * 執行緒全域性預設 handler。 */ public static UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() { return defaultUncaughtExceptionHandler; } ...... } ``` 部分內容參考自:https://mp.weixin.qq.com/s/ghnNQnpou6-NemhFjpl4Jg ### 執行緒池發生了異常改怎樣處理? 執行緒池中執行緒執行過程中出現了異常該怎樣處理呢?執行緒池提交任務有兩種方式,分別是`execute()`和`submit()`,這裡會依次說明。 #### ThreadPoolExecutor.runWorker()實現 不管是使用`execute()`還是`submit()`提交任務,最終都會執行到`ThreadPoolExecutor.runWorker()`,我們來看下原始碼(原始碼基於JDK1.8): ![runWorker().png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072707418-737082823.png) 我們看到在執行`task.run()`時,出現異常會直接向上丟擲,這裡處理的最好的方式就是在我們業務程式碼中使用`try...catch()`來捕獲異常。 #### FutureTask.run()實現 如果我們使用`submit()`來提交任務,在`ThreadPoolExecutor.runWorker()`方法執行時最終會呼叫到`FutureTask.run()`方法裡面去,不清楚的小夥伴也可以看下我之前的文章: [執行緒池續:你必須要知道的執行緒池submit()實現原理之FutureTask!][2] ![FutureTask.run().png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072707687-894104673.png) 這裡可以看到,如果業務程式碼丟擲異常後,會被`catch`捕獲到,然後呼叫`setExeception()`方法: ![FutureTask.setException().png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072707856-1037893017.png) 可以看到其實類似於直接吞掉了,當我們呼叫`get()`方法的時候異常資訊會包裝到FutureTask內部的變數outcome中,我們也會獲取到對應的異常資訊。 在`ThreadPoolExecutor.runWorker()`最後`finally`中有一個`afterExecute()`鉤子方法,如果我們重寫了`afterExecute()`方法,就可以獲取到子執行緒丟擲的具體異常資訊`Throwable`了。 #### 結論 對於執行緒池、包括執行緒的異常處理推薦以下方式: 1. 直接使用`try/catch`,這個也是最推薦的方式 2. 在我們構造執行緒池的時候,重寫`uncaughtException()`方法,上面示例程式碼也有提到: ```java public class ThreadPoolExceptionTest { public static void main(String[] args) throws InterruptedException { MyHandler myHandler = new MyHandler(); ExecutorService execute = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setUncaughtExceptionHandler(myHandler).build()); TimeUnit.SECONDS.sleep(5); for (int i = 0; i < 10; i++) { execute.execute(new MyRunner()); } } } class MyHandler implements Thread.UncaughtExceptionHandler { private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class); @Override public void uncaughtException(Thread t, Throwable e) { LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage()); } } ``` 3 直接重寫`afterExecute()`方法,感知異常細節 ### 總結 這篇文章到這裡就結束了,不知道小夥伴們有沒有一些感悟或收穫? 通過這幾個面試問題,我也深刻的感受到學習知識要多思考,看原始碼的過程中要多設定一些場景,這樣才會收穫更多。 [1]:https://www.cnblogs.com/wang-meng/p/12945703.html [2]:https://www.cnblogs.com/wang-meng/p/13023710.html ![原創乾貨分享.png](https://img2020.cnblogs.com/other/799093/202006/799093-20200615072709565-1835814