1. 程式人生 > >Java併發包原始碼學習系列:執行緒池ThreadPoolExecutor原始碼解析

Java併發包原始碼學習系列:執行緒池ThreadPoolExecutor原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112301359) - [Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838) - [Java併發包原始碼學習系列:ReentrantLock可重入獨佔鎖詳解](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112454874) - [Java併發包原始碼學習系列:ReentrantReadWriteLock讀寫鎖解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112689635) - [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669) - [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098) - [Java併發包原始碼學習系列:JDK1.8的ConcurrentHashMap原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113059783) - [Java併發包原始碼學習系列:阻塞佇列BlockingQueue及實現原理分析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113186979) - [Java併發包原始碼學習系列:阻塞佇列實現之ArrayBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113252384) - [Java併發包原始碼學習系列:阻塞佇列實現之LinkedBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113329416) - [Java併發包原始碼學習系列:阻塞佇列實現之PriorityBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113358710) - [Java併發包原始碼學習系列:阻塞佇列實現之DelayQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113440013) - [Java併發包原始碼學習系列:阻塞佇列實現之SynchronousQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113528143) - [Java併發包原始碼學習系列:阻塞佇列實現之LinkedTransferQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113576816) - [Java併發包原始碼學習系列:阻塞佇列實現之LinkedBlockingDeque原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113620915) - [Java併發包原始碼學習系列:基於CAS非阻塞併發佇列ConcurrentLinkedQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113728300) ## ThreadPoolExecutor概述 ### 執行緒池解決的優點 1. 當執行大量非同步任務時執行緒池能夠提供較好的效能,因為執行緒池中的執行緒是可複用的,不需要每次執行非同步任務時都建立和銷燬執行緒。 2. 提供資源限制和管理的手段,比如可以限制執行緒的個數,動態新增執行緒等等。 ### 執行緒池處理流程 ThreadPoolExecutor執行execute時,流程如下: ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210210220418824-1220801733.png) 1. 如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務,這裡需要加全域性鎖。 2. 如果執行的執行緒數>=corePoolSize,則將任務加入BlockingQueue。 3. 如果此時BlockingQueue已滿,則建立新的執行緒來處理任務,這裡也需要加全域性鎖。 4. 如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,則按照拒絕策略拒絕任務。 當然啦,這篇文章意在從原始碼角度學習執行緒池這些核心步驟的具體實現啦,執行緒池概念性的東西,可以參考一些其他的部落格: - [【Java併發程式設計】執行緒池相關知識點整理](https://tqbx.gitee.io/javablog/#/%E9%9D%A2%E8%AF%95%E7%9B%B8%E5%85%B3/%E5%B9%B6%E5%8F%91/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E7%9A%84%E7%9F%A5%E8%AF%86%E7%82%B9) - [閃客sun : 圖解 | 原來這就是執行緒池](https://www.cnblogs.com/flashsun/p/14368520.html) ### 建立執行緒池 建立執行緒池有幾種方法,一種是使用Executors工具類快速建立內建的幾種執行緒池,也可以自定義。 一、通過Executor框架的工具類Executors可以建立三種類型的ThreadPoolExecutor。 二、使用ThreadPoolExecutor的各種構造方法。 《阿里巴巴 Java 開發手冊》中:**強制執行緒池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式**,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險 > Executors 返回執行緒池物件的弊端如下: > > - **FixedThreadPool 和 SingleThreadExecutor** : 允許請求的佇列長度為 Integer.MAX_VALUE ,可能堆積大量的請求,從而導致 OOM。 > - **CachedThreadPool 和 ScheduledThreadPool** : 允許建立的執行緒數量為 Integer.MAX_VALUE ,可能會建立大量執行緒,從而導致 OOM。 本篇的重點就是這個ThreadPoolExecutor。 ## 重要常量及欄位 ```java public class ThreadPoolExecutor extends AbstractExecutorService { // 原子的Integer變數ctl,用於記錄執行緒池狀態【高3位】和執行緒池中執行緒個數【低29位】,這裡假設Integer是32位的 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 其實並不是每個平臺的Integer二進位制都是32位的,實際上是,二進位制位-3代表執行緒個數 private static final int COUNT_BITS = Integer.SIZE - 3; // 執行緒最大個數【約5億】 低COUNT_BITS都是1 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 111 00000000000000000000000000000 高3位是 111 private static final int RUNNING = -1 << COUNT_BITS; // 000 00000000000000000000000000000 高3位是 000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 00000000000000000000000000000 高3位是 001 private static final int STOP = 1 << COUNT_BITS; // 010 00000000000000000000000000000 高3位是 110 private static final int TIDYING = 2 << COUNT_BITS; // 011 00000000000000000000000000000 高3位是 011 private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl // 獲取高3位的執行狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取低29位的執行緒個數 private static int workerCountOf(int c) { return c & CAPACITY; } // 通過RunState和WorkCount計算ctl的值 private static int ctlOf(int rs, int wc) { return rs | wc; } // 執行緒池狀態變換是單調遞增的 private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } // 只有RUNNING 是小於SHUTDOWN的 private static boolean isRunning(int c) { return c < SHUTDOWN; } // ... // 阻塞佇列 private final BlockingQueue workQueue; // 獨佔鎖 同步保證 private final ReentrantLock mainLock = new ReentrantLock(); // 存放 執行緒池中的工作執行緒 private final HashSet workers = new HashSet(); // 條件佇列,執行緒呼叫awaitTermination時存放阻塞的執行緒 private final Condition termination = mainLock.newCondition(); // ... // 繼承AQS和Runnable,任務執行緒 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /*.. */} } ``` - ThreadPoolExecutor通過AtomicInteger類的變數ctl記錄**執行緒池狀態**和**執行緒池中執行緒個數**,這裡以Integer為32為例。 - 高3位表示執行緒池的狀態,低29位表示執行緒個數,分別通過`runStateOf`和`workerCountOf`計算。 ## 執行緒池的五種狀態及轉換 - **執行緒池的狀態**有五種,他們提供了執行緒池宣告週期的控制: - RUNNING:能夠接收新任務,並且處理阻塞佇列裡的任務。 - SHUTDOWN:拒絕新任務,但會處理阻塞佇列裡的任務。 - STOP:拒絕新任務,並且拋棄阻塞佇列裡的任務,同時會中斷正在處理的任務。 - TIDYING:所有任務都執行完後當前執行緒池workerCount為0,將呼叫terminated()這個鉤子方法。 - TERMINATED:終止狀態。terminated方法呼叫完成。 - 執行緒池的狀態是有規律的,保證單調遞增,但是不一定每個狀態都會經歷,比如有以下幾種轉換: - RUNNING -> SHUTDOWN:可能是顯式呼叫了`shutdown()`方法,也可能在`finalize()`裡隱式呼叫。 - RUNNING或SHUTDOWN -> STOP:呼叫了`shutdownNow()`方法。 - SHUTDOWN -> TIDYING:佇列和執行緒池都為空的時候。 - STOP -> TIDYING:執行緒池為空的時候。 - TIDYING -> TERMINATED:鉤子方法`terminated()`呼叫完成的時候。 > 由於awaitTermination()方法而阻塞在條件佇列中的執行緒將會線上程池TERMINATED的時候返回。 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210210220428421-1442941314.png) ## ThreadPoolExecutor構造引數及引數意義 ThreadPoolExecutor方法的構造引數有很多,我們看看最長的那個就可以了: ```java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ``` - `corePoolSize`:核心執行緒數定義了**最小可以同時執行的執行緒數量**。 - `maximumPoolSize`:**當佇列中存放的任務達到佇列容量的時候**,當前可以同時執行的執行緒數量變為**最大執行緒數**。【如果使用的無界佇列,這個引數就沒啥效果】 - `workQueue`: 阻塞佇列,當新任務來的時候會先判斷當前執行的執行緒數量是否達到核心執行緒數,**如果達到核心執行緒數的話,新任務就會被存放在佇列中**。 - `keepAliveTime`:當執行緒池中的執行緒數量大於 `corePoolSize` 的時候,如果這時沒有新的任務提交,核心執行緒外的執行緒不會立即銷燬,而是會等待,直到等待的時間超過了 `keepAliveTime`才會被回收銷燬。 - `unit`:`keepAliveTime` 的時間單位。 - `threadFactory`:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字,預設使用Executors的靜態內部類`DefaultThreadFactory`。 - `handler`:飽和策略,當前同時執行的執行緒數量達到最大執行緒數量【`maximumPoolSize`】並且佇列也已經被放滿時,執行飽和策略。 關於各個引數的意義,強烈推薦這篇部落格:[閃客sun : 圖解 | 原來這就是執行緒池](https://www.cnblogs.com/flashsun/p/14368520.html) ## Work類 ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** 具體執行任務的執行緒 */ final Thread thread; /** 執行的第一個任務 */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 執行緒啟動時,執行runWorker方法 */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } // 不可重入的,state = 1表示已獲取 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // state = 0 表示鎖未被獲取 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } // 如果執行緒啟動,則中斷執行緒 state只有初始化的時候才是-1,其他的時間都是滿足>
=0的 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } ``` Worker繼承了AQS和Runnable介面,是具體承載任務的物件。 基於AQS,Worker實現了不可重入的獨佔鎖,state == 0 表示鎖未被獲取,state == 1表示鎖已經被獲取, state == -1為初始狀態。 firstTask記錄該工作執行緒執行的第一個任務,thread是執行任務的執行緒。 `interruptIfStarted()`方法會在shutdownNow中呼叫,意在中斷Worker執行緒,state初始化為-1,是不滿足getState條件的。 ## void execute(Runnable command) execute方法就是向執行緒池提交一個command任務進行執行。 ```java public void execute(Runnable command) { // 提交任務為null, 丟擲空指標異常 if (command == null) throw new NullPointerException(); // 獲取當前ctl的值 : 執行緒池狀態 + 執行緒個數 int c = ctl.get(); // 如果當前執行緒池中執行緒個數小於核心執行緒數corePoolSize if (workerCountOf(c) < corePoolSize) { // 通過addWorker新建一個執行緒,然後,啟動該執行緒從而執行任務 if (addWorker(command, true)) return; c = ctl.get(); } // 如果執行緒池處於RUNNING狀態,則新增任務到阻塞佇列 if (isRunning(c) && workQueue.offer(command)) { // double-check int recheck = ctl.get(); // 如果執行緒池不是處於RUNNING, 則從佇列中移除任務 if (! isRunning(recheck) && remove(command)) // 並執行拒絕策略 reject(command); // 如果當前執行緒個數為0, 則新增一個執行緒 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果佇列滿,則新增執行緒,新增失敗則執行拒絕策略 else if (!addWorker(command, false)) reject(command); } ``` 1. 如果執行緒池當前執行緒數小於corePoolSize,則呼叫addWorker建立新執行緒執行任務,成功則直接返回。 2. 如果執行緒池處於RUNNING狀態,則新增任務到阻塞佇列,如果新增成功,進行double-check,檢測出當前不是RUNNING,則進行移除操作,並執行拒絕策略。否則新增一個執行緒,確保有執行緒可以執行。 3. 如果執行緒池不是處於RUNNING或加入阻塞佇列失敗,並採取拒絕策略。 ## boolean addWorker(firstTask, core) ```java private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 檢測佇列是否只在必要時為空 // 等價為:下面幾種情況返回false /* if (rs >
= SHUTDOWN && rs 為STOP TIDYING TERMINATED時返回false (rs != SHUTDOWN || rs不為SHUTDOWN firstTask != null || rs為SHUTDOWN 但 已經有了第一個任務 workQueue.isEmpty())) rs為SHUTDOWN 並且任務佇列為空 */ if (rs >= SHUTDOWN && // ! (rs == SHUTDOWN && // firstTask == null && ! workQueue.isEmpty())) return false; // 迴圈, 通過CAS操作來增加執行緒個數 for (;;) { int wc = workerCountOf(c); // 執行緒個數如果超過限制,返回false if (wc >
= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS增加執行緒個數,操作成功跳出迴圈break if (compareAndIncrementWorkerCount(c)) break retry; // CAS失敗,檢測執行緒狀態是否發生了變化,如果發生變化,則跳到retry外層迴圈重新嘗試 // 否則在內層迴圈重新CAS c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 走到這代表CAS操作已經成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 建立worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 獨佔鎖保證同步 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 重新檢查執行緒池狀態,以避免在獲取鎖前呼叫了shutdown介面 int rs = runStateOf(ctl.get()); // 1. 執行緒池處於RUNNING // 2. 執行緒池處於SHUTDOWN 並且firstTask為null if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果t已經啟動 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 新增任務 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 新增任務成功後, 執行任務 if (workerAdded) { t.start(); // 執行 workerStarted = true; } } } finally { // 任務未執行成功 if (! workerStarted) addWorkerFailed(w); } return workerStarted; } ``` 主要分為兩步: 1. 雙重迴圈通過CAS操作增加執行緒數。 2. 使用全域性的獨佔鎖來控制:將併發安全的任務新增到works裡,並啟動。 ## final void runWorker(Worker w) 使用者執行緒提交任務到執行緒池後,由Worker執行,通過while迴圈不斷地從工作佇列裡獲取任務執行。 ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // Worker啟動執行runWorker public void run() { runWorker(this); } } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // state設定為0, 允許中斷 boolean completedAbruptly = true; try { // 如果task不為null 或者 task為null 但是 getTask從任務佇列獲取的任務不為null while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 如果執行緒池當前STOP,則確保執行緒是中斷狀態 // 如果不是STOP,確保執行緒沒有被中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任務執行之前的hook方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); // 執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任務執行之後的hook方法 afterExecute(task, thrown); } } finally { task = null; // 統計當前的Worker完成的任務數量 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 清理工作 processWorkerExit(w, completedAbruptly); } } ``` ### Runnable getTask() ```java private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? // 迴圈 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 1. 執行緒池狀態 >= SHUTDOWN && 工作佇列為空 // 2. 執行緒池狀態 >= STOP // 兩種情況,都直接數量 -1 , 返回null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 工作執行緒的數量 int wc = workerCountOf(c); // 需否需要超時控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //1. 工作執行緒的數量超過了maximumPoolSize 或者 需要超時控制,且poll出為null,就是沒拿到 //2. 工作執行緒數量 > 1 或者 工作佇列為空 // 兩者都滿足, 則數量 -1 , 返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 從工作佇列裡取出任務 Runnable r = timed ? // keepAliveTime時間內還沒有獲取到任務, 繼續迴圈 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` ### void processWorkerExit(w, completedAbruptly) ```java private void processWorkerExit(Worker w, boolean completedAbruptly) { // completedAbruptly 為true表示使用者執行緒執行異常,需要wc - 1 // 否則是不需要處理的,在getTask中已經處理過了 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 統計執行緒池完成的任務個數, 從workers中移除當前worker try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //如果當前執行緒池狀態為SHUTDOWN且工作佇列為空, //或者STOP狀態但執行緒池裡沒有活動執行緒,則設定執行緒池狀態為TERMINATED。 tryTerminate(); int c = ctl.get(); // 如果執行緒池為 RUNNING 或SHUTDOWN 表示,tryTerminate()沒有成功 // 判斷是否需要新增一個執行緒,如果workerCountOf(c) < min 新增一個執行緒 if (runStateLessThan(c, STOP)) { // 表示正常退出 if (!completedAbruptly) { // min 預設是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 如果workerCountOf(c) < min 新增一個執行緒 addWorker(null, false); } } ``` ## void shutdown() > SHUTDOWN : 拒絕新任務但是處理阻塞佇列裡的任務。 呼叫該方法之後,執行緒池不再接收新任務,但是工作佇列裡的任務還需要處理。 ```java public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 檢查許可權,判斷當前呼叫shutdown的執行緒是否擁有關閉執行緒的許可權 checkShutdownAccess(); // 設定執行緒池狀態為SHUTDOWN advanceRunState(SHUTDOWN); // 設定中斷標誌 interruptIdleWorkers(); // 鉤子方法 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 嘗試設定執行緒池狀態為TERMINATED tryTerminate(); } ``` ### void advanceRunState(int targetState) ```java // 設定執行緒池狀態為SHUTDOWN private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); // 當前的狀態已經是SHUTDOWN了就直接break返回,如果不是就CAS設定一下 if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } ``` ### void interruptIdleWorkers() ```java private void interruptIdleWorkers() { interruptIdleWorkers(false); } // onlyOne如果不傳,預設為false private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍歷所有的Worker for (Worker w : workers) { Thread t = w.thread; // 如果工作執行緒沒有被中斷 且 獲取Worker的鎖成功,則設定中斷標誌 // 這裡:獲取鎖成功代表,設定的是沒有在執行任務的執行緒,因為 // 正在執行任務的執行緒是已經獲取了鎖的,你tryLock不會成功的 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } // 只用設定一個 if (onlyOne) break; } } finally { mainLock.unlock(); } } ``` ### final void tryTerminate() 如果當前執行緒池狀態為SHUTDOWN且工作佇列為空,或者STOP狀態但執行緒池裡沒有活動執行緒,則設定執行緒池狀態為TERMINATED。 ```java final void tryTerminate() { // 迴圈 for (;;) { int c = ctl.get(); // 如果RUNNING TIDYING TERMINATED // 如果SHUTDOWN 且任務佇列不為空,還需要處理queue裡的任務 // 就不需要下面的操作了, 直接返回好了 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // STOP 但 執行緒池裡還有活動執行緒 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // CAS設定rs為TIDYING,且wc為0 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 鉤子方法 terminated(); } finally { // terminated() 完成之後, 就設定為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 啟用所有因為await等待的執行緒 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } ``` ## List(Runnable) shutdownNow() > STOP:拒絕新任務並且拋棄任務佇列裡的任務,同時會中斷正在處理的任務。 呼叫該方法後,將執行緒池狀態設定為STOP,拒絕新任務並且拋棄任務佇列裡的任務,同時會中斷正在處理的任務,返回佇列裡被丟棄的任務列表。 ```java public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 檢查許可權 checkShutdownAccess(); // 設定為STOP advanceRunState(STOP); // 設定中斷標誌 interruptWorkers(); // 將佇列任務移到tasks中 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } ``` ### void interruptWorkers() ```java private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) // 如果執行緒啟動,則中斷執行緒【正在執行 + 空閒的所有執行緒都會被中斷】 w.interruptIfStarted(); } finally { mainLock.unlock(); } } ``` ## boolean awaitTermination(timeout, unit) 當該方法被呼叫時,當前執行緒會被阻塞,直到超時時間到了,返回false。或者執行緒池狀態為TERMINATED時,返回true。 ```java public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { // 執行緒池狀態為TERMINATED 返回true if (runStateAtLeast(ctl.get(), TERMINATED)) return true; // 超時了, 返回false if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } ``` ## 參考閱讀 - 《Java併發程式設計之美》 - 《Java併發程式設計的藝術》 - [【Java併發程式設計】執行緒池相關知識點整理](https://tqbx.gitee.io/javablog/#/%E9%9D%A2%E8%AF%95%E7%9B%B8%E5%85%B3/%E5%B9%B6%E5%8F%91/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E7%9A%84%E7%9F%A5%E8%AF%86%E7%82%B9) - [閃客sun : 圖解 | 原來這就是執行緒池](https://www.cnblogs.com/flashsun/p/143685