1. 程式人生 > >Java 執行緒池中的執行緒複用是如何實現的?

Java 執行緒池中的執行緒複用是如何實現的?

前幾天,技術群裡有個群友問了一個關於執行緒池的問題,內容如圖所示: ![](https://img-blog.csdnimg.cn/20200614214310802.jpg) 關於執行緒池相關知識可以先看下這篇:[為什麼阿里巴巴Java開發手冊中強制要求執行緒池不允許使用Executors建立?](https://mp.weixin.qq.com/s/scy1LEDC9_8j263BDtJerQ) 那麼就來和大家探討下這個問題,線上程池中,執行緒會從 workQueue 中讀取任務來執行,最小的執行單位就是 Worker,Worker 實現了 Runnable 介面,重寫了 run 方法,這個 run 方法是讓每個執行緒去執行一個迴圈,在這個迴圈程式碼中,去判斷是否有任務待執行,若有則直接去執行這個任務,因此執行緒數不會增加。 如下是執行緒池建立執行緒的整體流程圖: ![](https://img-blog.csdnimg.cn/20200614223220216.png) 首先會判斷執行緒池的狀態,也就是是否在執行,若執行緒為非執行狀態,則會拒絕。接下來會判斷執行緒數是否小於核心執行緒數,若小於核心執行緒數,會新建工作執行緒並執行任務,隨著任務的增多,執行緒數會慢慢增加至核心執行緒數,如果此時還有任務提交,就會判斷阻塞佇列 `workQueue` 是否已滿,若沒滿,則會將任務放入到阻塞佇列中,等待工作執行緒獲得並執行,如果任務提交非常多,使得阻塞佇列達到上限,會去判斷執行緒數是否小於最大執行緒數 `maximumPoolSize`,若小於最大執行緒數,執行緒池會新增工作執行緒並執行任務,如果仍然有大量任務提交,使得執行緒數等於最大執行緒數,如果此時還有任務提交,就會被拒絕。 現在我們對這個流程大致有所瞭解,那麼讓我們去看看原始碼是如何實現的吧! 執行緒池的任務提交從 submit 方法來說,submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情: 1. 把 Runnable 和 Callable 都轉化成 FutureTask 2. 使用 execute 方法執行 FutureTask execute 方法是 ThreadPoolExecutor 中的方法,原始碼如下: ``` public void execute(Runnable command) { // 若任務為空,則拋 NPE,不能執行空任務 if (command == null) { throw new NullPointerException(); } int c = ctl.get(); // 若工作執行緒數小於核心執行緒數,則建立新的執行緒,並把當前任務 command 作為這個執行緒的第一個任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; } c = ctl.get(); } /** * 至此,有以下兩種情況: * 1.當前工作執行緒數大於等於核心執行緒數 * 2.新建執行緒失敗 * 此時會嘗試將任務新增到阻塞佇列 workQueue */ // 若執行緒池處於 RUNNING 狀態,將任務新增到阻塞佇列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { // 再次檢查執行緒池標記 int recheck = ctl.get(); // 如果執行緒池已不處於 RUNNING 狀態,那麼移除已入隊的任務,並且執行拒絕策略 if (!isRunning(recheck) && remove(command)) { // 任務新增到阻塞佇列失敗,執行拒絕策略 reject(command); } // 如果執行緒池還是 RUNNING 的,並且執行緒數為 0,那麼開啟新的執行緒 else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } /** * 至此,有以下兩種情況: * 1.執行緒池處於非執行狀態,執行緒池不再接受新的執行緒 * 2.執行緒處於執行狀態,但是阻塞佇列已滿,無法加入到阻塞佇列 * 此時會嘗試以最大執行緒數為界建立新的工作執行緒 */ else if (!addWorker(command, false)) { // 任務進入執行緒池失敗,執行拒絕策略 reject(command); } } ``` 可以看到 execute 方法中的的核心方法為 addWorker,再去看 addWorker 方法之前,先看下 Worker 的初始化方法: ``` Worker(Runnable firstTask) { // 每個任務的鎖狀態初始化為-1,這樣工作執行緒在執行之前禁止中斷 setState(-1); this.firstTask = firstTask; // 把 Worker 作為 thread 執行的任務 this.thread = getThreadFactory().newThread(this); } ``` 在 Worker 初始化時把當前 Worker 作為執行緒的構造器入參,接下來從 addWorker 方法中可以找到如下程式碼: ``` final Thread t = w.thread; // 如果成功添加了 Worker,就可以啟動 Worker 了 if (workerAdded) { t.start(); workerStarted = true; } ``` 這塊程式碼是新增 worker 成功,呼叫 start 方法啟動執行緒,`Thread t = w.thread;` 此時的 w 是 Worker 的引用,那麼`t.start();`實際上執行的就是 Worker 的 run 方法。 Worker 的 run 方法中呼叫了 runWorker 方法,簡化後的 runWorker 原始碼如下: ``` final void runWorker(Worker w) { Runnable task = w.firstTask; while (task != null || (task = getTask()) != null) { try { task.run(); } finally { task = null; } } } ``` 這個 while 迴圈有個 getTask 方法,getTask 的主要作用是阻塞從佇列中拿任務出來,如果佇列中有任務,那麼就可以拿出來執行,如果佇列中沒有任務,這個執行緒會一直阻塞到有任務為止(或者超時阻塞),其中 getTask 方法的時序圖如下: ![](https://img-blog.csdnimg.cn/20200615005233866.png) 其中執行緒複用的關鍵是 1.6 和 1.7 部分,這部分原始碼如下: ``` Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); ``` 使用佇列的 poll 或 take 方法從佇列中拿資料,根據佇列的特性,佇列中有任務可以返回,佇列中無任務會阻塞。 執行緒池的執行緒複用就是通過取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中不停地取任務,並直接呼叫 Runnable 的 run 方法來執行任務,這樣就保證了每個執行緒都始終在一個迴圈中,反覆獲取任務,然後執行任務,從而實現了執行緒的複用。 # 總結 本文主要從原始碼的角度解析了 Java 執行緒池中的執行緒複用是如何實現的。歡迎大家留言交流討論。 **最好的關係就是互相成就**,大家的**在看、轉發、留言**三連就是我創作的最大動力。 更詳細的原始碼解析可以點選連結檢視:https://github.com/wupeixuan/JDKSourceCode1.8 > 參考 > > https://github.com/wupeixuan/JDKSourceCode1.8 > > 面試官系統精講Java原始碼及大廠真題 > > Java併發程式設計學習寶典 > > Java 併發面