1. 程式人生 > >Java線程池的底層實現與使用

Java線程池的底層實現與使用

線程 了解 hand 通過 rem 高程 滿了 nbsp 被調用

正文

前言

  在我們進行開發的時候,為了充分利用系統資源,我們通常會進行多線程開發,實現起來非常簡單,需要使用線程的時候就去創建一個線程(繼承Thread類、實現Runnable接口、使用Callable和Future),但是這樣也有一點問題,就是如果並發的線程數量很多,創建線程、銷毀線程都是需要消耗時間、資源,這個時候線程池就派上用場了

一、四種線程池的介紹

  Java通過Executors提供了四種線程池,分別是

  1.newSingleThreadExecutor()

  創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務都是按照指定的順序(FIFO,LIFO,優先級)執行

1 public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,
4                                     0L, TimeUnit.MILLISECONDS,
5                                     new LinkedBlockingQueue<Runnable>()));
6 }

2.newFixedThreadExecutor()

  創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待

1     public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new
LinkedBlockingQueue<Runnable>()); 5 }

 3.newCachedThreadPool()

  創建一個可緩存的線程池,如果當前沒有可用線程,在執行結束後緩存60s,如果不被調用則移除線程。調用execute()方法時可以重用緩存中的線程。適用於很多短期異步任務的環境,可以提高程序性能。

1     public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5     }

4.newScheduledThreadPool()(在ScheduleThreadPoolExecutor類中,ThreadPoolExecutor的子類)

  創建一個定長線程池,支持定時及周期性任務執行

1     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
2         return new ScheduledThreadPoolExecutor(corePoolSize);
3     }
4 
5     public ScheduledThreadPoolExecutor(int corePoolSize) {
6         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
7               new DelayedWorkQueue());
8     }

 線程池的使用

  使用方式一(不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池):

1         ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
2                 new ArrayBlockingQueue<Runnable>(5));
3         threadPoolExecutor.execute(new Runnable() {
4             @Override
5             public void run() {
6                 
7             }
8         });

使用方式二:

 1         ExecutorService executorService1 = Executors.newFixedThreadPool(3);
 2         ExecutorService executorService2 = Executors.newSingleThreadExecutor();
 3         ExecutorService executorService3 = Executors.newCachedThreadPool();
 4         ExecutorService executorService4 = Executors.newScheduledThreadPool(3);
 5         executorService1.execute(new Runnable() {
 6             @Override
 7             public void run() {
 8                 
 9             }
10         });

 ExecutorService是真正的線程池接口,所以我們在通過Executors創建各種線程時,都是采用上述代碼所示的方式

二、線程池的底層類與接口

  在介紹線程池的實現機制之前,先了解一下線程池重要的類或接口

  ExecutorService是真正的線程池接口

  Executors是靜態工廠的功能,生產各種類型線程池

  Executor是線程池的頂級接口,只是一個執行線程的工具,只提供一個execute(Runnable command)的方法,真正的線程池接口是ExecutorService

  AbstractExecutorService實現了ExecutorService接口,實現了其中大部分的方法(有沒有實現的方法,所以被聲明為Abstract)

  ThreadPoolExecutor,繼承了AbstractExecutorService,是ExecutorService的默認實現

1、ThreadPoolExecutor類

(1)構造函數

  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。

  在ThreadPoolExecutor類中提供了四個構造方法(前面三個構造器都是調用第四個構造器進行的初始化工作):

 1 public class ThreadPoolExecutor extends AbstractExecutorService {
 2     .....
 3     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 4             BlockingQueue<Runnable> workQueue);
 5  
 6     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 7             BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 8  
 9     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
10             BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
11  
12     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
13         BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
14     ...
15 }

構造器中各個參數的含義:

  • corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有非常大的關系。在創建了線程池後,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
  • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小時
    TimeUnit.MINUTES;           //分鐘
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //納秒
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這裏的阻塞隊列有以下幾種選擇:ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
  • threadFactory:線程工廠,主要用來創建線程;
  • handler:表示當拒絕處理任務時的策略,有以下四種取值:
  ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
  ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 
  ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
  ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務 

(2)ThreadPoolExecutor方法

1 execute()
2 submit()
3 shutdown()
4 shutdownNow()    
5 getQueue() 
6 getPoolSize() 
7 getActiveCount()
8 getCompletedTaskCount()

execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。

  submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

  shutdown()和shutdownNow()是用來關閉線程池的。

  getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法

  還有在下文提到的runwork()、addwork()、processworkerExit() 方法等等

