1. 程式人生 > >java 執行緒池(1)

java 執行緒池(1)

ThreadPoolExecutor概述

        ThreadPoolExecutor 下文簡稱 TPE ,我們使用它都是從Executror 這個類中的方法 :

  1     public static ExecutorService newFixedThreadPool(int nThreads) {
  2         return new ThreadPoolExecutor(nThreads, nThreads,
  3                                       0L, TimeUnit.MILLISECONDS,
  4
new LinkedBlockingQueue<Runnable>()); 5 } 6 7 8 public static ExecutorService newSingleThreadExecutor() { 9 return new FinalizableDelegatedExecutorService 10 (new ThreadPoolExecutor(1, 1, 11 0L, TimeUnit.MILLISECONDS, 12
new LinkedBlockingQueue<Runnable>())); 13 } 14 15 16 17 public static ExecutorService newCachedThreadPool() { 18 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 19 60L, TimeUnit.SECONDS, 20 new
SynchronousQueue<Runnable>()); 21 } 22 23 24 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 25 return new ScheduledThreadPoolExecutor(corePoolSize); 26 } 27 28 //ScheduledExecutorService 29 public ScheduledThreadPoolExecutor(int corePoolSize) { 30 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 31 new DelayedWorkQueue()); 32 } 33 34 35 public class ScheduledThreadPoolExecutor 36 extends ThreadPoolExecutor 37 implements ScheduledExecutorService 38 39

       Executror 的方法名很明顯地說明了建立的物件的用途,我們也可以看到它們實際的都是走到了TLE建構函式,只是傳入的引數不同。

  1     public ThreadPoolExecutor(int corePoolSize,
  2                               int maximumPoolSize,
  3                               long keepAliveTime,
  4                               TimeUnit unit,
  5                               BlockingQueue<Runnable> workQueue,
  6                               ThreadFactory threadFactory) {
  7         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  8              threadFactory, defaultHandler);
  9     }
 10 
 11 
 12     public ThreadPoolExecutor(int corePoolSize,
 13                               int maximumPoolSize,
 14                               long keepAliveTime,
 15                               TimeUnit unit,
 16                               BlockingQueue<Runnable> workQueue,
 17                               ThreadFactory threadFactory,
 18                               RejectedExecutionHandler handler) {
 19         if (corePoolSize < 0 ||
 20             maximumPoolSize <= 0 ||
 21             maximumPoolSize < corePoolSize ||
 22             keepAliveTime < 0)
 23             throw new IllegalArgumentException();
 24         if (workQueue == null || threadFactory == null || handler == null)
 25             throw new NullPointerException();
 26         this.corePoolSize = corePoolSize;
 27         this.maximumPoolSize = maximumPoolSize;
 28         this.workQueue = workQueue;
 29         this.keepAliveTime = unit.toNanos(keepAliveTime);
 30         this.threadFactory = threadFactory;
 31         this.handler = handler;
 32     }

          由此可以推斷通過配置TLE的各個引數,實現不同的功能。

 

 

ThreadPoolExecutor重要知識點

 

      官方文件中有詳細介紹,細節請看官方文件

 

Core and maximum pool sizes

        這兩個在構造方法中需要指定,核心執行緒數和最大執行緒池執行緒數,很好理解,就像兩條上限線,當來任務時沒達到核心執行緒數,那麼就開啟一條新執行緒去執行,要是達到核心數量了,怎麼辦,任務入列,要是超過了我設定的最大執行緒數量,那麼不再接受任務。這兩個值可以動態設定:   setCorePoolSize(int)setMaximumPoolSize(int).

 

On-demand construction

         預設情況下,甚至核心執行緒最初只在新任務到達時建立並啟動,但可以使用方法prestartCoreThread()或prestartAllCoreThreads()動態覆蓋。 如果使用非空佇列構造池,則可能需要預啟動執行緒。

 

