Java 併發程式設計 | 執行緒池詳解
執行緒池用來處理非同步任務或者併發執行的任務
優點:
- 重複利用已建立的執行緒,減少建立和銷燬執行緒造成的資源消耗
- 直接使用執行緒池中的執行緒,提高響應速度
- 提高執行緒的可管理性,由執行緒池同一管理
ThreadPoolExecutor
java
中執行緒池使用 ThreadPoolExecutor
實現
建構函式
ThreadPoolExecutor
提供了四個建構函式,其他三個建構函式最終呼叫的都是下面這個建構函式
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 複製程式碼
入參:
-
corePoolSize
:執行緒池的核心執行緒數量執行緒池維護的核心執行緒數量,當執行緒池初始化後,核心執行緒數量為零,當有任務來到的時候才會建立執行緒去執行任務,當執行緒池中的工作執行緒數量等於核心執行緒數量時,新到的任務就會放到快取佇列中
-
maximumPoolSize
:執行緒池允許建立的最大執行緒數量當阻塞佇列滿了的時候,並且執行緒池中建立的執行緒數量小於
maximumPoolSize
,此時會建立新的執行緒執行任務 -
keepAliveTime
:執行緒活動保持時間只有當執行緒池數量大於核心執行緒數量時,
keepAliveTime
才會有效,如果當前執行緒數量大於核心執行緒數量時,並且執行緒的空閒時間達到keepAliveTime
,當前執行緒終止,直到執行緒池數量等於核心執行緒數 -
unit
:執行緒活動保持時間的單位keepAliveTime
的單位,包括:TimeUnit.DAYS
天,TimeUnit.HOURS
小時,TimeUnit.MINUTES
分鐘,TimeUnit.SECONDS
秒,TimeUnit.MILLISECONDS
毫秒,TimeUnit.MICROSECONDS
微秒,TimeUnit.NANOSECONDS
納秒 -
workQueue
:任務佇列,用來儲存等待執行任務的阻塞佇列ArrayBlockingQueue
:是一個基於陣列結構的有界佇列LinkedBlockingQueue
:是一個基於連結串列結構的阻塞佇列SynchronousQueue
:不儲存元素的阻塞佇列,每一個插入操作必須等到下一個執行緒呼叫移除操作,否則插入操作一直阻塞PriorityBlockingQueue
:一個具有優先順序的無線阻塞佇列 -
threadFactory
:用來建立執行緒的工廠 -
handler
:飽和策略,當執行緒池和佇列都滿了的時候,必須要採取一種策略處理新的任務,預設策略是AbortPolicy
,根據自己需求選擇合適的飽和策略AbortPolicy
:直接丟擲異常CallerRunsPolicy
:用呼叫者所在的執行緒來運行當前任務DiscardOldestPolicy
:丟棄佇列裡面最近的一個任務,並執行當前任務DiscardPolicy
:不處理,丟棄掉當然我們也可以通過實現
RejectedExecutionHandler
去自定義實現處理策略
入參不同,執行緒池的執行機制也不同,瞭解每個入參的含義由於我們更透傳的理解執行緒池的實現原理
提交任務
執行緒池處理提交任務流程如下