2、AbstractExecutorService類

  下面我們看一下AbstractExecutorService的具體方法

 1 public abstract class AbstractExecutorService implements ExecutorService {
 2 
 3     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {...};
 4     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {...};
 5     public Future<?> submit(Runnable task) {};
 6     public <T> Future<T> submit(Runnable task, T result) {...};
 7     public <T> Future<T> submit(Callable<T> task) {...};
 8     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 9                             boolean timed, long nanos)
10         throws InterruptedException, ExecutionException, TimeoutException {...};
11     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
12         throws InterruptedException, ExecutionException {...};
13     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
14                            long timeout, TimeUnit unit)
15         throws InterruptedException, ExecutionException, TimeoutException {...};
16     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
17         throws InterruptedException {...};
18     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
19                                          long timeout, TimeUnit unit)
20         throws InterruptedException {...};
21 }

AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。

3、ExecutorService接口

 1 public interface ExecutorService extends Executor {
 2  
 3     void shutdown();
 4     boolean isShutdown();
 5     boolean isTerminated();
 6     boolean awaitTermination(long timeout, TimeUnit unit)
 7         throws InterruptedException;
 8     <T> Future<T> submit(Callable<T> task);
 9     <T> Future<T> submit(Runnable task, T result);
10     Future<?> submit(Runnable task);
11     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
12         throws InterruptedException;
13     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
14                                   long timeout, TimeUnit unit)
15         throws InterruptedException;
16  
17     <T> T invokeAny(Collection<? extends Callable<T>> tasks)
18         throws InterruptedException, ExecutionException;
19     <T> T invokeAny(Collection<? extends Callable<T>> tasks,
20                     long timeout, TimeUnit unit)
21         throws InterruptedException, ExecutionException, TimeoutException;
22 }

而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:

4、Executor接口

1 public interface Executor {
2     void execute(Runnable command);
3 }

上述類與接口之間的關系

  Executor是一個頂層接口,在它裏面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;

  然後ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;

  然後ThreadPoolExecutor繼承了類AbstractExecutorService。

三、線程池的底層實現原理

1、線程池的狀態

 1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 2     private static final int COUNT_BITS = Integer.SIZE - 3;
 3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 4 
 5     // runState is stored in the high-order bits
 6     private static final int RUNNING    = -1 << COUNT_BITS;
 7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
 8     private static final int STOP       =  1 << COUNT_BITS;
 9     private static final int TIDYING    =  2 << COUNT_BITS;
10     private static final int TERMINATED =  3 << COUNT_BITS;
11 
12     // Packing and unpacking ctl
13     private static int runStateOf(int c)     { return c & ~CAPACITY; }
14     private static int workerCountOf(int c)  { return c & CAPACITY; }
15     private static int ctlOf(int rs, int wc) { return rs | wc; }

其中ctl這個AtomicInteger的功能很強大,其高3位用於維護線程池運行狀態,低29位維護線程池中線程數量

1、RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀態的線程池會接收新任務,也會處理在阻塞隊列中等待處理的任務

2、SHUTDOWN:0<<COUNT_BITS,即高3位為0,低29位為0,該狀態的線程池不會再接收新任務,但還會處理已經提交到阻塞隊列中等待處理的任務

3、STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀態的線程池不會再接收新任務,不會處理在阻塞隊列中等待的任務,而且還會中斷正在運行的任務

4、TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務都被終止了,workerCount為0,為此狀態時還將調用terminated()方法

5、TERMINATED:3<<COUNT_BITS,即高3位為100,低29位為0,terminated()方法調用完成後變成此狀態

這些狀態均由int型表示,大小關系為 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個順序基本上也是遵循線程池從 運行 到 終止這個過程。

runStateOf(int c) 方法:c & 高3位為1,低29位為0的~CAPACITY,用於獲取高3位保存的線程池狀態

workerCountOf(int c)方法:c & 高3位為0,低29位為1的CAPACITY,用於獲取低29位的線程數量

ctlOf(int rs, int wc)方法:參數rs表示runState,參數wc表示workerCount,即根據runState和workerCount打包合並成ctl

2、執行過程

 一個任務從提交到執行完畢經歷過程如下:

  第一步:如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;

  第二步:如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;

  第三步:如果線程池中的線程數量大於等於corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的線程來處理被添加的任務

  第四步:如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;

3、任務拒絕策略

當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:

  • ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
  • ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
  • ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

4、在執行過程中所涉及到的方法

(1)execute(Runnable command)(提交任務)

在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裏面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現

 1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4          int c = ctl.get();
 5         if (workerCountOf(c) < corePoolSize) {
 6             if (addWorker(command, true))
 7                 return;
 8             c = ctl.get();
 9         }