Creating new threads

         使用ThreadFactory建立新執行緒。 如果沒有另外指定,則使用Executors.defaultThreadFactory(),它將所有執行緒建立在同一個ThreadGroup中,並具有相同的NORM_PRIORITY優先順序和非守護程序狀態。 通過提供不同的ThreadFactory,您可以更改執行緒的名稱,執行緒組,優先順序,守護程式狀態等。如果ThreadFactory在通過從newThread返回null請求時無法建立執行緒,則執行程式將繼續,但可能無法 執行任何任務。 執行緒應該擁有“modifyThread”RuntimePermission。 如果使用池的工作執行緒或其他執行緒不具有此許可權,則服務可能會降級:配置更改可能不會及時生效,並且關閉池可能保持可以終止但未完成的狀態。(取至官方文件谷歌翻譯)

 

Keep-alive times

          前面說到  Core and maximum pool sizes  就像兩個上限線,當超過了核心執行緒數後,任務開始執行完成,那麼執行緒就空閒了,此時要是空閒達到了 Keep-alive times 這個設定值,那麼執行緒就會被回收。使用Long.MAX_VALUE型別的TimeUnit.NANOSECONDS有效地禁止空閒執行緒在關閉之前終止。

      the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero

      
Queuing

         佇列的使用和執行緒池的執行緒數有關。

  • 如果現有執行緒少於 corePoolSize 執行緒數量,嘗試開啟一條新的執行緒執行(能執行任務就不排隊解決的原則)
  • 達到或超過 corePoolSize ,任務入列
  • 任務佇列滿了,開新執行緒直到 maximumPoolSize,當執行緒數達到 maximumPoolSize,拒絕請求。
        

         佇列處理策略 :

  • 直接交付

     發

  • 無界佇列

    

  • 有界佇列

 

狀態標識

          狀態控制有一個變數 ctl ,它又兩部分組成,
 
  • (後29位)workCount : 指示當前有效的執行緒數量,但是並不能代表當前存活的活躍的執行緒數。
  • (高3位)runStatus : 指示執行緒池本身的狀態

          下面是狀態變數的定義

  1     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2     private static final int COUNT_BITS = Integer.SIZE - 3;
  3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  4 
  5     // runState is stored in the high-order bits
  6     private static final int RUNNING    = -1 << COUNT_BITS;
  7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
  8     private static final int STOP       =  1 << COUNT_BITS;
  9     private static final int TIDYING    =  2 << COUNT_BITS;
 10     private static final int TERMINATED =  3 << COUNT_BITS;
 11 
 12     // Packing and unpacking ctl
 13     private static int runStateOf(int c)     { return c & ~CAPACITY; }
 14     private static int workerCountOf(int c)  { return c & CAPACITY; }
 15     private static int ctlOf(int rs, int wc) { return rs | wc; }
 16 
 17     /*
 18      * Bit field accessors that don't require unpacking ctl.
 19      * These depend on the bit layout and on workerCount being never negative.
 20      */
 21 
 22     private static boolean runStateLessThan(int c, int s) {
 23         return c < s;
 24     }
 25 
 26     private static boolean runStateAtLeast(int c, int s) {
 27         return c >= s;
 28     }
 29 
 30     private static boolean isRunning(int c) {
 31         return c < SHUTDOWN;
 32     }

          runStatus 提供這幾種狀態 :

 

  • RUNNING : 執行
  • SHUTDOWN : 停止接收新任務,執行已經入隊的任務。
  • STOP :  不再接受新任務,不執行已經入列的任務,中斷正在執行的任務。
  • TIDING : 停止接收新任務,所有任務被暫停,workCount 變為 0 ,執行緒狀態變為 TIDYING.將執行hook 方法 terminated.
  • TERMINATED : terminated方法完成。

 

 