處理流程:
- 如果核心執行緒數量未滿,建立執行緒執行任務,否則新增到阻塞佇列中
- 如果阻塞佇列中未滿,將任務存到佇列裡
- 如果阻塞佇列滿了,看執行緒池數量是否達到了執行緒池最大數量,如果沒達到,建立執行緒執行任務
- 如果已經達到執行緒池最大數量,根據飽和策略進行處理
ThreadPoolExecutor
使用 execute(Runnable command)
和 submit(Runnable task)
向執行緒池中提交任務,在 submit(Runnable task)
方法中呼叫了 execute(Runnable command)
,所以我們只要瞭解 execute(Runnable command)
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 獲取執行緒池狀態,並且可以通過ctl獲取到當前執行緒池數量及執行緒池狀態 int c = ctl.get(); // 如果工作執行緒數小於核心執行緒數量,則建立一個新執行緒執行任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果不符合上面條件,當前執行緒處於執行狀態並且寫入阻塞佇列成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 雙重檢查,再次獲取執行緒狀態,如果當前執行緒狀態變為非執行狀態,則從佇列中移除任務,執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 檢查工作執行緒數量是否為0 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //建立執行緒執行任務,如果新增失敗則執行拒絕策略 else if (!addWorker(command, false)) reject(command); } 複製程式碼
execute(Runnable command)
方法中我們比較關心的就是如何建立新的執行緒執行任務,就 addWorker(command, true)
方法
workQueue.offer(command)
方法是用來向阻塞佇列中新增任務的
reject(command)
方法會根據建立執行緒池時傳入的飽和策略對任務進行處理,例如預設的 AbortPolicy
,檢視原始碼後知道就是直接拋了個 RejectedExecutionException
異常,其他的飽和策略的原始碼也是特別簡單
關於執行緒池狀態與工作執行緒的數量是如何表示的
在 ThreadPoolExecutor
中使用一個 AtomicInteger
型別變量表示
/** * ctl表示兩個資訊,一個是執行緒池的狀態(高3位表示),一個是當前執行緒池的數量(低29位表示),這個跟我們前面* 說過的讀寫鎖的state變數是一樣的,以一個變數記錄兩個資訊,都是以利用int的32個位元組,高十六位表述讀,低十 * 六位表示寫鎖 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //低29位儲存執行緒池數量 private static final int COUNT_BITS = Integer.SIZE - 3; //執行緒池最大容量 private static final int CAPACITY= (1 << COUNT_BITS) - 1; // 執行狀態儲存在高3位 // 執行狀態 private static final int RUNNING= -1 << COUNT_BITS; private static final int SHUTDOWN=0 << COUNT_BITS; private static final int STOP=1 << COUNT_BITS; private static final int TIDYING=2 << COUNT_BITS; private static final int TERMINATED =3 << COUNT_BITS; 複製程式碼
addWorker(command, boolean)
建立工作執行緒,執行任務
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 執行緒池狀態 int rs = runStateOf(c); // 判斷執行緒池狀態,以及阻塞佇列是否為空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 獲取執行緒工作執行緒數量 int wc = workerCountOf(c); // 判斷是否大於最大容量,以及根據傳入的core判斷是否大於核心執行緒數量還是最大執行緒數量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 增加工作執行緒數量 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get();// Re-read ctl //如果執行緒池狀態改變,則重試 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 建立Worker,內部建立了一個新的執行緒 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 執行緒池狀態判斷 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 將建立的執行緒新增到執行緒池 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //執行任務,首先會執行Worker物件的firstTask t.start(); workerStarted = true; } } } finally { //如果任務執行失敗 if (! workerStarted) //移除worker addWorkerFailed(w); } return workerStarted; } 複製程式碼
關閉執行緒池
ThreadPoolExecutor
中關閉執行緒池使用 shutdown()
和 shutdownNow()
方法,原理都是通過遍歷執行緒池中的執行緒,對執行緒進行中斷
for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } 複製程式碼
Executor框架
Executor
框架將任務的提交與任務的執行進行分離
Executors
提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了 ExecutorService
介面
工廠方法:
newFixedThreadPool newCachedThreadPool newSingleThreadExecutor newScheduledThreadPool
在阿里巴巴手冊中強制要求禁止使用 Executors
提供的工廠方法建立執行緒池

這個確實是一個很嚴重的問題,我們部門曾經就出現過使用 FixedThreadPool
執行緒池,導致OOM,這是因為執行緒執行任務的時候被阻塞或耗時很長時間,導致阻塞佇列一直在新增任務,直到記憶體被打滿,報OOM
所以我們在使用執行緒池的時候要使用 ThreadPoolExecutor
的建構函式去建立執行緒池,根據自己的任務型別來確定核心執行緒數和最大執行緒數,選擇適合阻塞佇列和阻塞佇列的長度
合理配置執行緒池
合理的配置執行緒池需要分析一下任務的性質(使用 ThreadPoolExecutor
建立執行緒池):
-
CPU密集型任務應配置竟可能小的執行緒,比如 cpu數量+1
-
IO密集型任務並不是一直在執行任務,應該配置儘可能多的執行緒,比如 cpu數量x2
可通過
Runtime.getRuntime().availableProcessors()
獲取cpu數量 -
執行的任務有呼叫外部介面比較費時的時候,這時cup空閒的時間就越長,可以將執行緒池數量設定大一些,這樣cup空閒的時間就可以去執行別的任務
-
建議使用有界佇列,可根據需要將長度設定大一些,防止OOM