java 執行緒池(1)
ThreadPoolExecutor概述
ThreadPoolExecutor 下文簡稱 TPE ,我們使用它都是從Executror 這個類中的方法 :
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4new LinkedBlockingQueue<Runnable>()); 5 } 6 7 8 public static ExecutorService newSingleThreadExecutor() { 9 return new FinalizableDelegatedExecutorService 10 (new ThreadPoolExecutor(1, 1, 11 0L, TimeUnit.MILLISECONDS, 12new LinkedBlockingQueue<Runnable>())); 13 } 14 15 16 17 public static ExecutorService newCachedThreadPool() { 18 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 19 60L, TimeUnit.SECONDS, 20 newSynchronousQueue<Runnable>()); 21 } 22 23 24 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 25 return new ScheduledThreadPoolExecutor(corePoolSize); 26 } 27 28 //ScheduledExecutorService 29 public ScheduledThreadPoolExecutor(int corePoolSize) { 30 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 31 new DelayedWorkQueue()); 32 } 33 34 35 public class ScheduledThreadPoolExecutor 36 extends ThreadPoolExecutor 37 implements ScheduledExecutorService 38 39
Executror 的方法名很明顯地說明了建立的物件的用途,我們也可以看到它們實際的都是走到了TLE建構函式,只是傳入的引數不同。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory) { 7 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 8 threadFactory, defaultHandler); 9 } 10 11 12 public ThreadPoolExecutor(int corePoolSize, 13 int maximumPoolSize, 14 long keepAliveTime, 15 TimeUnit unit, 16 BlockingQueue<Runnable> workQueue, 17 ThreadFactory threadFactory, 18 RejectedExecutionHandler handler) { 19 if (corePoolSize < 0 || 20 maximumPoolSize <= 0 || 21 maximumPoolSize < corePoolSize || 22 keepAliveTime < 0) 23 throw new IllegalArgumentException(); 24 if (workQueue == null || threadFactory == null || handler == null) 25 throw new NullPointerException(); 26 this.corePoolSize = corePoolSize; 27 this.maximumPoolSize = maximumPoolSize; 28 this.workQueue = workQueue; 29 this.keepAliveTime = unit.toNanos(keepAliveTime); 30 this.threadFactory = threadFactory; 31 this.handler = handler; 32 }
由此可以推斷通過配置TLE的各個引數,實現不同的功能。
ThreadPoolExecutor重要知識點
官方文件中有詳細介紹,細節請看官方文件
Core and maximum pool sizes
這兩個在構造方法中需要指定,核心執行緒數和最大執行緒池執行緒數,很好理解,就像兩條上限線,當來任務時沒達到核心執行緒數,那麼就開啟一條新執行緒去執行,要是達到核心數量了,怎麼辦,任務入列,要是超過了我設定的最大執行緒數量,那麼不再接受任務。這兩個值可以動態設定: setCorePoolSize(int)
和setMaximumPoolSize(int)
.
On-demand construction
預設情況下,甚至核心執行緒最初只在新任務到達時建立並啟動,但可以使用方法prestartCoreThread()或prestartAllCoreThreads()動態覆蓋。 如果使用非空佇列構造池,則可能需要預啟動執行緒。
Creating new threads
使用ThreadFactory建立新執行緒。 如果沒有另外指定,則使用Executors.defaultThreadFactory(),它將所有執行緒建立在同一個ThreadGroup中,並具有相同的NORM_PRIORITY優先順序和非守護程序狀態。 通過提供不同的ThreadFactory,您可以更改執行緒的名稱,執行緒組,優先順序,守護程式狀態等。如果ThreadFactory在通過從newThread返回null請求時無法建立執行緒,則執行程式將繼續,但可能無法 執行任何任務。 執行緒應該擁有“modifyThread”RuntimePermission。 如果使用池的工作執行緒或其他執行緒不具有此許可權,則服務可能會降級:配置更改可能不會及時生效,並且關閉池可能保持可以終止但未完成的狀態。(取至官方文件谷歌翻譯)
Keep-alive times
前面說到 Core and maximum pool sizes 就像兩個上限線,當超過了核心執行緒數後,任務開始執行完成,那麼執行緒就空閒了,此時要是空閒達到了 Keep-alive times 這個設定值,那麼執行緒就會被回收。使用Long.MAX_VALUE型別的TimeUnit.NANOSECONDS有效地禁止空閒執行緒在關閉之前終止。
the keep-alive policy applies only when there are more than corePoolSize threads. But method
allowCoreThreadTimeOut(boolean)
can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero
Queuing
佇列的使用和執行緒池的執行緒數有關。
- 如果現有執行緒少於 corePoolSize 執行緒數量,嘗試開啟一條新的執行緒執行(能執行任務就不排隊解決的原則)
- 達到或超過 corePoolSize ,任務入列
- 任務佇列滿了,開新執行緒直到 maximumPoolSize,當執行緒數達到 maximumPoolSize,拒絕請求。
佇列處理策略 :
- 直接交付
發
- 無界佇列
- 有界佇列
狀態標識
- (後29位)workCount : 指示當前有效的執行緒數量,但是並不能代表當前存活的活躍的執行緒數。
- (高3位)runStatus : 指示執行緒池本身的狀態
下面是狀態變數的定義
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 5 // runState is stored in the high-order bits 6 private static final int RUNNING = -1 << COUNT_BITS; 7 private static final int SHUTDOWN = 0 << COUNT_BITS; 8 private static final int STOP = 1 << COUNT_BITS; 9 private static final int TIDYING = 2 << COUNT_BITS; 10 private static final int TERMINATED = 3 << COUNT_BITS; 11 12 // Packing and unpacking ctl 13 private static int runStateOf(int c) { return c & ~CAPACITY; } 14 private static int workerCountOf(int c) { return c & CAPACITY; } 15 private static int ctlOf(int rs, int wc) { return rs | wc; } 16 17 /* 18 * Bit field accessors that don't require unpacking ctl. 19 * These depend on the bit layout and on workerCount being never negative. 20 */ 21 22 private static boolean runStateLessThan(int c, int s) { 23 return c < s; 24 } 25 26 private static boolean runStateAtLeast(int c, int s) { 27 return c >= s; 28 } 29 30 private static boolean isRunning(int c) { 31 return c < SHUTDOWN; 32 }
runStatus 提供這幾種狀態 :
- RUNNING : 執行
- SHUTDOWN : 停止接收新任務,執行已經入隊的任務。
- STOP : 不再接受新任務,不執行已經入列的任務,中斷正在執行的任務。
- TIDING : 停止接收新任務,所有任務被暫停,workCount 變為 0 ,執行緒狀態變為 TIDYING.將執行hook 方法 terminated.
- TERMINATED : terminated方法完成。
ThreadPoolExecutor原始碼分析
看 execute 方法。
1 //可以建立執行緒處理就處理,不行就入列,入列也失敗,拒絕! 2 public void execute(Runnable command) { 3 if (command == null) 4 throw new NullPointerException(); 5 /* 6 * Proceed in 3 steps: 7 * 8 * 1. If fewer than corePoolSize threads are running, try to 9 * start a new thread with the given command as its first 10 * task. The call to addWorker atomically checks runState and 11 * workerCount, and so prevents false alarms that would add 12 * threads when it shouldn't, by returning false. 13 * 14 * 2. If a task can be successfully queued, then we still need 15 * to double-check whether we should have added a thread 16 * (because existing ones died since last checking) or that 17 * the pool shut down since entry into this method. So we 18 * recheck state and if necessary roll back the enqueuing if 19 * stopped, or start a new thread if there are none. 20 * 21 * 3. If we cannot queue task, then we try to add a new 22 * thread. If it fails, we know we are shut down or saturated 23 * and so reject the task. 24 */ 25 int c = ctl.get(); 26 //未達到 corePoolSize 增加新執行緒執行 27 if (workerCountOf(c) < corePoolSize) { 28 if (addWorker(command, true)) 29 return; 30 c = ctl.get(); 31 } 32 //增加執行緒失敗,或者有可能 worker的數量大於等於 core ,或是大於 maxSize ,任務入列 33 if (isRunning(c) && workQueue.offer(command)) { 34 //注意 :此時任務已成功入列!!! 35 int recheck = ctl.get(); 36 //再次檢查,要是 此時是 SHUTDOWN 狀態(執行緒池關閉),那麼移除這個任務,同時拒絕這個請求 37 if (! isRunning(recheck) && remove(command)) 38 reject(command); 39 //非running 狀態 ,同時 workerCounter 為 0 (可能執行緒池裡的任務都執行完了),那麼新建一個執行緒,而不去處理,為什麼要這樣呢? 40 //因為任務此時在佇列中了,建立執行緒後,自動會去獲取任務並處理 41 //要是都不是就退出這個方法,此時任務在佇列中等待被處理 42 else if (workerCountOf(recheck) == 0) 43 addWorker(null, false); 44 } 45 //執行緒池 shut down 或是佇列滿了,再次新建一個執行緒執行,但是這次的執行緒數的判斷邊界是 maxSize ,即是 46 // addWorker的第二個引數來指定 47 else if (!addWorker(command, false)) 48 reject(command); 49 }
假設目前還未達到core 的數量,那麼進入 addWorker方法 。
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 // Check if queue empty only if necessary. 8 // 執行緒池滿足如下條件中的任意一種時, 就會直接結束該方法, 並且返回 false 9 // 表示沒有建立新執行緒, 新提交的任務也沒有被執行. 10 // 1 .處於 STOP, TYDING 或 TERMINATD 狀態 11 // 2 .處於 SHUTDOWN 狀態, 並且引數 firstTask != null 12 // 3 .處於 SHUTDOWN 狀態, firstTask == null 且阻塞佇列 workQueue為空 13 if (rs >= SHUTDOWN && 14 ! (rs == SHUTDOWN && 15 firstTask == null && 16 ! workQueue.isEmpty())) 17 return false; 18 for (;;) { 19 int wc = workerCountOf(c); 20 //此處可以看到 第二個引數,core 是用來 選定執行緒數邊界的 21 if (wc >= CAPACITY || 22 wc >= (core ? corePoolSize : maximumPoolSize)) 23 return false; 24 //自旋增加 c , ctl的值 ,成功就 break 退出 25 if (compareAndIncrementWorkerCount(c)) 26 break retry; 27 c = ctl.get(); // Re-read ctl 28 if (runStateOf(c) != rs) 29 //說明有人搶了, 30 continue retry; 31 // else CAS failed due to workerCount change; retry inner loop 32 // 或者 CAS 失敗是因為 workerCount 改變,繼續loop 33 } 34 } 35 36 //下面是增加一個執行緒的操作,建立 worker ,上鎖,再次判斷,建立成功後執行緒開始執行。 37 boolean workerStarted = false; 38 boolean workerAdded = false; 39 Worker w = null; 40 try { 41 w = new Worker(firstTask); 42 final Thread t = w.thread; 43 if (t != null) { 44 final ReentrantLock mainLock = this.mainLock; 45 mainLock.lock(); 46 try { 47 // Recheck while holding lock. 48 // Back out on ThreadFactory failure or if 49 // shut down before lock acquired. 50 // recheck 當獲得鎖的時候 ,退出因為 ThreadFactory 失敗或是 在獲得鎖之前 執行緒池 shut down 51 int rs = runStateOf(ctl.get()); 52 53 if (rs < SHUTDOWN || 54 (rs == SHUTDOWN && firstTask == null)) { 55 if (t.isAlive()) // precheck that t is startable 56 throw new IllegalThreadStateException(); 57 workers.add(w); 58 int s = workers.size(); 59 if (s > largestPoolSize) 60 largestPoolSize = s; 61 workerAdded = true; 62 } 63 } finally { 64 mainLock.unlock(); 65 } 66 if (workerAdded) { 67 //走到這裡執行緒肯定是建立了,並且執行緒池一定是正常的。 68 t.start(); 69 workerStarted = true; 70 } 71 } 72 } finally { 73 if (! workerStarted) 74 addWorkerFailed(w); 75 } 76 return workerStarted; 77 }
addWoker 中建立了一個Worker,我們先看一下構造方法,再慢慢分析它。
1 Worker(Runnable firstTask) { 2 setState(-1); // inhibit interrupts until runWorker 3 this.firstTask = firstTask; 4 this.thread = getThreadFactory().newThread(this); 5 }
1 public interface ThreadFactory { 2 3 /** 4 * Constructs a new {@code Thread}. Implementations may also initialize 5 * priority, name, daemon status, {@code ThreadGroup}, etc. 6 * 7 * @param r a runnable to be executed by new thread instance 8 * @return constructed thread, or {@code null} if the request to 9 * create a thread is rejected 10 */ 11 Thread newThread(Runnable r); 12 }
可以看到 ThreadFactory 實際就是建立執行緒的方法,同時傳入一個 Runnable , worker 裡面傳入了一個this ,我們趕緊看一下worker的定義。
1 private final class Worker extends AbstractQueuedSynchronizer implements Runnable
意圖很明顯,就是worker 本身帶有一個任務(可以為NULL),讓剛建立的執行緒去執行這個任務。下面看一下它到底執行了什麼?
1 /** Delegates main run loop to outer runWorker */ 2 public void run() { 3 runWorker(this); 4 } 5 6 7 8 /** Delegates main run loop to outer runWorker */ 9 public void run() { 10 runWorker(this); 11 } 12 13 14 /** 15 * Main worker run loop. Repeatedly gets tasks from queue and 16 * executes them, while coping with a number of issues: 17 * 18 * 1. We may start out with an initial task, in which case we 19 * don't need to get the first one. Otherwise, as long as pool is 20 * running, we get tasks from getTask. If it returns null then the 21 * worker exits due to changed pool state or configuration 22 * parameters. Other exits result from exception throws in 23 * external code, in which case completedAbruptly holds, which 24 * usually leads processWorkerExit to replace this thread. 25 * 26 * 2. Before running any task, the lock is acquired to prevent 27 * other pool interrupts while the task is executing, and then we 28 * ensure that unless pool is stopping, this thread does not have 29 * its interrupt set. 30 * 31 * 3. Each task run is preceded by a call to beforeExecute, which 32 * might throw an exception, in which case we cause thread to die 33 * (breaking loop with completedAbruptly true) without processing 34 * the task. 35 * 36 * 4. Assuming beforeExecute completes normally, we run the task, 37 * gathering any of its thrown exceptions to send to afterExecute. 38 * We separately handle RuntimeException, Error (both of which the 39 * specs guarantee that we trap) and arbitrary Throwables. 40 * Because we cannot rethrow Throwables within Runnable.run, we 41 * wrap them within Errors on the way out (to the thread's 42 * UncaughtExceptionHandler). Any thrown exception also 43 * conservatively causes thread to die. 44 * 45 * 5. After task.run completes, we call afterExecute, which may 46 * also throw an exception, which will also cause thread to 47 * die. According to JLS Sec 14.20, this exception is the one that 48 * will be in effect even if task.run throws. 49 * 50 * The net effect of the exception mechanics is that afterExecute 51 * and the thread's UncaughtExceptionHandler have as accurate 52 * information as we can provide about any problems encountered by 53 * user code. 54 * 55 * @param w the worker 56 * 57 * 58 * 這裡可以看到執行完任務後,就會阻塞在 getTask ,而執行緒沒有被回收 59 * 除非getTask 返回 null ,所有我們利用 一些呼叫滿足 getTask 返回 null 60 * 例如 : 超時設定 61 * 62 * 63 **/ 64 final void runWorker(Worker w) { 65 Thread wt = Thread.currentThread(); 66 Runnable task = w.firstTask; 67 w.firstTask = null; 68 w.unlock(); // allow interrupts 允許中斷,釋放鎖(為了下面搶任務) 69 boolean completedAbruptly = true; 70 try { 71 //搶任務,搶到就加鎖,即是 firstTask 為null 時才會去 getTask 72 while (task != null || (task = getTask()) != null) { 73 // 加鎖,防止執行緒被其他執行緒中斷 74 w.lock(); 75 // If pool is stopping, ensure thread is interrupted; 76 // if not, ensure thread is not interrupted. This 77 // requires a recheck in second case to deal with 78 // shutdownNow race while clearing interrupt 79 if ((runStateAtLeast(ctl.get(), STOP) || 80 (Thread.interrupted() && 81 runStateAtLeast(ctl.get(), STOP))) && 82 !wt.isInterrupted()) 83 wt.interrupt(); 84 try { 85 //子類實現 86 beforeExecute(wt, task); 87 Throwable thrown = null; 88 try { 89 task.run(); 90 } catch (RuntimeException x) { 91 thrown = x; throw x; 92 } catch (Error x) { 93 thrown = x; throw x; 94 } catch (Throwable x) { 95 thrown = x; throw new Error(x); 96 } finally { 97 //丟擲異常後,依舊執行這裡 98 afterExecute(task, thrown); 99 } 100 } finally { 101 task = null; 102 w.completedTasks++; 103 w.unlock(); 104 } 105 } 106 //來到這裡,1.task == null 107 completedAbruptly = false; 108 } finally { 109 processWorkerExit(w, completedAbruptly); 110 } 111 } 112 113 114 115 116 117 /** 118 * Performs blocking or timed wait for a task, depending on 119 * current configuration settings, or returns null if this worker 120 * must exit because of any of: 121 * 1. There are more than maximumPoolSize workers (due to 122 * a call to setMaximumPoolSize). 123 * 2. The pool is stopped. 124 * 3. The pool is shutdown and the queue is empty. 125 * 4. This worker timed out waiting for a task, and timed-out 126 * workers are subject to termination (that is, 127 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 128 * both before and after the timed wait, and if the queue is 129 * non-empty, this worker is not the last thread in the pool. 130 * 131 * @return task, or null if the worker must exit, in which case 132 * workerCount is decremented 133 * 134 * 135 * 136 * 上面的註解是 4 種返回 null 的情況, 其中第一種的原因有沒有可能是因為其他執行緒建立執行緒導致的呢? 137 * worker 必須退出,順便數量在這裡減少一 138 * 139 */ 140 private Runnable getTask() { 141 boolean timedOut = false; // Did the last poll() time out? 142 143 for (;;) { 144 int c = ctl.get(); 145 int rs = runStateOf(c); 146 147 // Check if queue empty only if necessary. 148 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 149 decrementWorkerCount(); 150 return null; 151 } 152 153 int wc = workerCountOf(c); 154 155 // Are workers subject to culling? 156 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 157 158 //自旋失敗後繼續 loop 直到 成功 159 if ((wc > maximumPoolSize || (timed && timedOut)) 160 && (wc > 1 || workQueue.isEmpty())) { 161 //這裡自旋 162 if (compareAndDecrementWorkerCount(c)) 163 return null; 164 continue; 165 } 166 167 try { 168 //前面 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 169 //可以知道 keepAliveTime 區別於 上面兩個條件 1. allowCoreThreadTimeOut 2.wc > corePoolSize 170 Runnable r = timed ? 171 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 172 workQueue.take(); 173 if (r != null) 174 return r; 175 timedOut = true; 176 // 在這裡會捕獲中斷異常!!這裡很重要,提供了假如是呼叫了 shutDown() 方法,執行緒可以退出的出口 177 } catch (InterruptedException retry) { 178 timedOut = false; 179 } 180 } 181 }
思路就是從任務佇列中不斷拿取任務(無任務狀態下,執行緒阻塞),然後執行該任務。有個很奇怪的地方,runworker 中在獲取任務前釋放了鎖,在獲取到任務後再次獲取鎖。為什麼呢?在 worker 這個類前有註解。
1 /** 2 * Class Worker mainly maintains interrupt control state for 3 * threads running tasks, along with other minor bookkeeping. 4 * This class opportunistically extends AbstractQueuedSynchronizer 5 * to simplify acquiring and releasing a lock surrounding each 6 * task execution. This protects against interrupts that are 7 * intended to wake up a worker thread waiting for a task from 8 * instead interrupting a task being run. We implement a simple 9 * non-reentrant mutual exclusion lock rather than use 10 * ReentrantLock because we do not want worker tasks to be able to 11 * reacquire the lock when they invoke pool control methods like 12 * setCorePoolSize. Additionally, to suppress interrupts until 13 * the thread actually starts running tasks, we initialize lock 14 * state to a negative value, and clear it upon start (in 15 * runWorker). 16 */
worker 使用了“中斷控制狀態”來維護執行緒執行,該類繼承 AbstractQueueSynchronizer ,它的作用是當這個執行緒在執行任務時不被其他執行緒中斷,而是讓其他執行緒等待被喚醒。同時,該類使用無重入的獨佔互斥鎖而不是 ReentrantLock ,因為我們不想在呼叫setCorePoolSize 重入該鎖。
現線上程池就在愉快地執行任務了,假如我這時候停止執行緒池。
1 public void shutdown() { 2 final ReentrantLock mainLock = this.mainLock; 3 mainLock.lock(); 4 try { 5 //檢查是否可以 shutdown 6 checkShutdownAccess(); 7 //設定為 SHUTDOWN 8 advanceRunState(SHUTDOWN); 9 //中斷所有worker 10 interruptIdleWorkers(); 11 onShutdown(); // hook for ScheduledThreadPoolExecutor 12 } finally { 13 mainLock.unlock(); 14 } 15 //終結這個執行緒池 16 tryTerminate(); 17 } 18 19 20 21 /** 22 * If there is a security manager, makes sure caller has 23 * permission to shut down threads in general (see shutdownPerm). 24 * If this passes, additionally makes sure the caller is allowed 25 * to interrupt each worker thread. This might not be true even if 26 * first check passed, if the SecurityManager treats some threads 27 * specially. 28 */ 29 private void checkShutdownAccess() { 30 SecurityManager security = System.getSecurityManager(); 31 if (security != null) { 32 security.checkPermission(shutdownPerm); 33 final ReentrantLock mainLock = this.mainLock; 34 mainLock.lock(); 35 try { 36 for (Worker w : workers) 37 security.checkAccess(w.thread); 38 } finally { 39 mainLock.unlock(); 40 } 41 } 42 } 43 44 45 46 /** 47 * Transitions runState to given target, or leaves it alone if 48 * already at least the given target. 49 * 50 * @param targetState the desired state, either SHUTDOWN or STOP 51 * (but not TIDYING or TERMINATED -- use tryTerminate for that) 52 */ 53 private void advanceRunState(int targetState) { 54 for (;;) { 55 int c = ctl.get(); 56 if (runStateAtLeast(c, targetState) || 57 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 58 break; 59 } 60 } 61 62 63 private void interruptIdleWorkers() { 64 interruptIdleWorkers(false); 65 } 66 67 68 69 /** 70 * Interrupts threads that might be waiting for tasks (as 71 * indicated by not being locked) so they can check for 72 * termination or configuration changes. Ignores 73 * SecurityExceptions (in which case some threads may remain 74 * uninterrupted). 75 * 76 * @param onlyOne If true, interrupt at most one worker. This is 77 * called only from tryTerminate when termination is otherwise 78 * enabled but there are still other workers. In this case, at 79 * most one waiting worker is interrupted to propagate shutdown 80 * signals in case all threads are currently waiting. 81 * Interrupting any arbitrary thread ensures that newly arriving 82 * workers since shutdown began will also eventually exit. 83 * To guarantee eventual termination, it suffices to always 84 * interrupt only one idle worker, but shutdown() interrupts all 85 * idle workers so that redundant workers exit promptly, not 86 * waiting for a straggler task to finish. 87 * 88 * 89 * 從方法名可以看出 : 中斷空閒 workers 90 * 從下面的程式碼也可以看到要是傳過來的引數是 false 91 * 那麼所有執行緒將被中斷 (shutDown()方法有運用到) 92 * 93 */ 94 private void interruptIdleWorkers(boolean onlyOne) { 95 final ReentrantLock mainLock = this.mainLock; 96 mainLock.lock(); 97 try { 98 99 for (Worker w : workers) { 100 Thread t = w.thread; 101 //這裡 worker 的 tryLock 需要注意一下,這裡要是 worker 正在執行任務(這就解釋了為什麼在runWorker 方法中,worker要加鎖了), 102 // 那麼 tryLock 返回 false, 103 if (!t.isInterrupted() && w.tryLock()) { 104 try { 105 t.interrupt(); 106 } catch (SecurityException ignore) { 107 //注意這裡忽略了這個 exception 108 //所以文件中指出了有可能某些執行緒依舊會保持為非中斷狀態 109 } finally { 110 w.unlock(); 111 } 112 } 113 if (onlyOne) 114 break; 115 } 116 } finally { 117 mainLock.unlock(); 118 } 119 } 120 121 122 /** 123 * Performs any further cleanup following run state transition on 124 * invocation of shutdown. A no-op here, but used by 125 * ScheduledThreadPoolExecutor to cancel delayed tasks. 126 */ 127 void onShutdown() { 128 } 129 130 131 132 133 /** 134 * Attempts to stop all actively executing tasks, halts the 135 * processing of waiting tasks, and returns a list of the tasks 136 * that were awaiting execution. These tasks are drained (removed) 137 * from the task queue upon return from this method. 138 * 139 * <p>This method does not wait for actively executing tasks to 140 * terminate. Use {@link #awaitTermination awaitTermination} to 141 * do that. 142 * 143 * <p>There are no guarantees beyond best-effort attempts to stop 144 * processing actively executing tasks. This implementation 145 * cancels tasks via {@link Thread#interrupt}, so any task that 146 * fails to respond to interrupts may never terminate. 147 * 148 * @throws SecurityException {@inheritDoc} 149 */ 150 public List<Runnable> shutdownNow() { 151 List<Runnable> tasks; 152 final ReentrantLock mainLock = this.mainLock; 153 mainLock.lock(); 154 try { 155 checkShutdownAccess(); 156 // STOP 終於在這裡發揮了作用!!S:D 157 advanceRunState(STOP); 158 interruptWorkers(); 159 //remove task 從佇列中 160 tasks = drainQueue(); 161 } finally { 162 mainLock.unlock(); 163 } 164 tryTerminate(); 165 return tasks; 166 } 167
看一下 tryTerminate 方法。
1 /** 2 * Transitions to TERMINATED state if either (SHUTDOWN and pool 3 * and queue empty) or (STOP and pool empty). If otherwise 4 * eligible to terminate but workerCount is nonzero, interrupts an 5 * idle worker to ensure that shutdown signals propagate. This 6 * method must be called following any action that might make 7 * termination possible -- reducing worker count or removing tasks 8 * from the queue during shutdown. The method is non-private to 9 * allow access from ScheduledThreadPoolExecutor. 10 * 11 * 12 * 如果滿足其中一個條件 : 13 * 1. SHUTDOWN 並且 workerCount為 0 並且 佇列為空 14 * 2. STOP 並且 workerCount為 0 15 * 那麼將狀態轉化為 TERMINATED ; 16 * 17 * 如果 workerCount(c)!=0 ,那麼呼叫 interruptIdleWorkers(true); 然後就return 18 * 19 * 20 * 所以我們可以知道線上程池正常的狀態的下必定直接 return 21 * 22 */ 23 final void tryTerminate() { 24 for (;;) { 25 int c = ctl.get(); 26 if (isRunning(c) || 27 runStateAtLeast(c, TIDYING) || 28 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) 29 return; 30 //具備條件時執行下面 31 if (workerCountOf(c) != 0) { // Eligible to terminate 32 // 傳入一個引數 true ,表示只中斷一個,這是因為,每個執行緒當自己沒任務時,肯定 33 interruptIdleWorkers(ONLY_ONE); 34 return; 35 } 36 37 final ReentrantLock mainLock = this.mainLock; 38 mainLock.lock(); 39 try { 40 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 41 try { 42 //看到了嗎,介紹執行緒池狀態時,講到要是是 TIDYING 狀態時,會呼叫這個鉤子方法 43 terminated(); 44 } finally { 45 ctl.set(ctlOf(TERMINATED, 0)); 46 termination.signalAll(); 47 } 48 return; 49 } 50 } finally { 51 mainLock.unlock(); 52 } 53 // else retry on failed CAS 54 } 55 }
從上面我們看到要是呼叫了 shutDown() 或是 shutDownNow ()那麼我們正阻塞在 getTask()方法的執行緒就會收到中斷異常,於是就會getTask就會返回 null 。那麼我們繼續看一下執行緒繼續向下執行的邏輯。
1 /** 2 * Performs cleanup and bookkeeping for a dying worker. Called 3 * only from worker threads. Unless completedAbruptly is set, 4 * assumes that workerCount has already been adjusted to account 5 * for exit. This method removes thread from worker set, and 6 * possibly terminates the pool or replaces the worker if either 7 * it exited due to user task exception or if fewer than 8 * corePoolSize workers are running or queue is non-empty but 9 * there are no workers. 10 * 11 * @param w the worker 12 * @param completedAbruptly if the worker died due to user exception 13 */ 14 private void processWorkerExit(Worker w, boolean completedAbruptly) { 15 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 16 decrementWorkerCount(); 17 18 //加鎖執行 19 final ReentrantLock mainLock = this.mainLock; 20 mainLock.lock(); 21 try { 22 completedTaskCount += w.completedTasks; 23 //移除 worker 24 workers.remove(w); 25 } finally { 26 mainLock.unlock(); 27 } 28 29 tryTerminate(); 30 31 int c = ctl.get(); 32 // replacement 的意思在這裡,是上面已經移除了一個 worker , 這裡呼叫 addWorker 再補充 33 if (runStateLessThan(c, STOP)) { 34 if (!completedAbruptly) { 35 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 36 if (min == 0 && ! workQueue.isEmpty()) 37 min = 1; 38 if (workerCountOf(c) >= min) 39 return; // replacement not needed 40 } 41 addWorker(null, false); 42 } 43 } 44
下一篇我們我們講TLE 具體衍生的不同型別的執行緒池。