【萬字圖文-原創】 | 學會Java中的執行緒池,這一篇也許就夠了!
阿新 • • 發佈:2020-05-24
![](https://img2020.cnblogs.com/blog/799093/202005/799093-20200524082508581-233911575.png)
### 碎碎念
關於JDK原始碼相關的文章這已經是第四篇了,原創不易,粉絲從幾十人到昨天的`666`人,真的很感謝之前幫我轉發文章的一些朋友們。
![關注人數.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074813784-1672563259.png)
從16年開始寫技術文章,到現在部落格園已經發表了`222`篇文章,大多數都是原創,共有800多粉絲,基本上每個月都會有文章的產出。
![部落格園資訊.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074814647-1466626520.png)
![部落格園文章月份記錄.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074814914-841821671.png)
回顧這幾年以來寫作的心路歷程,一直都是偷偷的寫,偷偷的發,害怕被人知道,怕被人罵文章寫的太水(之前心理太脆弱了,哈哈)。後面和cxuan聊過後,他建議我給他投稿試試,於是就有了那一篇的萬字的`AQS`文章。
最近也有好多讀者加到我的微信,問一些文章中的問題,我也都會認真解答,看到有人閱讀我的文章並有所收穫,我真的挺欣慰,這就是寫作的動力吧。
幫助別人的同時也是在幫助自己,自己學的技術和理解的內容都是有侷限性的。通過寫文章結識到了很多朋友,聽聽別人的分析和見解,我也能學到很多。
![答疑.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074815435-1011230728.png)
每次看到部落格中有人留言都很激動,也會第一時間去回覆。感謝下面的公眾號大佬們之前無私的幫助,大家也可以關注一下他們,都是很`nice`的大佬:
**Java建設者、Java團長、程式猿石頭、碼象全棧、Java3y、JAVA小咖秀、Bella的技術輪子、石杉的架構筆記、武培軒、程式通事**
### 前言
Java中的執行緒池已經不是什麼神祕的技術了,相信在看的讀者在專案中也都有使用過。關於執行緒池的文章也是數不勝數,我們站在巨人的肩膀上來再次梳理一下。
本文還是保持原有的風格,圖文解析,儘量做到多畫圖!全文共20000+字,建議收藏後細細品讀,閱讀期間搭配原始碼食用效果更佳!
**讀完此文你將學到:**
1. `ThreadPoolExecutor`中常用引數有哪些?
2. `ThreadPoolExecutor`中執行緒池狀態和執行緒數量如何儲存的?
3. `ThreadPoolExecutor`有哪些狀態,狀態之間流轉是什麼樣子的?
4. `ThreadPoolExecutor`任務處理策略?
5. `ThreadPoolExecutor`常用的拒絕策略有哪些?
6. `Executors`工具類提供的執行緒池有哪些?有哪些缺陷?
7. `ThreadPoolExecutor`核心執行緒池中執行緒預熱功能?
8. `ThreadPoolExecutor`中建立的執行緒如何被複用的?
9. `ThreadPoolExecutor`中關閉執行緒池的方法`shutdown`與`shutdownNow`的區別?
10. `ThreadPoolExecutor`中存在的一些擴充套件點?
11. `ThreadPoolExecutor`支援動態調整核心執行緒數、最大執行緒數、佇列長度等一些列引數嗎?怎麼操作?
**本文原始碼基於JDK1.8**
### 執行緒池基本概念
執行緒池是一種池化思想的產物,如同我們資料庫有連線池、Java中的常量池。執行緒池可以幫助我們管理執行緒、複用執行緒,減少執行緒頻繁新建、銷燬等帶來的開銷。
在Java中是通過`ThreadPoolExecutor`類來建立一個執行緒池的,一般我們建議專案中自己去定義執行緒池,不推薦使用`JDK`提供的工具類`Executors`去構建執行緒池。
檢視**阿里巴巴開發手冊**中也有對執行緒池的一些建議:
**`【強制】`建立執行緒或執行緒池時請指定有意義的執行緒名稱,方便出錯時回溯。**
`正例:`自定義執行緒工廠,並且根據外部特徵進行分組,比如,來自同一機房的呼叫,把機房編號賦值給`whatFeaturOfGroup`
```java
public class UserThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
UserThreadFactory(String whatFeaturOfGroup) {
namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(null, task, name, 0, false);
System.out.println(thread.getName());
return thread;
}
}
```
**`【強制】`執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式建立執行緒。**
> 說明:執行緒池的好處是減少在建立和銷燬執行緒上所消耗的時間以及系統資源的開銷,解決資源不足的問題。
如果不使用執行緒池,有可能造成系統建立大量同類執行緒而導致消耗完記憶體或者“過度切換”的問題。
**`【強制】`執行緒池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式,這
樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。**
> 說明:Executors 返回的執行緒池物件的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2) CachedThreadPool:
允許的建立執行緒數量為 Integer.MAX_VALUE,可能會建立大量的執行緒,從而導致 OOM。
### 執行緒池使用示例
下面是一個自定義的執行緒池,這是之前公司在用的一個執行緒池,修改其中部分屬性和備註做脫敏處理:
```java
public class MyThreadPool {
static final Logger LOGGER = LoggerFactory.getLogger(MyThreadPool.class);
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
private static final String THREAD_POOL_NAME = "MyThreadPool-%d";
private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
.daemon(true).build();
private static final int DEFAULT_SIZE = 500;
private static final long DEFAULT_KEEP_ALIVE = 60L;
private static ExecutorService executor;
private static BlockingQueue executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);
static {
try {
executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT + 2, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, executeQueue, FACTORY);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("MyThreadPool shutting down.");
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.error("MyThreadPool shutdown immediately due to wait timeout.");
executor.shutdownNow();
}
} catch (InterruptedException e) {
LOGGER.error("MyThreadPool shutdown interrupted.");
executor.shutdownNow();
}
LOGGER.info("MyThreadPool shutdown complete.");
}
}));
} catch (Exception e) {
LOGGER.error("MyThreadPool init error.", e);
throw new ExceptionInInitializerError(e);
}
}
private MyThreadPool() {
}
public static boolean execute(Runnable task) {
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
return false;
}
return true;
}
public static Future submitTask(Callable task) {
try {
return executor.submit(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
}
}
}
```
這裡主要就是使用呼叫`ThreadPoolExecutor`建構函式來構造一個執行緒池,指定自定義的`ThreadFactory`,裡面包含我們自己執行緒池的`poolName`等資訊。重寫裡面的`execute()`和`submitTask()`方法。 添加了系統關閉時的鉤子函式`shutDownHook()`,在裡面呼叫執行緒池的`shutdown()`方法,使得系統在退出(**使用ctrl c或者kill -15 pid**)時能夠優雅的關閉執行緒池。
如果有看不懂的小夥伴也沒有關係,後面會詳細分析`ThreadPoolExecutor`中的原始碼,相信看完後面的程式碼再回頭來看這個用例 就完全是小菜一碟了。
### 執行緒池實現原理
通過上面的示例程式碼,我們需要知道建立執行緒池時幾個重要的屬性:
```java
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
```
`corePoolSize`: 執行緒池核心執行緒數量
`maximumPoolSize`: 執行緒池最大執行緒數量
`workQueue`: 執行緒池中阻塞佇列,一般指定佇列大小
執行緒池中資料模型可以簡化成下圖所示,其中`Thread`應該是新增的一個個`Worker`,這裡標註的`Thread`是為了方便理解:
![執行緒池中資料模型.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074815882-63321464.png)
執行緒池中提交一個任務具體執行流程如下圖:
![執行流程.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074816113-1232911533.png)
提交任務時,比較當前執行緒池中執行緒數量和核心執行緒數的大小,根據比較結果走不同的任務處理策略,這個下面會有詳細說明。
執行緒池中核心方法呼叫鏈路:
![方法呼叫鏈路.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074816269-1324483823.png)
### TheadPoolExecutor原始碼初探
`TheadPoolExecutor`中常用屬性和方法較多,我們可以先分析下這些,然後一步步往下深入,常用屬性和方法如下:
![執行緒池常見屬性和方法.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074816413-168325255.png)
具體程式碼如下:
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
}
```
1. ctl
> `ctl`代表當前執行緒池狀態和執行緒池執行緒數量的結合體,高3位標識當前執行緒池執行狀態,後29位標識執行緒數量。ctlOf方法就是rs(執行緒池執行狀態)和wc(執行緒數量)按位或操作
2. COUNT_BITS
> COUNT_BITS = Integer.SIZE - 3 = 29,在ctl中,低29位用於存放當前執行緒池中執行緒的數量
3. CAPACITY
> CAPACITY = (1 << COUNT_BITS) - 1
我們來計算一下:
1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
(1 << 29) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111
這個屬性是用來執行緒池能裝載執行緒的最大數量,也可以用來做一些位運算操作。
4. 執行緒池幾種狀態
**RUNNING:**
> (1) 狀態說明:執行緒池處在RUNNING狀態時,能夠接收新任務,以及對已新增的任務進行處理。
(2) 狀態切換:執行緒池的初始化狀態是RUNNING。換句話說,執行緒池被一旦被建立,就處於RUNNING狀態,並且執行緒池中的任務數為0
**SHUTDOWN:**
> (1) 狀態說明:執行緒池處在SHUTDOWN狀態時,不接收新任務,但能處理已新增的任務。
(2) 狀態切換:呼叫執行緒池的shutdown()介面時,執行緒池由RUNNING -> SHUTDOWN
**STOP:**
> (1) 狀態說明:執行緒池處在STOP狀態時,不接收新任務,不處理已新增的任務,並且會中斷正在處理的任務。
(2) 狀態切換:呼叫執行緒池的shutdownNow()介面時,執行緒池由(RUNNING or SHUTDOWN ) -> STOP
**TIDYING:**
> (1) 狀態說明:當所有的任務已終止,ctl記錄的"任務數量"為0,執行緒池會變為TIDYING狀態。當執行緒池變為TIDYING狀態時,會執行鉤子函式terminated()。terminated()在ThreadPoolExecutor類中是空的,若使用者想線上程池變為TIDYING時,進行相應的處理;可以通過過載terminated()函式來實現。
(2) 狀態切換:當執行緒池在SHUTDOWN狀態下,阻塞佇列為空並且執行緒池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。
當執行緒池在STOP狀態下,執行緒池中執行的任務為空時,就會由STOP -> TIDYING
**TERMINATED:**
> (1) 狀態說明:執行緒池徹底終止,就變成TERMINATED狀態。
(2) 狀態切換:執行緒池處在TIDYING狀態時,執行完terminated()之後,就會由 TIDYING -> TERMINATED
**狀態的變化流轉:**
![執行緒池的狀態流轉.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074816572-2061701085.png)
5. runStateOf()
> 計算執行緒池執行狀態的,就是計算ctl前三位的數值。`unStateOf() = c & ~CAPACITY,CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111,那麼~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000,它與任何數按位與的話都是隻看這個數前三位
6. workerCountOf()
> 計算執行緒池的執行緒數量,就是看ctl的後29位,workerCountOf() = c & CAPACITY, CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111與任何數按位與,就是看這個數的後29位
7. ctlOf(int rs, int wt)
> 在獲取當前執行緒池ctl的時候會用到,在後面原始碼中會有很多地方呼叫, 傳遞的引數rs代表執行緒池狀態,wt代表當前執行緒次執行緒(worker)的數量
8. runStateLessThan(int c, int s)
> return c < s,c一般傳遞的是當前執行緒池的ctl值。比較當前執行緒池ctl所表示的狀態是否小於某個狀態s
9. runStateAtLeast(int c, int s)
> return c >= s,c一般傳遞的是當前執行緒池的ctl值。比較當前執行緒池ctl所表示的狀態,是否大於等於某個狀態s
10. isRunning(int c)
> c < SHUTDOWN, 判斷當前執行緒池是否是RUNNING狀態,因為只有RUNNING的值小於SHUTDOWN
11. compareAndIncrementWorkerCount()/compareAndDecrementWorkerCount()
> 使用CAS方式 讓ctl值分別加一減一 ,成功返回true, 失敗返回false
12. decrementWorkerCount()
> 將ctl值減一,這個方法用了do...while迴圈,直到成功為止
13. completedTaskCount
> 記錄執行緒池所完成任務總數 ,當worker退出時會將 worker完成的任務累積到completedTaskCount
14. Worker
> 執行緒池內部類,繼承自AQS且實現Runnable介面。Worker內部有一個Thread thread是worker內部封裝的工作執行緒。Runnable firstTask用來接收使用者提交的任務資料。在初始化Worker時候會設定state為-1(初始化狀態),通過threadFactory建立一個執行緒。
15. ThreadPoolExecutor初始化引數
> corePoolSize: 核心執行緒數限制
maximumPoolSize: 最大執行緒限制
keepAliveTime: 非核心的空閒執行緒等待新任務的時間 unit: 時間單位。配合allowCoreThreadTimeOut也會清理核心執行緒池中的執行緒。
workQueue: 任務佇列,最好選用有界佇列,指定佇列長度
threadFactory: 執行緒工廠,最好自定義執行緒工廠,可以自定義每個執行緒的名稱
handler: 拒絕策略,預設是AbortPolicy
### execute()原始碼分析
當有任務提交到執行緒池時,就會直接呼叫`ThreadPoolExecutor.execute()`方法,執行流程如下:
![執行流程.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074816733-172366465.png)
從流程圖可看,新增任務會有三個分支判斷,原始碼如下:
`java.util.concurrent.ThreadPoolExecutor.execute()`:
```java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
```
`c`在這裡代表執行緒池`ctl`的值,包含工作任務數量以及執行緒池的狀態,上面有解釋過。
**接著看下面幾個分支程式碼:**
**分支一:** `if (workerCountOf(c) < corePoolSize)` ,條件成立表示當前執行緒數量小於核心執行緒數,此次提交任務,直接建立一個新的`worker`。
```java
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
```
如果執行緒數小於核心執行緒數,執行`addWorker`操作,這個後面會講這個方法的細節,如果新增成功則直接返回,失敗後會重新計算`ctl`的值,然後執行分支二。
針對`addWorker()`執行失敗的情況,有以下幾種可能:
1. 存在併發情況,`execute()`方法是可能有多個執行緒同時呼叫的,當多個執行緒同時**workerCountOf(c) < corePoolSize**成立後,就會向執行緒池中建立`worker`,這個時候執行緒池的核心執行緒數可能已經達到,在`addWorker`中還會再次判斷,所以會有任務新增失敗。
![addWorker()失敗場景一.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074816965-1628045832.png)
2. 當前執行緒池狀態發生改變,例如執行緒A執行`addWorker()`方法時,執行緒B修改執行緒池狀態,導致執行緒池不是`RUNNING`狀態,此時執行緒A執行`addWorker()`就有可能失敗。
![addWorker()失敗場景二.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074817186-1280882923.png)
**分支二:** `if (isRunning(c) && workQueue.offer(command)) {}`
通過分支一流程的分析,我們可以知道執行到這個分支說明**當前執行緒數量已經達到`corePoolSize`或者`addWorker()`執行失敗,我們先看看分支二執行流程:
![超過核心執行緒數往佇列中新增任務流程圖.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074817433-1242540246.png)
首先判斷當前執行緒池是否處於`RUNNING`狀態,如果是則嘗試將`task`放入到`workQueue`中,`workQueue`是我們在初始化`ThreadPoolExecutor`時傳入進來的阻塞佇列。
如果當前任務成功新增到阻塞佇列中,再次獲取`ctl`賦值給`recheck`變數,然後執行:
```java
if (!isRunning(recheck) && remove(command))
reject(command);
```
再次判斷當前執行緒池是否為`RUNNINT`狀態,如果不是則說明提交任務到佇列之後,執行緒池狀態被其他執行緒給修改了,比如呼叫`shutdown()/shutdownNow()`等。這種情況就需要把剛剛提交到佇列中的的任務刪除掉。
再看下remove()方法:
```java
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate();
return removed;
}
```
如果任務提交到佇列之後,執行緒池中的執行緒還未將這個任務消費,那麼就可以`remove`成功,呼叫`reject()`方法來執行拒絕策略。
如果在改變執行緒池狀態之前,佇列中的資料已經被消費了,此時`remove()`就會失敗。
![移除佇列中Task任務.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074817591-175039698.png)
接著走`else if`中的邏輯:
```java
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
```
走這個`else if`邏輯有兩種可能,執行緒池是`RUNNING`狀態或者執行緒池狀態被改變且`workQueue`中新增的任務已經被消費導致`remove()`失敗。
如果是`RUNNING`狀態,執行緒池中的執行緒數量是0,此時`workQueue`中還有待執行的任務,就需要新增一個`worker`(`addWorker`裡面會有建立執行緒的操作),繼續消費`workqueue`中的任務。
![新增新任務.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074817741-1692595423.png)
這裡要注意一下`addWorker(null, false)`,也就是建立一個執行緒,但並沒有傳入任務,因為任務已經被新增到`workQueue`中了,所以`worker`在執行的時候,會直接從`workQueue`中獲取任務。在`workerCountOf(recheck) == 0`時執行`addWorker(null, false)`也是為了保證執行緒池在`RUNNING`狀態下必須要有一個執行緒來執行任務,可以理解為一種擔保兜底機制。
至於執行緒池中執行緒為何可以為0?這個如果我們設定了`allowCoreThreadTimeOut=true`,那麼核心執行緒也是允許被回收的,後面`getTask()`中程式碼有提及。
**分支三:** `else if (!addWorker(command, false)) {}`
通過分支一和分之二的分析,進入這個分支的前置條件:執行緒數超過核心執行緒數且`workQueue`中資料已滿。
`else if (!addWorker(command, false))`,執行新增`worker`操作,如果執行失敗就直接走`reject()`拒絕策略。這裡新增失敗可能是執行緒數已經超過了`maximumPoolSize`。
![分支三執行流程.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074817923-142803787.png)
### addWorker()原始碼分析
上面分析提交任務的方法`execute()`時多次用到`addWorker`方法,接收任務後將任務新增到`Worker`中。
`Worker`是`ThreadPoolExecutor`中的內部類,繼承自`AQS`且實現了`Runnable`介面。 類中包含`Thread thread`,它是`worker`內部封裝的工作執行緒,還有`firstTask`屬性,它是一個可執行的`Runnable`物件。在`Worker`的建構函式中,使用執行緒工廠建立了一個執行緒,當`thread`啟動的時候,會以`worker.run()`為入口啟動執行緒,這裡會直接呼叫到`runWorker()`中。
```java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
```
流程如下圖:
![新增Worker.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074818173-295957714.png)
這裡再回頭看下**addWorker(Runnable firstTask, boolean core)** 方法,這個方法主要是新增一個`Worker`到執行緒池中並執行,`firstTask`引數用於指定新增的執行緒執行的第一個任務,`core`引數為true表示在新增執行緒時會判斷當前活動執行緒數是否少於`corePoolSize`,`false`表示在新增執行緒時會判斷當前活動執行緒數是否少於`maximumPoolSize`
**addWorker方法整體執行流程圖如下:**
![addWorker流程圖.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074818370-725948824.png)
接著看下原始碼:
`java.util.concurrent.ThreadPoolExecutor.addWorker()`:
```java
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs > = SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
```
這裡是有兩層`for`迴圈,外層迴圈主要是判斷執行緒池的狀態,如果狀態不合法就直接返回`false`.
只有兩種情況屬於合法狀態:
1. `RUNNING`狀態
2. `SHUTDOWN`狀態時,佇列中還有未處理的任務,且提交的任務為空。`SHUTDOWN`含義就是不再接收新任務,可以繼續處理阻塞佇列的任務。
第二層迴圈是通過`CAS`操作更新`workCount`數量,如果更新成功則往執行緒池中中新增執行緒,這個所謂的執行緒池就是一個`HashSet`陣列。新增失敗時判斷失敗原因,`CAS`失敗有兩種原因:執行緒池狀態被改變或者併發情況修改執行緒池中`workCount`數量,這兩種情況都會導致`ctl`值被修改。如果是第二種原因導致的失敗,繼續自旋更新`workCount`數量。
接著繼續分析迴圈內部的實現,先看看第一層迴圈:`c`代表執行緒池`ctl`值,`rs`代表執行緒池執行狀態。
```java
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
```
**條件一:**`rs >= SHUTDOWN` 成立, 說明當前執行緒池狀態不是`RUNNING`狀態
**條件二:** `!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())`
我們之前提到過,建立任務有兩種情況:
1)`RUNNING`狀態可以提交任務,
2)`SHUTDOWN`狀態下如果傳遞的任務是空且阻塞佇列中還有任務未處理的情況才是允許建立任務繼續處理的,因為阻塞佇列中的任務仍然需要繼續處理。
上面的條件一和條件二就是處理`SHUTDOWN`狀態下任務建立操作的判斷。
接著分析第二層迴圈,先是判斷執行緒池`workCount`數量是否大於可建立的最大值,或者是否超過了核心執行緒數/最大執行緒數,如果是則直接返回,`addWorker()`操作失敗。
接著使用`compareAndIncrementWorkerCount(c)`將執行緒池中`workCount+1`,這裡使用的是`CAS`操作,如果成功則直接跳出最外層迴圈。
```java
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
```
如果`CAS`失敗,說明此時有競爭,會重新獲取`ctl`的值,判斷競爭失敗的原因是新增`workCount`數量還是修改執行緒池狀態導致的,如果執行緒池狀態未發生改變,就繼續迴圈嘗試`CAS`增加`workCount`數量,接著看迴圈結束後邏輯:
```java
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
```
這裡`workerStarted`代表`worker`是否已經啟動,`workerAdded`代表建立的`worker`是否新增到池子中,這裡所謂的池子就是全域性定義的一個`HashSet`結構的`workers`變數。
接著根據傳遞的`firstTask`來構建一個`Worker`,在`Worker`的構造方法中也會通過`ThreadFactory`建立一個執行緒,這裡判斷`t != null`是因為使用者可以自定義`ThreadFactory`,如果這裡使用者不是建立執行緒而是直接返回`null`則會出現一些問題,所以需要判斷一下。
```java
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
}
```
在往池子中新增`Worker`的時候,是需要先加鎖的,因為針對全域性的`workers`操作並不是執行緒安全的。
```java
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
```
繼續看下面程式碼,`rs`代表當前執行緒池的狀態,這裡還是判斷執行緒池的狀態,如果`rs < SHUTDOWN`代表執行緒池狀態是`RUNNING`狀態,此時可以直接操作。
如果是`SHUTDOWN`狀態,需要滿足`firstTask == null`才可以繼續操作。因為在`SHUTDOWN`狀態時不會再新增新的任務,但還是可以繼續處理`workQueue`中的任務。
`t.isAlive()` 當執行緒`start`後,執行緒`isAlive`會返回`true`,這裡還是防止自定義的`ThreadFactory`建立執行緒返回給外部之前,將執行緒`start`了,由此可見`Doug lea`考慮問題真的很全面。
```java
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
}
```
接著將建立的`Worker`新增到`workers`集合中,設定`largestPoolSize`,這個屬性是執行緒池生命週期內執行緒數最大值,一般是做統計資料用的。 最後修改`workerAdded = true`,代表當前提交的任務所建立的`Worker`已經新增到池子中了。
新增`worker`成功後,呼叫執行緒的`start()`方法啟動執行緒,因為`Worker`中重寫了`run()`方法,最後會執行`Worker.run()`。最後設定`workerStarted = true`後釋放全域性鎖。
```java
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
orkerAdded = true;
}
```
這裡再回頭看看`workerAdded = false`的情形,如果執行緒池在`lock`之前,狀態發生了變化,導致新增失敗。此時`workerAdded`也會為`false`,最後執行`addWorkerFailed(work)`操作,這個方法是將`Work`從`workers`中移除掉,然後將`workCount`數量減一,最後執行`tryTerminate(`)來嘗試關閉執行緒池,這個方法後面會細說。
### runWorker()原始碼分析
在`Worker`類中的`run`方法呼叫了`runWorker`來執行任務。上面`addWorker()`方法正常的執行邏輯會建立一個`Worker`,然後啟動`Worker`中的執行緒,這裡其實就會執行到`runWorker`方法。
![方法呼叫關係.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074818539-106423672.png)
`runWorker`的執行邏輯很簡單,啟動一個執行緒,執行當前傳遞的`task`任務,執行完後又不斷的從`workQueue`中獲取任務繼續執行,如果當前`workCount`數量小於核心執行緒數且佇列中沒有了任務,當前執行緒會被阻塞,這個就是`getTask()`的邏輯,一會會講到。
如果當前執行緒數大於核心執行緒數且佇列中沒有任務,就會返回`null`,在`runWorker`這邊退出迴圈,回收多餘的`worker`資料。
![runWorker流程.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074818707-1437554613.png)
原始碼如下:
```java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
```
這裡`w.unlock()`是為了初始化當前`Work`中`state==0`,然後設定獨佔執行緒為`null`,因為在`shutDown()`方法中會嘗試獲取`Worker`中的鎖,如果獲取成功代表當前執行緒沒有被加鎖處於空閒狀態,給當前執行緒一箇中斷訊號。所以這裡在執行執行緒任務的時候需要加鎖,防止呼叫`shutDown()`的時候給當前`worker`執行緒一箇中斷訊號。
判斷`task`是否為空,如果是一個空任務,那麼就去`workQueue`中獲取任務,如果兩者都為空就會退出迴圈。
```java
while (task != null || (task = getTask()) != null) {}
```
最核心的就是呼叫`task.run()`啟動當前任務,這裡面還有兩個可擴充套件的方法,分別是**beforeExecute()/afterExecute()**,我們可以在任務執行前和執行後分別自定義一些操作,其中`afterExecute()`可以接收到任務丟擲的異常資訊,方便我們做後續處理。
```java
while (task != null || (task = getTask()) != null) {
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
```
如果退出迴圈,說明`getTask()`方法返回`null`。會執行到`finally`中的`processWorkerExit(w, completedAbruptly)`方法,此方法是用來清理執行緒池中新增的`work`資料,`completedAbruptly=true`代表是異常情況下退出。
```java
try {
while (task != null || (task = getTask()) != null) {
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
```
`runWorker()`中只是啟動了當前執行緒工作,還需要源源不斷通過`getTask()`方法從`workQueue`來獲取任務執行。在`workQueue`沒有任務的時候,根據執行緒池`workCount`和核心執行緒數的對比結果來使用`processWorkerExit()`執行清理工作。
### getTask()原始碼分析
`getTask`方法用於從阻塞佇列中獲取任務,如果當前執行緒小於核心執行緒,那麼當阻塞佇列中沒有任務時就會阻塞,反之會等待`keepAliveTime`後返回。
這個就是`keepAliveTime`的使用含義:非核心的空閒執行緒等待新任務的時間,當然如果這裡設定了`allowCoreThreadTimeOut=true`也會回收核心執行緒。
具體程式碼如下:
```java
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
```
這裡核心程式碼就是從`workQueue`中取任務,採用`poll`還是`take`取決於`allowCoreThreadTimeOut`和執行緒數量,`allowCoreThreadTimeOut`在構造`ThreadLocalExecutor`後設置的,預設為false。如果設定為`true`則代表核心執行緒數下的執行緒也是可以被回收的。如果使用`take`則表明`workQueue`中沒有任務當前執行緒就會被阻塞掛起,直到有了新的任務才會被喚醒。
![workQueue資料取出流程.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074818893-1909400254.png)
在這裡擴充套件下阻塞佇列的部分方法的含義,這裡主要是看`poll()`和`take()`的使用區別:
**阻塞佇列插入方法:**
>**boolean add**(E e):佇列沒有滿,則插入資料並返回true;佇列滿時,丟擲異常 java.lang.IllegalStateException: Queue full。
>**boolean offer**(E e):佇列沒有滿,則插入資料並返回true;佇列滿時,返回false。
>**void put**(E e):佇列沒有滿,則插入資料;佇列滿時,阻塞呼叫此方法執行緒,直到佇列有空閒空間時此執行緒進入就緒狀態。
>**boolean offer**(E e, long timeout, TimeUnit unit):佇列沒有滿,插入資料並返回true;佇列滿時,阻塞呼叫此方法執行緒,若指定等待的時間內還不能往佇列中插入資料,返回false。
**阻塞佇列移除(獲取)方法:**
> **E remove()**:佇列非空,則以FIFO原則移除資料,並返回該資料的值;佇列為空,丟擲異常 java.util.NoSuchElementException。
> **E poll():** 佇列非空,移除資料,並返回該資料的值;佇列為空,返回null。
> **E take():** 佇列非空,移除資料,並返回該資料的值;佇列為空,阻塞呼叫此方法執行緒,直到佇列為非空時此執行緒進入就緒狀態。
> **E poll**(long timeout, TimeUnit unit):佇列非空,移除資料,並返回該資料的值;佇列為空,阻塞呼叫此方法執行緒,若指定等待的時間內佇列都沒有資料可取,返回null。
**阻塞佇列檢查方法:**
> **E element():** 佇列非空,則返回隊首元素;佇列為空,丟擲異常 java.util.NoSuchElementException。
> **E peek():** 佇列非空,則返回隊首元素;佇列為空,返回null。
### processWorkerExit()原始碼分析
此方法的含義是清理當前執行緒,從執行緒池中移除掉剛剛新增的`worker`物件。
![processWorkerExit執行前置條件.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074819120-395517420.png)
執行`processWorkerExit()`代表在`runWorker()`執行緒跳出了當前迴圈,一般有兩種情況:
1. `task.run()`內部丟擲異常,直接結束迴圈,然後執行`processWorkerExit()`
2. `getTask()`返回為空,代表執行緒數量大於核心數量且`workQueue`中沒有任務,此時需要執行`processWorkerExit()`來清理多餘的`Worker`物件
```java
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)、
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
```
針對於執行緒池`workers`的操作都會進行加鎖處理,然後將當前`Worker`從池子中移除,累加當前執行緒池完成的任務總數`completedTaskCount`。
接著呼叫`tryTerminate()`嘗試關閉執行緒池,這個方法後面有詳細說明。
接著判斷`if (runStateLessThan(c, STOP)) {}`,含義是當前執行緒池狀態小於`STOP`,即當前執行緒池狀態當前執行緒池狀態為 `RUNNING` 或 `SHUTDOWN`,判斷當前執行緒是否是正常退出。如果當前執行緒是正常退出,那麼`completedAbruptly=false`,接著判斷執行緒池中是否還擁有足夠多的的執行緒,因為異常退出可能導致執行緒池中執行緒數量不足,此時就要執行`addWorker()`為執行緒池新增新的`worker`資料,看下面的詳細分析:
**執行最後的addWorke()有三種可能:**
1)當前執行緒在執行`task`時 發生異常,這裡一定要建立一個新`worker`頂上去。
2)如果`!workQueue.isEmpty()`說明任務佇列中還有任務,這種情況下最起碼要留一個執行緒,因為當前狀態為 **RUNNING || SHUTDOWN**這是前提條件。
3)**當前執行緒數量 < corePoolSize**值,此時會建立執行緒,維護執行緒池數量在`corePoolSize`個水平。
### tryTerminate()原始碼分析
上面移除`Worker`的方法中有一個`tryTerminate()`方法的呼叫,這個方法是根據執行緒池狀態嘗試關閉執行緒池。
執行流程如下:
![tryTerminate()流程.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074819267-437237109.png)
實現原始碼如下:
```java
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
```
首先是判斷執行緒池狀態:
條件一:isRunning(c) 成立,直接返回就行,執行緒池很正常!
條件二:runStateAtLeast(c, TIDYING) 說明 已經有其它執行緒 在執行 TIDYING -> TERMINATED狀態了,當前執行緒直接回去。
條件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
SHUTDOWN特殊情況,如果是這種情況,直接回去。得等佇列中的任務處理完畢後,再轉化狀態。
接著執行:
```java
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
```
走到這個邏輯,說明**執行緒池狀態 >= STOP或者執行緒池狀態為`SHUTDOWN`且佇列已經空了**
當前執行緒池中的執行緒數量 > 0,呼叫`interruptIdleWorkers()`中斷一個空閒執行緒,然後返回。我們來分析下,在`getTask()`返回為空時會執行退出邏輯`processWorkerExit()`,這裡就會呼叫`tryTerminate()`方法嘗試關閉執行緒池。
如果此時執行緒池狀態滿足**執行緒池狀態 >= STOP或者執行緒池狀態為`SHUTDOWN`且佇列已經空了**,如果此時執行緒池中執行緒數不為0,就會中斷一個空閒執行緒。
為什麼這裡只中斷一個執行緒呢?這裡的設計思想是,如果執行緒數量特別多的話,只有一個執行緒去做喚醒空閒`worker`的任務可能會比較吃力,所以,就給了每個 被喚醒的`worker`執行緒 ,在真正退出之前協助 喚醒一個空閒執行緒的任務,提供吞吐量的一種常用手段。
我們順便看下`interruptIdleWorkers()`原始碼:
```java
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
```
遍歷`workers`,如果執行緒是空閒狀態**(空閒狀態:queue.take()和queue.poll()返回空)**,則給其一箇中斷訊號,如果是處於`workQueue`阻塞的執行緒,會被喚醒,喚醒後,進入下一次自旋時,可能會`return null`執行退出相關的邏輯,接著又會呼叫`processWorkerExit()->tryTerminate()`,回到上面場景,當前執行緒退出的時候還是會繼續喚醒下一個空現執行緒。
接著往下看`tryTerminate`的剩餘邏輯:
```java
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
```
執行到這裡的執行緒是誰?
`workerCountOf(c) == 0` 時,會來到這裡。
最後一個退出的執行緒。 在 (執行緒池狀態 >= STOP || 執行緒池狀態為 SHUTDOWN 且 佇列已經空了)
執行緒喚醒後,都會執行退出邏輯,退出過程中 會 先將 workerCount計數 -1 => ctl -1。
呼叫`tryTerminate` 方法之前,已經減過了,所以0時,表示這是最後一個退出的執行緒了。
獲取全域性鎖,進行加鎖操作,通過`CAS`設定執行緒池狀態為`TIDYING`狀態,設定成功則執行`terminated()`方法,這也是一個自定義擴充套件的方法,當執行緒池中止的時候會呼叫此方法。
最後設定執行緒池狀態為`TERMINATED`狀態,喚醒呼叫`awaitTermination()`方法的執行緒。
### awaitTermination()原始碼分析
該方法是判斷執行緒池狀態是否達到`TERMINATED`,如果達到了則直接返回`true`,沒有達到則會`await`掛起當前執行緒指定的時間。
```java
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
```
在每次執行`tryTerminate()`後會喚醒所有被`await`的執行緒,繼續判斷執行緒池狀態。
### shutDown()/shutDownNow()原始碼分析
`shutDown`和`shutDown()`方法都是直接改變執行緒池狀態的方法,一般我們在系統關閉之前會呼叫此方法優雅的關閉執行緒池。
```java
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
```
`shutdown`和`shutdownNow`方法呼叫差不多,只是`shutdown`是將執行緒池狀態設定為`SHUTDOWN`,`shutdownNow`是將執行緒池狀態設定為`STOP`。
`shutdownNow`會返回所有未處理的`task`集合。
來看看它們共同呼叫的一些方法:
```java
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
```
這個方法是設定執行緒池狀態為指定狀態,`runStateAtLeast(c, targetState)`,判斷當前執行緒池`ctl`值,如果小於`targetState`則會往後執行。
**ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))**,通過`CAS`指令,修改`ctl`中執行緒池狀態為傳入的`targetState`。
```java
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
```
`interruptIdleWorkers`含義是為空閒的執行緒設定中斷標識,這裡要清楚`worker`什麼時候空閒?我們在上面講解`runWorker()`方法時,執行`task.run()`之前,要針對`Worker`物件加鎖,設定`Worker`中的`state`值為1,防止執行的`worker`被新增中斷標識。接著執行`getTask()`方法,獲取阻塞佇列中的任務,如果是`queue.take()`則會阻塞掛起當前執行緒,釋放鎖,此時執行緒處於空閒狀態。如果是`queue.pool()`返回為空,`runWorker()`會釋放鎖,此時執行緒也是空閒狀態。
執行`interrupt()`後處於`queue`阻塞的執行緒,會被喚醒,喚醒後,進入下一次自旋判斷執行緒池狀態是否改變,如果改變可能直接返回空,這裡具體參看`runWorker()`和`getTask()`方法。
`onShutdown()`也是一個擴充套件方法,需要子類去重寫,這裡代表當執行緒池關閉後需要做的事情。`drainQueue()`方法是獲取`workQueue`中現有的的任務列表。
### 問題回顧
1. `ThreadPoolExecutor`中常用引數有哪些?
上面介紹過了,參見的引數是指ThreadPoolExecutor的構造引數,一般面試的時候都會先問這個,要解釋每個引數的含義及作用。
2. `ThreadPoolExecutor`中執行緒池狀態和執行緒數量如何儲存的?
通過AtomicInteger型別的變數ctl來儲存,前3位代表執行緒池狀態,後29位代表執行緒池中執行緒數量。
3. `ThreadPoolExecutor`有哪些狀態,狀態之間流轉是什麼樣子的?
RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
![執行緒池的狀態流轉.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074816572-2061701085.png)
4. `ThreadPoolExecutor`任務處理策略?
這個問題就是考察`execute()`的執行過程,只要看過原始碼就不會有問題。
![執行流程.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074816113-1232911533.png)
5. `ThreadPoolExecutor`常用的拒絕策略有哪些?
策略處理該任務,執行緒池提供了4種策略:
1)AbortPolicy:直接丟擲異常,預設策略
2)CallerRunsPolicy:用呼叫者所在的執行緒來執行任務
3)DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務
4)DiscardPolicy:直接丟棄任務
當然執行緒池是支援自定義拒絕策略的,需要實現RejectedExecutionHandler介面中rejectedExecution()方法即可。
6. `Executors`工具類提供的執行緒池有哪些?有哪些缺陷?
1) FixedThreadPool 和 SingleThreadPool:允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2) CachedThreadPool:允許的建立執行緒數量為 Integer.MAX_VALUE,可能會建立大量的執行緒,從而導致 OOM。
所以阿里巴巴也建議我們要自定義執行緒池核心執行緒數以及阻塞佇列的長度。
7. `ThreadPoolExecutor`核心執行緒池中執行緒預熱功能?
在建立執行緒池後,可以使用prestartAllCoreThreads()來預熱核心執行緒池。
```java
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
```
8. `ThreadPoolExecutor`中建立的執行緒如何被複用的?
這個主要是看runWorker()和getTask()兩個方法的執行流程,當執行任務時呼叫runWorker()方法,執行完成後會繼續從workQueue中獲取任務繼續執行,已達到執行緒複用的效果,當然這裡還有一些細節,可以回頭看看上面的原始碼解析。
9. `ThreadPoolExecutor`中關閉執行緒池的方法`shutdown`與`shutdownNow`的區別?
最大的區別就是shutdown()會將執行緒池狀態變為SHUTDOWN,此時新任務不能被提交,workQueue中還存有的任務可以繼續執行,同時會像執行緒池中空閒的狀態發出中斷訊號。
shutdownNow()方法是將執行緒池的狀態設定為STOP,此時新任務不能被提交,執行緒池中所有執行緒都會收到中斷的訊號。如果執行緒處於wait狀態,那麼中斷狀態會被清除,同時丟擲InterruptedException。
10. `ThreadPoolExecutor`中存在的一些擴充套件點?
鉤子方法:
1)beforeExecute()/afterExecute():runWorker()中執行緒執行前和執行後會呼叫的鉤子方法
2)terminated:執行緒池的狀態從TIDYING狀態流轉為TERMINATED狀態時terminated方法會被呼叫的鉤子方法。
3)onShutdown:當我們執行shutdown()方法時預留的鉤子方法。
11. `ThreadPoolExecutor`支援動態調整核心執行緒數、最大執行緒數、佇列長度等一些列引數嗎?怎麼操作?
執行期間可動態調整引數的方法:
1)setCorePoolSize():動態調整執行緒池核心執行緒數
2)setMaximumPoolSize():動態調整執行緒池最大執行緒數
3)setKeepAliveTime(): 空閒執行緒存活時間,如果設定了allowsCoreThreadTimeOut=true,核心執行緒也會被回收,預設只回收非核心執行緒
4)allowsCoreThreadTimeOut():是否允許回收核心執行緒,如果是true,在getTask()方法中,獲取workQueue就採用workQueue.poll(keepAliveTime),如果超過等待時間就會被回收。
### 總結
這篇執行緒池原始碼覆蓋到了`ThreadPoolExecutor`中大部分程式碼,我相信認真閱讀完後肯定會對執行緒池有更深刻的理解。如有疑問或者建議可關注公眾號給我私信,我都會一一為大家解答。
另外推薦一個我的up主朋友,他自己錄製了好多學習視訊並分享在B站上了,大家有時間可以看一下(PS:非恰飯非利益相關,良心推薦):[小劉講原始碼-B站UP主][1]
[1]:https://space.bilibili.com/457326371
![原創乾貨分享.png](https://img2020.cnblogs.com/other/799093/202005/799093-20200524074819905-744340834.png)