10         if (isRunning(c) && workQueue.offer(command)) {
11             int recheck = ctl.get();
12             if (! isRunning(recheck) && remove(command))
13                 reject(command);
14             else if (workerCountOf(recheck) == 0)
15                 addWorker(null, false);
16         }
17         else if (!addWorker(command, false))
18             reject(command);
19     }

源碼解析:

1、如果線程池當前線程數量少於corePoolSize,則addWorker(command, true)創建新worker線程,如創建成功返回,如沒創建成功,則執行後續步驟;
addWorker(command, true)失敗的原因可能是:
A、線程池已經shutdown,shutdown的線程池不再接收新任務
B、workerCountOf(c) < corePoolSize 判斷後,由於並發,別的線程先創建了worker線程,導致workerCount>=corePoolSize
2、如果線程池還在running狀態,將task加入workQueue阻塞隊列中,如果加入成功,進行double-check,如果加入失敗(可能是隊列已滿),則執行後續步驟;
double-check主要目的是判斷剛加入workQueue阻塞隊列的task是否能被執行
A、如果線程池已經不是running狀態了,應該拒絕添加新任務,從workQueue中刪除任務
B、如果線程池是運行狀態,或者從workQueue中刪除任務失敗(剛好有一個線程執行完畢,並消耗了這個任務),確保還有線程執行任務(只要有一個就夠了)
3、如果線程池不是running狀態 或者 無法入隊列,嘗試開啟新線程,擴容至maxPoolSize,如果addWork(command, false)失敗了,拒絕當前command

(2)addwork(Runnable firstTask,boolean core)(添加任務)

參數說明:

  firstTask:worker線程的初始任務,可以為空

  core:true(將corePoolSize作為上限),false(將maximumPoolSize作為上限)

addWorker方法有4種傳參的方式:

1、addWorker(command, true)

2、addWorker(command, false)

3、addWorker(null, false)

4、addWorker(null, true)

在execute方法中就使用了前3種,結合這個核心方法進行以下分析
第一個:線程數小於corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就返回false
第二個:當隊列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果線程池也滿了的話就返回false
第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker在線程執行的時候會去任務隊列裏拿任務,這樣就相當於創建了一個新的線程,只是沒有馬上分配任務
第四個:這個方法就是放一個null的task進Workers Set,而且是在小於corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就返回false,什麽也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為線程池預先啟動corePoolSize個worker等待從workQueue中獲取任務執行

源碼解析:

1、判斷線程池當前是否為可以添加worker線程的狀態,可以則繼續下一步,不可以return false:
A、線程池狀態>shutdown,可能為stop、tidying、terminated,不能添加worker線程
B、線程池狀態==shutdown,firstTask不為空,不能添加worker線程,因為shutdown狀態的線程池不接收新任務
C、線程池狀態==shutdown,firstTask==null,workQueue為空,不能添加worker線程,因為firstTask為空是為了添加一個沒有任務的線程再從workQueue獲取task,而workQueue為空,說明添加無任務線程已經沒有意義
2、線程池當前線程數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了return false,沒超過則對workerCount+1,繼續下一步
3、在線程池的ReentrantLock保證下,向Workers Set中添加新創建的worker實例,添加完成後解鎖,並啟動worker線程,如果這一切都成功了,return true,如果添加worker入Set失敗或啟動失敗,調用addWorkerFailed()邏輯

  其中,線程池會把每個線程封裝成一個Worker對象,由addWorker(Runnable firstTask, boolean core)方法控制,firstTask代表線程池首要執行的任務,core代表是否使用corePoolSize參數作為線程池最大標記。

(3)內部類worker

  Worker類本身既實現了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡稱AQS),所以其既是一個可執行的任務,又可以達到鎖的效果
  new Worker()
    1、將AQS的state置為-1,在runWoker()前不允許中斷
    2、待執行的任務會以參數傳入,並賦予firstTask
    3、用Worker這個Runnable創建Thread

  之所以Worker自己實現Runnable,並創建Thread,在firstTask外包一層,是因為要通過Worker控制中斷,而firstTask這個工作任務只是負責執行業務
  Worker控制中斷主要有以下幾方面:
    1、初始AQS狀態為-1,此時不允許中斷interrupt(),只有在worker線程啟動了,執行了runWoker(),將state置為0,才能中斷
