多執行緒學習筆記八之執行緒池ThreadPoolExecutor實現分析
目錄
簡介
在Web開發中,如果要密集處理多個任務時,相對於每次都一個建立執行緒去執行任務,新建執行緒來執行任務相對來說是個更好的選擇,體現在以下三點:
- 降低資源消耗。 通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
- 提高響應速度。 當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
- 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。
下面從最常用的執行緒池ThreadPoolExecutor的原始碼分析如何實現執行緒池。
繼承結構
Executor是最基礎的執行介面,只提供了一個execute(Runnable command)提交任務方法;ExecutorService介面繼承了Executor,在其上做了一些shutdown()、submit()的擴充套件,可以說是真正的執行緒池介面AbstractExecutorService抽象類實現了ExecutorService介面中的大部分方法;TheadPoolExecutor繼承了AbstractExecutorService,是執行緒池的具體實現。
實現分析
ThreadPoolExecutor類屬性
public class ThreadPoolExecutor extends AbstractExecutorService { // 執行緒池的控制狀態(用來表示執行緒池的執行狀態(整形的高3位)和執行的worker數量(低29位)) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 偏移量 private static final int COUNT_BITS = Integer.SIZE - 3; // 最大工作執行緒數量(2^29 - 1) private static final int CAPACITY= (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 執行緒執行狀態,總共有5個狀態,需要3位來表示(所以偏移量的29 = 32 - 3) private static final int RUNNING= -1 << COUNT_BITS; private static final int SHUTDOWN=0 << COUNT_BITS; private static final int STOP=1 << COUNT_BITS; private static final int TIDYING=2 << COUNT_BITS; private static final int TERMINATED =3 << COUNT_BITS; // 阻塞佇列,存放提交給執行緒池的任務 private final BlockingQueue<Runnable> workQueue; // 可重入鎖 private final ReentrantLock mainLock = new ReentrantLock(); // 存放工作執行緒集合 private final HashSet<Worker> workers = new HashSet<Worker>(); // 終止條件 private final Condition termination = mainLock.newCondition(); // 最大執行緒池容量 private int largestPoolSize; // 已完成任務數量 private long completedTaskCount; // 執行緒工廠 private volatile ThreadFactory threadFactory; // 拒絕執行處理器 private volatile RejectedExecutionHandler handler; // 執行緒等待執行時間 private volatile long keepAliveTime; // 是否執行核心執行緒超時 private volatile boolean allowCoreThreadTimeOut; // 核心池的大小 private volatile int corePoolSize; // 最大執行緒池大小 private volatile int maximumPoolSize; // 預設拒絕執行處理器 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); }
執行緒池狀態
執行緒池本身有兩個很重要的狀態資訊:執行緒池的執行狀態和工作執行緒數,這兩個狀態資訊都包含在變數ctl(int型,32位)中:ctl的高3位表示執行緒狀態runState,低29位表示工作執行緒worker的數量workCount。執行緒狀態資訊如下:
- RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀態的執行緒池會接收新任務,會處理在阻塞佇列中等待處理的任務
- SHUTDOWN:0<<COUNT_BITS,即高3位為000,低29位為0,該狀態的執行緒池不會再接收新任務,但還會處理已經提交到阻塞佇列中等待處理的任務
- STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀態的執行緒池不會再接收新任務,不會處理在阻塞佇列中等待的任務,而且還會中斷正在執行的任務
- TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務都被終止了,workerCount為0,為此狀態時還將呼叫terminated()方法
- TERMINATED:3<<COUNT_BITS,即高3位為011,低29位為0,terminated()方法呼叫完成後變成此狀態
構造方法
核心引數含義如下:
- corePoolSize:核心執行緒數量
- maximumPoolSize:最大執行緒數量,可能大於corePoolSize,也可能等於
- workQueue: 必須是BlockingQueue阻塞佇列。當執行緒池中的執行緒數超過它的corePoolSize的時候,執行緒會進入阻塞佇列進行阻塞等待。通過workQueue,執行緒池實現了阻塞功能
- keepAliveTime:執行緒池維護執行緒所允許的空閒時間。當執行緒池中的執行緒數量大於corePoolSize的時候,如果這時沒有新的任務提交,核心執行緒外的執行緒不會立即銷燬,而是會等待,直到等待的時間超過了keepAliveTime。
- threadFactory:它是ThreadFactory型別的變數,用來建立新執行緒。預設使用Executors.defaultThreadFactory() 來建立執行緒。使用預設的ThreadFactory來建立執行緒時,會使新建立的執行緒具有相同的NORM_PRIORITY優先順序並且是非守護執行緒,同時也設定了執行緒的名稱。
- handler:它是RejectedExecutionHandler型別的變數,表示執行緒池的飽和策略。如果阻塞佇列滿了並且沒有空閒的執行緒,這時如果繼續提交任務,就需要採取一種策略處理該任務。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
execute(Runnable command)
execute方法是向執行緒池提交任務的,此時執行緒池的狀態為RUNNING(其他狀態不接收新提交的任務),主要判斷:
- 如果執行的執行緒少於 corePoolSize,則建立新的工作執行緒來處理任務,即使執行緒池中的其他執行緒是空閒的;
- 如果執行緒池中的執行緒數量大於等於 corePoolSize,且阻塞佇列未滿,將任務加入阻塞佇列workQueue;
- 如果執行緒池中的執行緒數量大於等於 corePoolSize 且小於 maximumPoolSize,則只有當workQueue滿時才建立新的執行緒去處理任務;
- 如果執行的執行緒數量大於等於maximumPoolSize,這時如果workQueue已經滿了,則通過handler所指定的策略來拒絕任務提交;
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //ctl記錄執行緒池狀態資訊和執行緒池執行緒數 int c = ctl.get(); //比較當前執行緒數是否小於corePoolSize,如果小於則新建一個執行緒放入執行緒池中 if (workerCountOf(c) < corePoolSize) { //成功加入則返回 if (addWorker(command, true)) return; //加入失敗,重新獲取ctl c = ctl.get(); } //如果當前執行緒數大於等於corePoolSize,判斷執行緒池是否仍在執行,是的話加入阻塞佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次檢查執行緒池是否仍在執行 if (! isRunning(recheck) && remove(command)) reject(command); /** 執行緒池在執行但是工作執行緒數為0,此時可能阻塞佇列有任務但執行緒池沒有工作執行緒池, * 如果配置了引數allowCoreThreadTimeOut(預設是false)為true可能因為核心執行緒執行 * 完任務且阻塞佇列也沒有執行緒等待獲取任務,此時屬於空閒執行緒,由於超時會回收核心執行緒 **/ else if (workerCountOf(recheck) == 0) /** 傳false將會在addWorker方法中判斷執行緒池的工作執行緒數量和最大執行緒數量做比較 * 傳一個空的任務,開啟一個工作執行緒,但這個工作執行緒會發現當前的任務是空,然後會去佇列中取任務 * 這樣就避免了執行緒池的狀態是running,而且佇列中還有任務,但執行緒池卻不執行佇列中的任務 **/ addWorker(null, false); } /** * 如果執行到這裡,有兩種情況: * 1. 執行緒池已經不是RUNNING狀態; * 2. 執行緒池是RUNNING狀態,但workerCount >= corePoolSize並且workQueue已滿。 * 這時,再次呼叫addWorker方法,但第二個引數傳入為false,將執行緒池的有限執行緒數量的上限設定為 * maximumPoolSize;如果失敗則拒絕該任務 **/ else if (!addWorker(command, false)) reject(command); }
addWorker(Runnable firstTask, boolean core)
addWorker方法用與建立工作執行緒,firstTask表示第一個任務,core為true那麼執行緒數受corePoolSize制約,為false則受maximumPoolSize制約。執行流程:
- 檢查執行緒池狀態決定是否新建工作執行緒
- 新建Worker物件並加入到集合中
- 啟動工作執行緒
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //執行狀態 int rs = runStateOf(c); /** * 如果rs >= SHUTDOWN,則表示此時不再接收新任務 * 滿足rs >= SHUTDOWN條件後接著判斷以下3個條件,只要有1個不滿足,則返回false: * 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞佇列中已保 * 存的任務 2. firsTask為空 3. 阻塞佇列不為空 **/ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //當前工作執行緒數 int wc = workerCountOf(c); //檢查執行緒數量是否超出 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 嘗試CAS增加workerCount,如果成功,則跳出第一個for迴圈 if (compareAndIncrementWorkerCount(c)) break retry; //CAS失敗,重新獲取ctl的值 c = ctl.get();// Re-read ctl // 如果當前的執行狀態不等於rs,說明狀態已被改變,返回第一個for迴圈繼續執行 if (runStateOf(c) != rs) continue retry; } } //CAS增加workCount成功,退出迴圈進入到這裡 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根據firstTask來建立Worker物件 w = new Worker(firstTask); // 每一個Worker物件都會建立一個執行緒 final Thread t = w.thread; if (t != null) { //上鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING狀態; // 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態並且firstTask為null,向執行緒池中新增執行緒。 // 因為在SHUTDOWN時不會在新增新的任務,但還是會執行workQueue中的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //將工作執行緒work加入到HashSet物件workers workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //啟動執行緒 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
內部類Worker
執行緒池的工作執行緒是通過包裝成Worker物件,Worker類本身既實現了Runnable介面,又繼承了同步器AQS,實現了一個簡易的不可重入的互斥鎖,通過同步狀態state控制中斷:
- 初始AQS狀態為-1,此時不允許中斷interrupt(),只有在worker執行緒啟動了,執行了runWoker(),將state置為0,才能中斷,不允許中斷體現在:
- shutdown()執行緒池時,會對每個worker tryLock()上鎖,而Worker類這個AQS的tryAcquire()方法是固定將state從0->1,故初始狀態state==-1時tryLock()失敗,無法interrupt()
- shutdownNow()執行緒池時,不用tryLock()上鎖,但呼叫worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯
- 為了防止某種情況下,在執行中的worker被中斷,runWorker()每次執行任務時都會lock()上鎖,而shutdown()這類可能會終止worker的操作需要先獲取worker的鎖,這樣就防止了中斷正在執行的執行緒
private final class Worker extends AbstractQueuedSynchronizerimplements Runnable{ private static final long serialVersionUID = 6138294804551838833L; //工作執行緒 final Thread thread; //新建Worker傳入的任務command,可能為null Runnable firstTask; //執行完的任務數量 volatile long completedTasks; //同步狀態state為0代表為鎖定,state為1代表鎖定,state為-1代表初始狀態 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //建立執行緒 this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock(){ acquire(1); } public boolean tryLock(){ return tryAcquire(1); } public void unlock(){ release(1); } public boolean isLocked() { return isHeldExclusively(); } }
runWorker(Worker w)
runWork是工作執行緒執行任務的方法,執行過程如下:
- 通過while迴圈步斷獲取任務
- 檢查執行緒池執行狀態,如果處於STOP及以上,中斷執行緒;如果是RUNNING或SHUTDOWN,不中斷工作執行緒
- task.run()執行任務
- 當取得的任務task為null退出迴圈,執行processWorkerExit方法,此時Work的工作執行緒run()方法執行完畢,執行緒銷燬
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //同步狀態state設定為0,允許中斷 w.unlock(); // allow interrupts //用於標識是否工作執行緒由於異常突然終止,在執行任務丟擲異常或執行緒被中斷兩種情況為true boolean completedAbruptly = true; try { //迴圈取任務執行 while (task != null || (task = getTask()) != null) { //上鎖,表示正在工作執行緒正在執行任務,不能響應中斷 w.lock(); /** * 確保線上程池狀態在STOP及以上時,才會被設定中斷標示,否則清除中斷標示,判斷以下兩個條件: * 1、如果執行緒池狀態>=stop,且當前執行緒沒有設定中斷狀態,wt.interrupt() * 2、如果一開始判斷執行緒池狀態<stop,但Thread.interrupted()為true,即執行緒已經被中斷,又 * 清除了中斷標示,再次判斷執行緒池狀態是否>=stop(可能呼叫了shutdownNow關閉執行緒池) **/ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
getTask()
當工作執行緒數達到corePoolSize,後續提交的任務就會放到阻塞佇列workQueue中,工作執行緒通過getTask方法從阻塞佇列取出任務,執行以下步驟:
- 檢查執行緒池狀態及阻塞佇列是否為空
- 控制核心執行緒數(使工作執行緒數不超過corePoolSize)
- 從阻塞佇列取任務
private Runnable getTask() { // timeOut變數的值表示上次從阻塞佇列中取任務時是否超時 boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); /** * 1.rs>SHUTDOWN 所以rs至少等於STOP,這時不再處理佇列中的任務,不管workQueue是否為空都返回null * 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,這時還需要處理佇列中的任務除非workQueue為空 * 如果以上條件滿足,則將workerCount減1並返回null。因為如果當前執行緒池狀態的值是SHUTDOWN * 或以上時,不允許再向阻塞佇列中新增任務。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /** * timed表示工作執行緒是否需要剔除,為true * allowCoreThreadTimeOut預設為false,表示核心執行緒不做超時控制 * wc > corePoolSize 超過核心執行緒數 * timed為true下面的if條件通過返回null,從而剔除掉超過corePoolSize數目的執行緒,使執行緒數 * 回覆corePoolSize **/ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * 條件1: * wc > maximumPoolSize 檢查是否超出maximumPoolSize,執行緒池可能重置了maximumPoolSize * timed && timedOut 當前執行緒需要超時控制且上次取任務超時為true * 條件2:如果執行緒數量大於1,或者阻塞佇列是空的 * 兩個條件都為true把workCount減一,返回null **/ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; //CAS失敗重新迴圈 continue; } try { /** * 根據timed來判斷,如果為true,則通過阻塞佇列的poll方法進行超時控制,如果在 * keepAliveTime時間內沒有獲取到任務,則返回null; * 否則通過take方法,如果這時佇列為空,則take方法會阻塞直到佇列不為空。 **/ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //如果r==null,說明是超時了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
processWorkerExit(Worker w, boolean completedAbruptly)
當getTask返回null,會跳出runWork的while迴圈,此時工作執行緒的run方法執行完畢,執行緒會終止,同時會執行processWorkerExit方法,步驟如下:
- 根據completedAbruptly引數調整執行緒池的工作執行緒數
- 統計完成的任務數並從集合中移出Worker物件
- 根據執行緒池狀態進行判斷是否結束執行緒池
private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果是突然終止,重新調整workCount if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; //從集合中移出Worker物件 workers.remove(w); } finally { mainLock.unlock(); } // 根據執行緒池狀態進行判斷是否結束執行緒池 tryTerminate(); int c = ctl.get(); //執行緒狀態小於STOP,即執行緒池處於RUNNING或SHUTDOWN狀態 if (runStateLessThan(c, STOP)) { //檢查是否異常終止 if (!completedAbruptly) { //如果allowCoreThreadTimeOut=true,並且等待佇列有任務,至少保留一個worker; //如果allowCoreThreadTimeOut=false,workerCount不少於corePoolSize。 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //突然終止,新增一個Worker addWorker(null, false); } }
shutdown()
關閉執行緒池,執行緒池狀態由RUNNING變為SHUTDOWN,只處理已有任務不再接收新提交的任務,中斷空閒執行緒。
為什麼要中斷空閒執行緒:當執行緒池狀態為RUNNING但是阻塞佇列為空,allowCoreThreadTimeOut為預設值false(既不支援核心執行緒超時回收),那麼工作執行緒必然堵塞在workQueue.take()方法上,而呼叫了shutdown()方法後執行緒池狀態變為SHUTDOWN不接收新提交的任務,那麼阻塞佇列永遠為空,所以需要通過中斷讓執行緒由阻塞狀態返回null。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //檢查是否有關閉執行緒池許可權 checkShutdownAccess(); //把執行緒池執行狀態切換為SHUTDOWN advanceRunState(SHUTDOWN); //中斷空閒執行緒 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
interruptIdleWorkers()
中斷空閒執行緒。
private void interruptIdleWorkers() { //false表明中斷所有空閒執行緒 interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // t.isInterrupted()檢查執行緒是否已經中斷過 // w.tryLock() runWork在執行任務會上鎖,執行完解鎖去阻塞佇列獲得任務,如果tryLock成功 //說明沒有執行任務,是空閒執行緒。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
tryTerminate()
根據執行緒池狀態嘗試關閉執行緒池。這裡解釋一下interruptIdleWorkers(ONLY_ONE):
當到達workerCountOf(c) != 0這個判斷時,說明執行緒池處於SHUTDOWN狀態,且阻塞佇列已經為空,這是若判斷成立,那麼還有工作執行緒等待線上程池上,會中斷一個空閒執行緒,這個被中斷的空閒執行緒的Worker返回null又會呼叫tryTerminate,從而把執行緒池關閉的訊息傳給每個執行緒,回收空閒執行緒。
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 當前執行緒池的狀態為以下幾種情況時,直接返回: * 1. RUNNING,因為還在執行中,不能停止; * 2. TIDYING或TERMINATED,因為執行緒池中已經沒有正在執行的執行緒了; * 3. SHUTDOWN並且等待佇列非空,這時要執行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //工作執行緒數不為0 if (workerCountOf(c) != 0) { // Eligible to terminate //中斷一個空閒執行緒(等待在阻塞佇列上獲取任務的執行緒) //中斷的執行緒在回收Worker時還會呼叫tryTerminate方法,從而回收空閒執行緒 interruptIdleWorkers(ONLY_ONE); return; } //到這裡說明工作執行緒數workCount為0,執行緒池狀態置為TIDYING final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
shutdownNow()
關閉執行緒池,執行狀態修改為 STOP, 中斷所有執行緒; 並返回未處理的任務
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //更改執行緒池狀態 advanceRunState(STOP); // 中斷所有工作執行緒,無論是否空閒 interruptWorkers(); //取出阻塞佇列中沒有被執行的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
interruptWorkers()
不論執行緒是否空閒,中斷所有執行緒。
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; /** * getState() >= 0 同步狀態state=-1執行緒還沒啟動,大於等於0說明執行緒以及啟動,處於 * 執行任務或空閒狀態。 * (t = thread) != null 執行緒不為null * !t.isInterrupted() 檢查執行緒是否被中斷過。 **/ if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
總結
本文分析了執行緒池ThreadPoolExecutor的實現,主要從向執行緒池提交任務和關閉執行緒池這兩個方法分析的,瞭解了執行緒池複用執行緒資源減少執行緒建立和切換的開銷背後的祕密。