ThreadPoolExecutor原始碼分析

         看 execute 方法。

  1      //可以建立執行緒處理就處理,不行就入列,入列也失敗,拒絕!
  2      public void execute(Runnable command) {
  3         if (command == null)
  4             throw new NullPointerException();
  5         /*
  6          * Proceed in 3 steps:
  7          *
  8          * 1. If fewer than corePoolSize threads are running, try to
  9          * start a new thread with the given command as its first
 10          * task.  The call to addWorker atomically checks runState and
 11          * workerCount, and so prevents false alarms that would add
 12          * threads when it shouldn't, by returning false.
 13          *
 14          * 2. If a task can be successfully queued, then we still need
 15          * to double-check whether we should have added a thread
 16          * (because existing ones died since last checking) or that
 17          * the pool shut down since entry into this method. So we
 18          * recheck state and if necessary roll back the enqueuing if
 19          * stopped, or start a new thread if there are none.
 20          *
 21          * 3. If we cannot queue task, then we try to add a new
 22          * thread.  If it fails, we know we are shut down or saturated
 23          * and so reject the task.
 24          */
 25         int c = ctl.get();
 26         //未達到 corePoolSize 增加新執行緒執行
 27         if (workerCountOf(c) < corePoolSize) {
 28             if (addWorker(command, true))
 29                 return;
 30             c = ctl.get();
 31         }
 32         //增加執行緒失敗,或者有可能 worker的數量大於等於 core ,或是大於 maxSize ,任務入列 
 33         if (isRunning(c) && workQueue.offer(command)) {
 34             //注意 :此時任務已成功入列!!!
 35             int recheck = ctl.get();
 36             //再次檢查,要是 此時是 SHUTDOWN 狀態(執行緒池關閉),那麼移除這個任務,同時拒絕這個請求
 37             if (! isRunning(recheck) && remove(command))
 38                 reject(command);
 39             //非running 狀態 ,同時 workerCounter 為 0 (可能執行緒池裡的任務都執行完了),那麼新建一個執行緒,而不去處理,為什麼要這樣呢?
 40             //因為任務此時在佇列中了,建立執行緒後,自動會去獲取任務並處理
 41             //要是都不是就退出這個方法,此時任務在佇列中等待被處理
 42             else if (workerCountOf(recheck) == 0)
 43                 addWorker(null, false);
 44         }
 45         //執行緒池 shut down 或是佇列滿了,再次新建一個執行緒執行,但是這次的執行緒數的判斷邊界是 maxSize ,即是
 46         // addWorker的第二個引數來指定 
 47         else if (!addWorker(command, false))
 48             reject(command);
 49     }

         假設目前還未達到core 的數量,那麼進入 addWorker方法 。

  1     private boolean addWorker(Runnable firstTask, boolean core) {
  2         retry:
  3         for (;;) {
  4             int c = ctl.get();
  5             int rs = runStateOf(c);
  6 
  7             // Check if queue empty only if necessary.  
  8             // 執行緒池滿足如下條件中的任意一種時, 就會直接結束該方法, 並且返回 false
  9            // 表示沒有建立新執行緒, 新提交的任務也沒有被執行.
 10            // 1 .處於 STOP, TYDING 或 TERMINATD 狀態
 11            // 2 .處於 SHUTDOWN 狀態, 並且引數 firstTask != null
 12           // 3 .處於 SHUTDOWN 狀態, firstTask == null 且阻塞佇列 workQueue為空
 13             if (rs >= SHUTDOWN &&
 14                 ! (rs == SHUTDOWN &&
 15                    firstTask == null &&
 16                    ! workQueue.isEmpty()))
 17                 return false;
 18             for (;;) {
 19                 int wc = workerCountOf(c);
 20                 //此處可以看到 第二個引數,core 是用來 選定執行緒數邊界的
 21                 if (wc >= CAPACITY ||
 22                     wc >= (core ? corePoolSize : maximumPoolSize))
 23                     return false;
 24                 //自旋增加 c , ctl的值 ,成功就 break 退出
 25                 if (compareAndIncrementWorkerCount(c))
 26                     break retry;
 27                 c = ctl.get();  // Re-read ctl
 28                 if (runStateOf(c) != rs)
 29                     //說明有人搶了,
 30                     continue retry;
 31                 // else CAS failed due to workerCount change; retry inner loop
 32                 // 或者 CAS 失敗是因為 workerCount 改變,繼續loop 
 33             }
 34         }
 35 
 36         //下面是增加一個執行緒的操作,建立 worker ,上鎖,再次判斷,建立成功後執行緒開始執行。
 37         boolean workerStarted = false;
 38         boolean workerAdded = false;
 39         Worker w = null;
 40         try {
 41             w = new Worker(firstTask);
 42             final Thread t = w.thread;
 43             if (t != null) {
 44                 final ReentrantLock mainLock = this.mainLock;
 45                 mainLock.lock();
 46                 try {
 47                     // Recheck while holding lock.
 48                     // Back out on ThreadFactory failure or if
 49                     // shut down before lock acquired.
 50                     // recheck 當獲得鎖的時候 ,退出因為 ThreadFactory 失敗或是 在獲得鎖之前 執行緒池 shut down 
 51                     int rs = runStateOf(ctl.get());
 52 
 53                     if (rs < SHUTDOWN ||
 54                         (rs == SHUTDOWN && firstTask == null)) {
 55                         if (t.isAlive()) // precheck that t is startable
 56                             throw new IllegalThreadStateException();
 57                         workers.add(w);
 58                         int s = workers.size();
 59                         if (s > largestPoolSize)
 60                             largestPoolSize = s;
 61                         workerAdded = true;
 62                     }
 63                 } finally {
 64                     mainLock.unlock();
 65                 }
 66                 if (workerAdded) {
 67                     //走到這裡執行緒肯定是建立了,並且執行緒池一定是正常的。
 68                     t.start();
 69                     workerStarted = true;
 70                 }
 71             }
 72         } finally {
 73             if (! workerStarted)
 74                 addWorkerFailed(w);
 75         }
 76         return workerStarted;
 77     }

          addWoker 中建立了一個Worker,我們先看一下構造方法,再慢慢分析它。

  1         Worker(Runnable firstTask) {
  2             setState(-1); // inhibit interrupts until runWorker
  3             this.firstTask = firstTask;
  4             this.thread = getThreadFactory().newThread(this);
  5         }
  1 public interface ThreadFactory {
  2 
  3     /**
  4      * Constructs a new {@code Thread}.  Implementations may also initialize
  5      * priority, name, daemon status, {@code ThreadGroup}, etc.
  6      *
  7      * @param r a runnable to be executed by new thread instance
  8      * @return constructed thread, or {@code null} if the request to
  9      *         create a thread is rejected
 10      */
 11     Thread newThread(Runnable r);
 12 }

          可以看到 ThreadFactory 實際就是建立執行緒的方法,同時傳入一個 Runnable , worker 裡面傳入了一個this ,我們趕緊看一下worker的定義。

 

  1     private final class Worker  extends AbstractQueuedSynchronizer  implements Runnable

         

        意圖很明顯,就是worker 本身帶有一個任務(可以為NULL),讓剛建立的執行緒去執行這個任務。下面看一下它到底執行了什麼?

  1         /** Delegates main run loop to outer runWorker  */
  2         public void run() {
  3             runWorker(this);
  4         }
  5 
  6 
  7 
  8         /** Delegates main run loop to outer runWorker  */
  9     public void run() {
 10         runWorker(this);
 11     }
 12 
 13 
 14      /**
 15      * Main worker run loop.  Repeatedly gets tasks from queue and
 16      * executes them, while coping with a number of issues:
 17      *
 18      * 1. We may start out with an initial task, in which case we
 19      * don't need to get the first one. Otherwise, as long as pool is
 20      * running, we get tasks from getTask. If it returns null then the
 21      * worker exits due to changed pool state or configuration
 22      * parameters.  Other exits result from exception throws in
 23      * external code, in which case completedAbruptly holds, which
 24      * usually leads processWorkerExit to replace this thread.
 25      *
 26      * 2. Before running any task, the lock is acquired to prevent
 27      * other pool interrupts while the task is executing, and then we
 28      * ensure that unless pool is stopping, this thread does not have
 29      * its interrupt set.
 30      *
 31      * 3. Each task run is preceded by a call to beforeExecute, which
 32      * might throw an exception, in which case we cause thread to die
 33      * (breaking loop with completedAbruptly true) without processing
 34      * the task.
 35      *
 36      * 4. Assuming beforeExecute completes normally, we run the task,
 37      * gathering any of its thrown exceptions to send to afterExecute.
 38      * We separately handle RuntimeException, Error (both of which the
 39      * specs guarantee that we trap) and arbitrary Throwables.
 40      * Because we cannot rethrow Throwables within Runnable.run, we
 41      * wrap them within Errors on the way out (to the thread's
 42      * UncaughtExceptionHandler).  Any thrown exception also
 43      * conservatively causes thread to die.
 44      *
 45      * 5. After task.run completes, we call afterExecute, which may
 46      * also throw an exception, which will also cause thread to
 47      * die. According to JLS Sec 14.20, this exception is the one that
 48      * will be in effect even if task.run throws.
 49      *
 50      * The net effect of the exception mechanics is that afterExecute
 51      * and the thread's UncaughtExceptionHandler have as accurate
 52      * information as we can provide about any problems encountered by
 53      * user code.
 54      *
 55      * @param w the worker
 56      *
 57      *
 58      *  這裡可以看到執行完任務後,就會阻塞在 getTask ,而執行緒沒有被回收
 59      *  除非getTask 返回 null ,所有我們利用 一些呼叫滿足 getTask 返回 null
 60      *  例如 : 超時設定
 61      *
 62      *
 63      **/
 64     final void runWorker(Worker w) {
 65         Thread wt = Thread.currentThread();
 66         Runnable task = w.firstTask;
 67         w.firstTask = null;
 68         w.unlock(); // allow interrupts  允許中斷,釋放鎖(為了下面搶任務)
 69         boolean completedAbruptly = true;
 70         try {
 71             //搶任務,搶到就加鎖,即是 firstTask 為null 時才會去 getTask 
 72             while (task != null || (task = getTask()) != null) {
 73                 // 加鎖,防止執行緒被其他執行緒中斷 
 74                 w.lock();
 75                 // If pool is stopping, ensure thread is interrupted;
 76                 // if not, ensure thread is not interrupted.  This
 77                 // requires a recheck in second case to deal with
 78                 // shutdownNow race while clearing interrupt
 79                 if ((runStateAtLeast(ctl.get(), STOP) ||
 80                      (Thread.interrupted() &&
 81                       runStateAtLeast(ctl.get(), STOP))) &&
 82                     !wt.isInterrupted())
 83                     wt.interrupt();
 84                 try {
 85                     //子類實現
 86                     beforeExecute(wt, task);
 87                     Throwable thrown = null;
 88                     try {
 89                         task.run();
 90                     } catch (RuntimeException x) {
 91                         thrown = x; throw x;
 92                     } catch (Error x) {
 93                         thrown = x; throw x;
 94                     } catch (Throwable x) {
 95                         thrown = x; throw new Error(x);
 96                     } finally {
 97                         //丟擲異常後,依舊執行這裡
 98                         afterExecute(task, thrown);
 99                     }
100                 } finally {
101                     task = null;
102                     w.completedTasks++;
103                     w.unlock();
104                 }
105             }
106             //來到這裡,1.task == null  
107             completedAbruptly = false;
108         } finally {
109             processWorkerExit(w, completedAbruptly);
110         }
111     }
112 
113 
114 
115 
116 
117     /**
118      * Performs blocking or timed wait for a task, depending on
119      * current configuration settings, or returns null if this worker
120      * must exit because of any of:
121      * 1. There are more than maximumPoolSize workers (due to
122      *    a call to setMaximumPoolSize).
123      * 2. The pool is stopped.
124      * 3. The pool is shutdown and the queue is empty.
125      * 4. This worker timed out waiting for a task, and timed-out
126      *    workers are subject to termination (that is,
127      *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
128      *    both before and after the timed wait, and if the queue is
129      *    non-empty, this worker is not the last thread in the pool.
130      *
131      * @return task, or null if the worker must exit, in which case
132      *         workerCount is decremented
133      *
134      *
135      *
136      * 上面的註解是 4 種返回 null 的情況, 其中第一種的原因有沒有可能是因為其他執行緒建立執行緒導致的呢?
137      *  worker 必須退出,順便數量在這裡減少一
138      *
139      */
140     private Runnable getTask() {
141         boolean timedOut = false; // Did the last poll() time out?
142 
143         for (;;) {
144             int c = ctl.get();
145             int rs = runStateOf(c);
146 
147             // Check if queue empty only if necessary.
148             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
149                 decrementWorkerCount();
150                 return null;
151             }
152 
153             int wc = workerCountOf(c);
154 
155             // Are workers subject to culling?
156             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
157 
158             //自旋失敗後繼續 loop 直到 成功 
159             if ((wc > maximumPoolSize || (timed && timedOut))
160                 && (wc > 1 || workQueue.isEmpty())) {
161                 //這裡自旋
162                 if (compareAndDecrementWorkerCount(c))
163                     return null;
164                 continue;
165             }
166 
167             try {
168                 //前面  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
169                 //可以知道 keepAliveTime 區別於 上面兩個條件  1. allowCoreThreadTimeOut  2.wc > corePoolSize 
170                 Runnable r = timed ?
171                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
172                     workQueue.take();
173                 if (r != null)
174                     return r;
175                 timedOut = true;
176             // 在這裡會捕獲中斷異常!!這裡很重要,提供了假如是呼叫了 shutDown() 方法,執行緒可以退出的出口
177             } catch (InterruptedException retry) {
178                 timedOut = false;
179             }
180         }
181     }

         思路就是從任務佇列中不斷拿取任務(無任務狀態下,執行緒阻塞),然後執行該任務。有個很奇怪的地方,runworker 中在獲取任務前釋放了鎖,在獲取到任務後再次獲取鎖。為什麼呢?在 worker 這個類前有註解。

  1    /**
  2      * Class Worker mainly maintains interrupt control state for
  3      * threads running tasks, along with other minor bookkeeping.
  4      * This class opportunistically extends AbstractQueuedSynchronizer
  5      * to simplify acquiring and releasing a lock surrounding each
  6      * task execution.  This protects against interrupts that are
  7      * intended to wake up a worker thread waiting for a task from
  8      * instead interrupting a task being run.  We implement a simple
  9      * non-reentrant mutual exclusion lock rather than use
 10      * ReentrantLock because we do not want worker tasks to be able to
 11      * reacquire the lock when they invoke pool control methods like
 12      * setCorePoolSize.  Additionally, to suppress interrupts until
 13      * the thread actually starts running tasks, we initialize lock
 14      * state to a negative value, and clear it upon start (in
 15      * runWorker).
 16      */

         worker 使用了“中斷控制狀態”來維護執行緒執行,該類繼承 AbstractQueueSynchronizer ,它的作用是當這個執行緒在執行任務時不被其他執行緒中斷,而是讓其他執行緒等待被喚醒。同時,該類使用無重入的獨佔互斥鎖而不是 ReentrantLock ,因為我們不想在呼叫setCorePoolSize 重入該鎖。

 

         現線上程池就在愉快地執行任務了,假如我這時候停止執行緒池。

  1     public void shutdown() {
  2         final ReentrantLock mainLock = this.mainLock;
  3         mainLock.lock();
  4         try {
  5             //檢查是否可以 shutdown 
  6             checkShutdownAccess();
  7             //設定為 SHUTDOWN 
  8             advanceRunState(SHUTDOWN);
  9             //中斷所有worker 
 10             interruptIdleWorkers();
 11             onShutdown(); // hook for ScheduledThreadPoolExecutor
 12         } finally {
 13             mainLock.unlock();
 14         }
 15         //終結這個執行緒池
 16         tryTerminate();
 17     }
 18 
 19 
 20 
 21     /**
 22      * If there is a security manager, makes sure caller has
 23      * permission to shut down threads in general (see shutdownPerm).
 24      * If this passes, additionally makes sure the caller is allowed
 25      * to interrupt each worker thread. This might not be true even if
 26      * first check passed, if the SecurityManager treats some threads
 27      * specially.
 28      */
 29     private void checkShutdownAccess() {
 30         SecurityManager security = System.getSecurityManager();
 31         if (security != null) {
 32             security.checkPermission(shutdownPerm);
 33             final ReentrantLock mainLock = this.mainLock;
 34             mainLock.lock();
 35             try {
 36                 for (Worker w : workers)
 37                     security.checkAccess(w.thread);
 38             } finally {
 39                 mainLock.unlock();
 40             }
 41         }
 42     }
 43 
 44 
 45 
 46     /**
 47      * Transitions runState to given target, or leaves it alone if
 48      * already at least the given target.
 49      *
 50      * @param targetState the desired state, either SHUTDOWN or STOP
 51      *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
 52      */
 53     private void advanceRunState(int targetState) {
 54         for (;;) {
 55             int c = ctl.get();
 56             if (runStateAtLeast(c, targetState) ||
 57                 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
 58                 break;
 59         }
 60     }
 61 
 62 
 63     private void interruptIdleWorkers() {
 64         interruptIdleWorkers(false);
 65     }
 66 
 67 
 68 
 69     /**
 70      * Interrupts threads that might be waiting for tasks (as
 71      * indicated by not being locked) so they can check for
 72      * termination or configuration changes. Ignores
 73      * SecurityExceptions (in which case some threads may remain
 74      * uninterrupted).
 75      *
 76      * @param onlyOne If true, interrupt at most one worker. This is
 77      * called only from tryTerminate when termination is otherwise
 78      * enabled but there are still other workers.  In this case, at
 79      * most one waiting worker is interrupted to propagate shutdown
 80      * signals in case all threads are currently waiting.
 81      * Interrupting any arbitrary thread ensures that newly arriving
 82      * workers since shutdown began will also eventually exit.
 83      * To guarantee eventual termination, it suffices to always
 84      * interrupt only one idle worker, but shutdown() interrupts all
 85      * idle workers so that redundant workers exit promptly, not
 86      * waiting for a straggler task to finish.
 87      *
 88      *
 89      *  從方法名可以看出 : 中斷空閒 workers
 90      *  從下面的程式碼也可以看到要是傳過來的引數是 false
 91      *  那麼所有執行緒將被中斷  (shutDown()方法有運用到)
 92      *
 93      */
 94     private void interruptIdleWorkers(boolean onlyOne) {
 95         final ReentrantLock mainLock = this.mainLock;
 96         mainLock.lock();
 97         try {
 98 
 99             for (Worker w : workers) {
100                 Thread t = w.thread;
101                 //這裡 worker 的 tryLock 需要注意一下,這裡要是 worker 正在執行任務(這就解釋了為什麼在runWorker 方法中,worker要加鎖了),
102                 // 那麼 tryLock 返回 false,
103                 if (!t.isInterrupted() && w.tryLock()) {
104                     try {
105                         t.interrupt();
106                     } catch (SecurityException ignore) {
107                         //注意這裡忽略了這個 exception 
108                         //所以文件中指出了有可能某些執行緒依舊會保持為非中斷狀態 
109                     } finally {
110                         w.unlock();
111                     }
112                 }
113                 if (onlyOne)
114                     break;
115             }
116         } finally {
117             mainLock.unlock();
118         }
119     }
120 
121 
122     /**
123      * Performs any further cleanup following run state transition on
124      * invocation of shutdown.  A no-op here, but used by
125      * ScheduledThreadPoolExecutor to cancel delayed tasks.
126      */
127     void onShutdown() {
128     }
129 
130 
131 
132 
133     /**
134      * Attempts to stop all actively executing tasks, halts the
135      * processing of waiting tasks, and returns a list of the tasks
136      * that were awaiting execution. These tasks are drained (removed)
137      * from the task queue upon return from this method.
138      *
139      * <p>This method does not wait for actively executing tasks to
140      * terminate.  Use {@link #awaitTermination awaitTermination} to
141      * do that.
142      *
143      * <p>There are no guarantees beyond best-effort attempts to stop
144      * processing actively executing tasks.  This implementation
145      * cancels tasks via {@link Thread#interrupt}, so any task that
146      * fails to respond to interrupts may never terminate.
147      *
148      * @throws SecurityException {@inheritDoc}
149      */
150     public List<Runnable> shutdownNow() {
151         List<Runnable> tasks;
152         final ReentrantLock mainLock = this.mainLock;
153         mainLock.lock();
154         try {
155             checkShutdownAccess();
156             // STOP 終於在這裡發揮了作用!!S:D
157             advanceRunState(STOP);
158             interruptWorkers();
159             //remove task 從佇列中
160             tasks = drainQueue();
161         } finally {
162             mainLock.unlock();
163         }
164         tryTerminate();
165         return tasks;
166     }
167 

         看一下 tryTerminate 方法。

  1     /**
  2      * Transitions to TERMINATED state if either (SHUTDOWN and pool
  3      * and queue empty) or (STOP and pool empty).  If otherwise
  4      * eligible to terminate but workerCount is nonzero, interrupts an
  5      * idle worker to ensure that shutdown signals propagate. This
  6      * method must be called following any action that might make
  7      * termination possible -- reducing worker count or removing tasks
  8      * from the queue during shutdown. The method is non-private to
  9      * allow access from ScheduledThreadPoolExecutor.
 10      *
 11      *
 12      * 如果滿足其中一個條件 :
 13      *    1. SHUTDOWN 並且 workerCount為 0 並且 佇列為空
 14      *    2. STOP 並且 workerCount為 0
 15      *    那麼將狀態轉化為 TERMINATED ;
 16      *
 17      *  如果  workerCount(c)!=0 ,那麼呼叫  interruptIdleWorkers(true); 然後就return
 18      *
 19      *
 20      *  所以我們可以知道線上程池正常的狀態的下必定直接 return
 21      *
 22      */
 23     final void tryTerminate() {
 24         for (;;) {
 25             int c = ctl.get();
 26             if (isRunning(c) ||
 27                 runStateAtLeast(c, TIDYING) ||
 28                 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
 29                 return;
 30             //具備條件時執行下面
 31             if (workerCountOf(c) != 0) { // Eligible to terminate
 32                 // 傳入一個引數  true ,表示只中斷一個,這是因為,每個執行緒當自己沒任務時,肯定
 33                 interruptIdleWorkers(ONLY_ONE);
 34                 return;
 35             }
 36 
 37             final ReentrantLock mainLock = this.mainLock;
 38             mainLock.lock();
 39             try {
 40                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
 41                     try {
 42                         //看到了嗎,介紹執行緒池狀態時,講到要是是 TIDYING 狀態時,會呼叫這個鉤子方法
 43                         terminated();
 44                     } finally {
 45                         ctl.set(ctlOf(TERMINATED, 0));
 46                         termination.signalAll();
 47                     }
 48                     return;
 49                 }
 50             } finally {
 51                 mainLock.unlock();
 52             }
 53             // else retry on failed CAS
 54         }
 55     }

       從上面我們看到要是呼叫了 shutDown() 或是 shutDownNow ()那麼我們正阻塞在 getTask()方法的執行緒就會收到中斷異常,於是就會getTask就會返回 null 。那麼我們繼續看一下執行緒繼續向下執行的邏輯。

  1     /**
  2      * Performs cleanup and bookkeeping for a dying worker. Called
  3      * only from worker threads. Unless completedAbruptly is set,
  4      * assumes that workerCount has already been adjusted to account
  5      * for exit.  This method removes thread from worker set, and
  6      * possibly terminates the pool or replaces the worker if either
  7      * it exited due to user task exception or if fewer than
  8      * corePoolSize workers are running or queue is non-empty but
  9      * there are no workers.
 10      *
 11      * @param w the worker
 12      * @param completedAbruptly if the worker died due to user exception
 13      */
 14     private void processWorkerExit(Worker w, boolean completedAbruptly) {
 15         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
 16             decrementWorkerCount();
 17 
 18         //加鎖執行
 19         final ReentrantLock mainLock = this.mainLock;
 20         mainLock.lock();
 21         try {
 22             completedTaskCount += w.completedTasks;
 23             //移除 worker 
 24             workers.remove(w);
 25         } finally {
 26             mainLock.unlock();
 27         }
 28 
 29         tryTerminate();
 30 
 31         int c = ctl.get();
 32         // replacement 的意思在這裡,是上面已經移除了一個 worker , 這裡呼叫 addWorker 再補充 
 33         if (runStateLessThan(c, STOP)) {
 34             if (!completedAbruptly) {
 35                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
 36                 if (min == 0 && ! workQueue.isEmpty())
 37                     min = 1;
 38                 if (workerCountOf(c) >= min)
 39                     return; // replacement not needed
 40             }
 41             addWorker(null, false);
 42         }
 43     }
 44 

        下一篇我們我們講TLE 具體衍生的不同型別的執行緒池。

 

 

參考資料 :