不允許中斷體現在:
    A、shutdown()線程池時,會對每個worker tryLock()上鎖,而Worker類這個AQS的tryAcquire()方法是固定將state從0->1,故初始狀態state==-1時tryLock()失敗,沒發interrupt()
    B、shutdownNow()線程池時,不用tryLock()上鎖,但調用worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯
    2、為了防止某種情況下,在運行中的worker被中斷,runWorker()每次運行任務時都會lock()上鎖,而shutdown()這類可能會終止worker的操作需要先獲取worker的鎖,這樣就防止了中斷正在運行的線程

    Worker實現的AQS為不可重入鎖,為了是在獲得worker鎖的情況下再進入其它一些需要加鎖的方法

  Worker和Task的區別:
  Worker是線程池中的線程,而Task雖然是runnable,但是並沒有真正執行,只是被Worker調用了run方法,後面會看到這部分的實現。

(4)runwork(Worker w)(執行任務)

  1、Worker線程啟動後,通過Worker類的run()方法調用runWorker(this)
  2、執行任務之前,首先worker.unlock(),將AQS的state置為0,允許中斷當前worker線程
  3、開始執行firstTask,調用task.run(),在執行任務前會上鎖wroker.lock(),在執行完任務後會解鎖,為了防止在任務運行時被線程池一些中斷操作中斷
  4、在任務執行前後,可以根據業務場景自定義beforeExecute() 和 afterExecute()方法
  5、無論在beforeExecute()、task.run()、afterExecute()發生異常上拋,都會導致worker線程終止,進入processWorkerExit()處理worker退出的流程
  6、如正常執行完當前task後,會通過getTask()從阻塞隊列中獲取新任務,當隊列中沒有任務,且獲取任務超時,那麽當前worker也會進入退出流程

(5)getTask()(獲取任務)

  1、首先判斷是否可以滿足從workQueue中獲取任務的條件,不滿足return null
   A、線程池狀態是否滿足:
   (a)shutdown狀態 + workQueue為空 或 stop狀態,都不滿足,因為被shutdown後還是要執行workQueue剩余的任務,但workQueue也為空,就可以退出了
   (b)stop狀態,shutdownNow()操作會使線程池進入stop,此時不接受新任務,中斷正在執行的任務,workQueue中的任務也不執行了,故return null返回
   B、線程數量是否超過maximumPoolSize 或 獲取任務是否超時
   (a)線程數量超過maximumPoolSize可能是線程池在運行時被調用了setMaximumPoolSize()被改變了大小,否則已經addWorker()成功不會超過maximumPoolSize
   (b)如果 當前線程數量>corePoolSize,才會檢查是否獲取任務超時,這也體現了當線程數量達到maximumPoolSize後,如果一直沒有新任務,會逐漸終止worker線程直到corePoolSize
  2、如果滿足獲取任務條件,根據是否需要定時獲取調用不同方法:
   A、workQueue.poll():如果在keepAliveTime時間內,阻塞隊列還是沒有任務,返回null
   B、workQueue.take():如果阻塞隊列為空,當前線程會被掛起等待;當隊列中有任務加入時,線程被喚醒,take方法返回任務
  3、在阻塞從workQueue中獲取任務時,可以被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut為初始值false,再次執行第1步中的判斷,滿足就繼續獲取任務,不滿足return null,會進入worker退出的流程

(6)processWorkerExit(Worker w,boolean completedAbruptly)(worker線程退出)

  參數:
  worker: 要結束的worker
  completedAbruptly: 是否突然完成(是否因為異常退出)
  執行流程:
    1、worker數量-1
     A、如果是突然終止,說明是task執行時異常情況導致,即run()方法執行時發生了異常,那麽正在工作的worker線程數量需要-1
    B、如果不是突然終止,說明是worker線程沒有task可執行了,不用-1,因為已經在getTask()方法中-1了
    2、從Workers Set中移除worker,刪除時需要上鎖mainlock
    3、tryTerminate():在對線程池有負效益的操作時,都需要“嘗試終止”線程池,大概邏輯:
判斷線程池是否滿足終止的狀態
     A、如果狀態滿足,但還有線程池還有線程,嘗試對其發出中斷響應,使其能進入退出流程
     B、沒有線程了,更新狀態為tidying->terminated
    4、是否需要增加worker線程,如果線程池還沒有完全終止,仍需要保持一定數量的線程
線程池狀態是running 或 shutdown
     A、如果當前線程是突然終止的,addWorker()
    B、如果當前線程不是突然終止的,但當前線程數量 < 要維護的線程數量,addWorker()
故如果調用線程池shutdown(),直到workQueue為空前,線程池都會維持corePoolSize個線程,然後再逐漸銷毀這corePoolSize個線程

  本文轉載自https://www.cnblogs.com/sxkgeek/p/9343519.html

Java線程池的底層實現與使用