建立執行緒那麼容易,為什麼非要讓我使用執行緒池?(深深深入剖析)
一、概述
1、問題
先看我們遇到的問題:我們建立執行緒的方式很簡單,new Thread(() -> {...})
,就是因為這麼簡單粗暴的方式,才帶來了致命的問題。首先執行緒的建立和銷燬都是很耗時很浪費效能的操作,你用執行緒為了什麼?為了就是非同步,為了就是提升效能。簡單的new三五個Thread還好,我需要一千個執行緒呢?你也for迴圈new1000個Thread嗎?用完在銷燬掉。那這一千個執行緒的建立和銷燬的效能是很糟糕的!
2、解決
為了解決上述問題,執行緒池誕生了,執行緒池的核心思想就是:執行緒複用。也就是說執行緒用完後不銷燬,放到池子裡等著新任務的到來,反覆利用N個執行緒來執行所有新老任務。這帶來的開銷只會是那N個執行緒的建立,而不是每來一個請求都帶來一個執行緒的從生到死的過程。
二、執行緒池
1、概念
還說個雞兒,上面的問題解決方案已經很通俗易懂了。針對特級小白我在舉個生活的案例:
比如找工作面試,涉及到兩個角色:面試官、求職者。求職者成千上萬,每來一個求職者都要為其單獨新找一個面試官來面試嗎?顯然不是,公司都有面試官池子,比如:A、B、C你們三就是這公司的面試官了,有人來面試你們三輪流面就行了。可能不是很恰當,含義就是說我並不需要為每個請求(求職者)都單獨分配一個新的執行緒(面試官) ,而是我固定好幾個執行緒,由他們幾個來處理所有請求。不會反覆建立銷燬。
2、引數
2.1、原始碼
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
2.2、解釋
corePoolSize
:核心執行緒數
執行緒池在完成初始化之後,預設情況下,執行緒池中不會有任何執行緒,執行緒池會等有任務來的時候再去建立執行緒。核心執行緒創建出來後即使超出了執行緒保持的存活時間配置也不會銷燬,核心執行緒只要建立就永駐了,就等著新任務進來進行處理。
maximumPoolSize
:最大執行緒數
核心執行緒忙不過來且任務儲存佇列滿了的情況下,還有新任務進來的話就會繼續開闢執行緒,但是也不是任意的開闢執行緒數量,執行緒數(包含核心執行緒)達到
maximumPoolSize
後就不會產生新執行緒了,就會執行拒絕策略。
keepAliveTime
如果執行緒池當前的執行緒數多於
corePoolSize
,那麼如果多餘的執行緒空閒時間超過keepAliveTime
,那麼這些多餘的執行緒(超出核心執行緒數的那些執行緒)就會被回收。
unit
:執行緒保持的存活時間單位
比如:
TimeUnit.MILLISECONDS
、TimeUnit.SECONDS
workQueue
:任務儲存佇列
核心執行緒數滿了後還有任務繼續提交到執行緒池的話,就先進入
workQueue
。
workQueue
通常情況下有如下選擇:
LinkedBlockingQueue
:無界佇列,意味著無限制,其實是有限制,大小是int的最大值。也可以自定義大小。
ArrayBlockingQueue
:有界佇列,可以自定義大小,到了閾值就開啟新執行緒(不會超過maximumPoolSize
)。
SynchronousQueue
:Executors.newCachedThreadPool();
預設使用的佇列。也不算是個佇列,他不沒有儲存元素的能力。一般都採取
LinkedBlockingQueue
,因為他也可以設定大小,可以取代ArrayBlockingQueue
有界佇列。
threadFactory
:當執行緒池需要新的執行緒時,會用threadFactory
來生成新的執行緒
預設採用的是
DefaultThreadFactory
,主要負責建立執行緒。newThread()
方法。創建出來的執行緒都在同一個執行緒組且優先順序也是一樣的。
handler
:拒絕策略,任務量超出執行緒池的配置限制或執行shutdown還在繼續提交任務的話,會執行handler
的邏輯。
預設採用的是
AbortPolicy
,遇到上面的情況,執行緒池將直接採取直接拒絕策略,也就是直接丟擲異常。RejectedExecutionException
3、原理
3.1、原理
-
執行緒池剛啟動的時候核心執行緒數為0
-
丟任務給執行緒池的時候,執行緒池會新開啟執行緒來執行這個任務
-
如果執行緒數小於
corePoolSize
,即使工作執行緒處於空閒狀態,也會建立一個新執行緒來執行新任務 -
如果執行緒數大於或等於
corePoolSize
,則會將任務放到workQueue
,也就是任務佇列 -
如果任務佇列滿了,且執行緒數小於
maximumPoolSize
,則會建立一個新執行緒來執行任務 -
如果任務佇列滿了,且執行緒數大於或等於
maximumPoolSize
,則直接採取拒絕策略
3.2、圖解
3.3、舉例
執行緒池引數配置:核心執行緒5個,最大執行緒數10個,佇列長度為100。
那麼執行緒池啟動的時候不會建立任何執行緒,假設請求進來6個,則會建立5個核心執行緒來處理五個請求,另一個沒被處理到的進入到佇列。這時候有進來99個請求,執行緒池發現核心執行緒滿了,佇列還在空著99個位置,所以會進入到佇列裡99個,加上剛才的1個正好100個。這時候再次進來5個請求,執行緒池會再次開闢五個非核心執行緒來處理這五個請求。目前的情況是執行緒池裡執行緒數是10個RUNNING狀態的,佇列裡100個也滿了。如果這時候又進來1個請求,則直接走拒絕策略。
3.4、原始碼
public void execute(Runnable command) { int c = ctl.get(); // workerCountOf(c):工作執行緒數 // worker數量比核心執行緒數小,直接建立worker執行任務 if (workerCountOf(c) < corePoolSize) { // addWorker裡面負責建立執行緒且執行任務 if (addWorker(command, true)) return; c = ctl.get(); } // worker數量超過核心執行緒數,任務直接進入佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 執行緒池狀態不是RUNNING狀態,說明執行過shutdown命令,需要對新加入的任務執行reject()操作。 // 這兒為什麼需要recheck,是因為任務入佇列前後,執行緒池的狀態可能會發生變化。 if (! isRunning(recheck) && remove(command)) reject(command); // 這兒為什麼需要判斷0值,主要是線上程池構造方法中,核心執行緒數允許為0 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果執行緒池不是執行狀態,或者任務進入佇列失敗,則嘗試建立worker執行任務。 // 這兒有3點需要注意: // 1. 執行緒池不是執行狀態時,addWorker內部會判斷執行緒池狀態 // 2. addWorker第2個引數表示是否建立核心執行緒 // 3. addWorker返回false,則說明任務執行失敗,需要執行reject操作 else if (!addWorker(command, false)) reject(command); }
4、Executors
4.1、概念
首先這不是一個執行緒池,這是執行緒池的工具類,他能方便的為我們建立執行緒。但是阿里巴巴開發手冊上說明不推薦用Executors建立執行緒池,推薦自己定義執行緒池。這是因為Executors建立的任何一種執行緒池都可能引發血案,具體是什麼問題下面會說。
4.2、固定執行緒數
4.2.1、描述
核心執行緒數和最大執行緒數是一樣的,所以稱之為固定執行緒數。
其他引數配置預設為:永不超時(0ms),無界佇列(LinkedBlockingQueue
)、預設執行緒工廠(DefaultThreadFactory
)、直接拒絕策略(AbortPolicy
)。
4.2.2、api
Executors.newFixedThreadPool(n);
4.2.3、demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: 建立2個執行緒來執行10個任務。 * * @author TongWei.Chen 2020-07-09 21:28:34 */ public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; i++) { // 從結果中可以發現執行緒name永遠都是兩個。不會有第三個。 executorService.execute(() -> System.out.println(Thread.currentThread().getName())); } } }
4.2.4、問題
問題就在於它是無界佇列,佇列裡能放int的最大值個任務,併發巨高的情況下極大可能直接OOM瞭然後任務還在堆積,畢竟直接用的是jvm記憶體。所以建議自定義執行緒池,自己按照需求指定合適的佇列大小,自定義拒絕策略將超出佇列大小的任務放到對外記憶體做補償,比如Redis。別把業務系統壓垮就行。
4.2.5、原始碼
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( // 核心執行緒數和最大執行緒數都是nThreads nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, // 無界佇列!!!致命問題的關鍵所在。 new LinkedBlockingQueue<Runnable>()); }
4.3、單個執行緒
4.3.1、描述
核心執行緒數和最大執行緒數是1,內部預設的,不可更改,所以稱之為單執行緒數的執行緒池。
類似於Executors.newFixedThreadPool(1);
其他引數配置預設為:永不超時(0ms),無界佇列(LinkedBlockingQueue
)、預設執行緒工廠(DefaultThreadFactory
)、直接拒絕策略(AbortPolicy
)。
4.3.2、api
Executors.newSingleThreadExecutor();
4.3.3、demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: 建立1個執行緒來執行10個任務。 * * @author TongWei.Chen 2020-07-09 21:28:34 */ public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { // 從結果中可以發現執行緒name永遠都是pool-1-thread-1。不會有第二個出現。 executorService.execute(() -> System.out.println(Thread.currentThread().getName())); } } }
4.3.4、問題
同【4.2、固定執行緒數】的問題,都是無界佇列惹的禍。
4.3.5、原始碼
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor( // 核心執行緒數和最大執行緒數都是1,寫死的,客戶端不可更改。 1, 1, 0L, TimeUnit.MILLISECONDS, // 無界佇列!!!致命問題的關鍵所在。 new LinkedBlockingQueue<Runnable>())); }
4.4、帶快取的執行緒池
4.4.1、描述
他的功能是來個任務我就開闢個執行緒去處理,不會進入佇列,SynchronousQueue
佇列也不帶儲存元素的功能。那這意味著來一億個請求就會開闢一億個執行緒去處理,keepAliveTime為60S,意味著執行緒空閒時間超過60S就會被殺死;這就叫帶快取功能的執行緒池。
核心執行緒數是0,最大執行緒數是int的最大值,內部預設的,不可更改。
其他引數配置預設為:1min超時(60s),SynchronousQueue
佇列、預設執行緒工廠(DefaultThreadFactory
)、直接拒絕策略(AbortPolicy
)。
4.4.2、api
Executors.newCachedThreadPool();
4.4.3、demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: 建立個帶快取功能的執行緒池來執行10個任務。 * * @author TongWei.Chen 2020-07-09 21:28:34 */ public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { // 從結果中可以發現執行緒name有10個。也就是有幾個任務就會開闢幾個執行緒。 executorService.execute(() -> System.out.println(Thread.currentThread().getName())); } } }
4.4.4、問題
問題就在於他的最大執行緒數是int的最大值,因為他內部採取的佇列是SynchronousQueue
,這個佇列沒有容納元素的能力,這將意味著只要來請求我就開啟執行緒去工作,巔峰期能建立二十幾億個執行緒出來工作,你自己想想多麼可怕!!!
4.4.5、原始碼
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( // 核心執行緒數是0,最大執行緒數都是Integer.MAX_VALUE,這個可致命了!!! 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
4.5、有排程功能的執行緒池
4.5.1、描述
RocketMQ內部大量採用了此種執行緒池來做心跳等任務。
核心執行緒數手動傳進來,最大執行緒數是Integer.MAX_VALUE,最大執行緒數是內部預設的,不可更改。
其他引數配置預設為:永不超時(0ns),帶延遲功能的佇列(DelayedWorkQueue
)、預設執行緒工廠(DefaultThreadFactory
)、直接拒絕策略(AbortPolicy
)。
4.5.2、api
Executors.newScheduledThreadPool(n);
4.5.3、demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: 建立個帶排程功能的執行緒池來執行任務。 * * @author TongWei.Chen 2020-07-09 21:28:34 */ public class ThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); // 五秒一次 scheduledExecutorService.schedule(() -> System.out.println(Thread.currentThread().getName()), 5, TimeUnit.SECONDS); // 首次五秒後執行,其次每隔1s執行一次 scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()), 5, 1, TimeUnit.SECONDS); } }
4.5.4、問題
【同4.4、帶快取的執行緒池的問題】
問題就在於他的最大執行緒數是int的最大值,這將意味海量併發期能建立二十幾億個執行緒出來工作,你自己想想多麼可怕!!!
4.5.5、原始碼
public ScheduledThreadPoolExecutor(int corePoolSize) { // 致命的問題跟newCachedThreadPool一樣,最大執行緒數能開到幾十億(Integer.MAX_VALUE)!!! super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
4.6、停止執行緒
4.6.1、shutdown
平緩的結束執行緒池,比如當前執行緒池還在執行任務,還沒執行完,這時候執行了shutdown的話,執行緒池並不會立即停止工作,而是會等待執行緒池中的任務都執行完成後才會shutdown掉,但是如果執行shutdown了,外界還在繼續提交任務到執行緒池,那麼執行緒池會直接採取拒絕策略。
4.6.2、isShutdown
判斷執行緒池是否已經shutdown。
4.6.3、shutdownNow
暴力結束執行緒池。不管你當前執行緒池有沒有任務在執行,佇列裡有沒有堆積訊息,我都直接讓執行緒池掛掉。但是他的返回值是佇列裡那些未被執行的任務。有需要的可以記錄下log啥的。
4.7、疑問
這幾種執行緒池為什麼要採取不一樣的佇列?比如newFixedThreadPool
為什麼採取LinkedBlockingQueue
,而newCachedThreadPool
又為什麼採取SynchronousQueue
?
因為newFixedThreadPool
執行緒數量有限,他又不想丟失任務,只能採取無界佇列,而newCachedThreadPool
的話本身自帶int最大值個執行緒數,所以沒必要用無界佇列,他的宗旨就是我有執行緒能處理,不需要佇列。
5、總結幾個問題
1、執行緒池的狀態
RUNNING
:接受新任務並處理排隊任務。SHUTDOWN
:不接受新任務,但是會處理排隊任務。【見:停止執行緒的4.6.1、shutdown】STOP
:不接受新任務,也不處理排隊任務,並中端正在進行的任務。TIDYING
:所有任務都已經完事,工作執行緒為0的時候 ,執行緒會進入這個狀態並執行terminate()鉤子方法。TERMINATED
:terminate()鉤子方法執行完成。
2、執行緒池自動建立還是手動?
那肯定是手動了,因為Executors自動建立的那些執行緒池都存在致命的問題。手動建立執行緒池我們能自己控制執行緒數大小以及佇列大小,還可以指定組名稱等等個性化配置。重點不會出現致命問題,風險都把控在我們手裡。
3、執行緒數多少合適?
- CPU密集型(比如加密、各種複雜計算等):建議設定為CPU核數+1。
- 耗時IO操作(比如讀寫資料庫,壓縮解壓縮大檔案等等):一般會設定CPU核數的2倍。當然也有個很牛X的計算公式:執行緒數=CPU核數 *(1+平均等待時間/平均工作時間)
4、before&after
線上程執行前後可以通過兩個方法來進行列印log或其他工作。
原始碼如下:
// 執行前的before beforeExecute(wt, task); Throwable thrown = null; try { // 真正執行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 執行完成後after afterExecute(task, thrown); }
6、核心原始碼(全)
1、常用變數的解釋
// 1. `ctl`,可以看做一個int型別的數字,高3位表示執行緒池狀態,低29位表示worker數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 2. `COUNT_BITS`,`Integer.SIZE`為32,所以`COUNT_BITS`為29 private static final int COUNT_BITS = Integer.SIZE - 3; // 3. `CAPACITY`,執行緒池允許的最大執行緒數。1左移29位,然後減1,即為 2^29 - 1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 4. 執行緒池有5種狀態,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl // 5. `runStateOf()`,獲取執行緒池狀態,通過按位與操作,低29位將全部變成0 private static int runStateOf(int c) { return c & ~CAPACITY; } // 6. `workerCountOf()`,獲取執行緒池worker數量,通過按位與操作,高3位將全部變成0 private static int workerCountOf(int c) { return c & CAPACITY; } // 7. `ctlOf()`,根據執行緒池狀態和執行緒池worker數量,生成ctl值 private static int ctlOf(int rs, int wc) { return rs | wc; } /* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */ // 8. `runStateLessThan()`,執行緒池狀態小於xx private static boolean runStateLessThan(int c, int s) { return c < s; } // 9. `runStateAtLeast()`,執行緒池狀態大於等於xx private static boolean runStateAtLeast(int c, int s) { return c >= s; }
2、構造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 基本型別引數校驗 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 空指標校驗 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; // 根據傳入引數`unit`和`keepAliveTime`,將存活時間轉換為納秒存到變數`keepAliveTime `中 this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
3、提交執行task的過程
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); // worker數量比核心執行緒數小,直接建立worker執行任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // worker數量超過核心執行緒數,任務直接進入佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 執行緒池狀態不是RUNNING狀態,說明執行過shutdown命令,需要對新加入的任務執行reject()操作。 // 這兒為什麼需要recheck,是因為任務入佇列前後,執行緒池的狀態可能會發生變化。 if (! isRunning(recheck) && remove(command)) reject(command); // 這兒為什麼需要判斷0值,主要是線上程池構造方法中,核心執行緒數允許為0 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果執行緒池不是執行狀態,或者任務進入佇列失敗,則嘗試建立worker執行任務。 // 這兒有3點需要注意: // 1. 執行緒池不是執行狀態時,addWorker內部會判斷執行緒池狀態 // 2. addWorker第2個引數表示是否建立核心執行緒 // 3. addWorker返回false,則說明任務執行失敗,需要執行reject操作 else if (!addWorker(command, false)) reject(command); }
4、addworker原始碼解析
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 外層自旋 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 這個條件寫得比較難懂,我對其進行了調整,和下面的條件等價 // (rs > SHUTDOWN) || // (rs == SHUTDOWN && firstTask != null) || // (rs == SHUTDOWN && workQueue.isEmpty()) // 1. 執行緒池狀態大於SHUTDOWN時,直接返回false // 2. 執行緒池狀態等於SHUTDOWN,且firstTask不為null,直接返回false // 3. 執行緒池狀態等於SHUTDOWN,且佇列為空,直接返回false // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 內層自旋 for (;;) { int wc = workerCountOf(c); // worker數量超過容量,直接返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 使用CAS的方式增加worker數量。 // 若增加成功,則直接跳出外層迴圈進入到第二部分 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 執行緒池狀態發生變化,對外層迴圈進行自旋 if (runStateOf(c) != rs) continue retry; // 其他情況,直接內層迴圈進行自旋即可 // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // worker的新增必須是序列的,因此需要加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 這兒需要重新檢查執行緒池狀態 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // worker已經呼叫過了start()方法,則不再建立worker if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // worker建立並新增到workers成功 workers.add(w); // 更新`largestPoolSize`變數 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 啟動worker執行緒 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // worker執行緒啟動失敗,說明執行緒池狀態發生了變化(關閉操作被執行),需要進行shutdown相關操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
5、執行緒池worker任務單元
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 這兒是Worker的關鍵所在,使用了執行緒工廠建立了一個執行緒。傳入的引數為當前worker this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // 省略程式碼... }
6、核心執行緒執行邏輯-runworker
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 呼叫unlock()是為了讓外部可以中斷 w.unlock(); // allow interrupts // 這個變數用於判斷是否進入過自旋(while迴圈) boolean completedAbruptly = true; try { // 這兒是自旋 // 1. 如果firstTask不為null,則執行firstTask; // 2. 如果firstTask為null,則呼叫getTask()從佇列獲取任務。 // 3. 阻塞佇列的特性就是:當佇列為空時,當前執行緒會被阻塞等待 while (task != null || (task = getTask()) != null) { // 這兒對worker進行加鎖,是為了達到下面的目的 // 1. 降低鎖範圍,提升效能 // 2. 保證每個worker執行的任務是序列的 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 如果執行緒池正在停止,則對當前執行緒進行中斷操作 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 執行任務,且在執行前後通過`beforeExecute()`和`afterExecute()`來擴充套件其功能。 // 這兩個方法在當前類裡面為空實現。 try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { // 幫助gc task = null; // 已完成任務數加一 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 自旋操作被退出,說明執行緒池正在結束 processWorkerExit(w, completedAbruptly); } }
7、自建執行緒池注意點
- 阻塞任務佇列數
- 執行緒池的名字,最好跟業務相關
- 核心執行緒池大小,看業務實際情況。可以參考【執行緒數多少合適?】
- 最大執行緒池大小,看業務實際情況。可以參考【執行緒數多少合適?】
- 拒絕策略,我個人一般都是記錄log,如果主要的業務我會根據log做補償。
比如:
ThreadPoolExecutor executor = new ThreadPoolExecutor(CPU核數 + 1, 2 * CPU核數 + 1, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), // 執行緒池名字pay-account new DefaultThreadFactory("pay-account"), (r1, executor) -> { // 記錄log 重新入佇列做補償 });
&n