多執行緒程式設計學習十一(ThreadPoolExecutor 詳解).
一、ThreadPoolExecutor 引數說明
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize:核心執行緒池的大小。當提交一個任務到執行緒池時,核心執行緒池會建立一個核心執行緒來執行任務,即使其他核心執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於核心執行緒池基本大小時就不再建立。如果呼叫了執行緒池的 prestartAllCoreThreads() 方法,核心執行緒池會提前建立並啟動所有核心執行緒。
- workQueue:任務佇列。當核心執行緒池中沒有執行緒時,所提交的任務會被暫存在佇列中。Java 提供了多種阻塞佇列。
- maximumPoolSize:執行緒池允許建立的最大執行緒數。如果佇列也滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的空閒執行緒執行任務。值得注意的是,如果使用了無界的任務佇列則這個引數不起作用。
- keepAliveTime:當執行緒池中的執行緒數大於 corePoolSize 時,keepAliveTime 為多餘的空閒執行緒等待新任務的最長時間,超過這個時間後多餘的執行緒將被終止。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高執行緒的利用率。值得注意的是,如果使用了無界的任務佇列則這個引數不起作用。
- TimeUnit:執行緒活動保持時間的單位。
threadFactory:建立執行緒的工廠。可以通過執行緒工廠給每個創建出來的執行緒設定符合業務的名字。
// 依賴 guava new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build();
handler:飽和策略。當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。Java 提供了以下4種策略:
- AbortPolicy:預設。直接丟擲異常。
- CallerRunsPolicy:只用呼叫者所線上程來執行任務。
- DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
- DiscardPolicy:不處理,丟棄掉。
tips: 一般我們稱核心執行緒池中的執行緒為核心執行緒,這部分執行緒不會被回收;超過任務佇列後,建立的執行緒為空閒執行緒,這部分執行緒會被回收(回收時間即 keepAliveTime)
二、常見的 ThreadPoolExecutor 介紹
Executors 是建立 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的工廠類。
Java 提供了多種型別的 ThreadPoolExecutor,比較常見的有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool等。
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 被稱為可重用固定執行緒數的執行緒池。可以看到 corePoolSize 和 maximumPoolSize 都被設定成了 nThreads;keepAliveTime設定為0L,意味著多餘的空閒執行緒會被立即終止;使用了阻塞佇列 LinkedBlockingQueue 作為執行緒的工作佇列(佇列的容量為 Integer.MAX_VALUE)。
FixedThreadPool 所存在的問題是,由於佇列的容量為 Integer.MAX_VALUE,基本可以認為是無界的,所以 maximumPoolSize 和 keepAliveTime 引數都不會生效,飽和拒絕策略也不會執行,會造成任務大量堆積在阻塞佇列中。
FixedThreadPool 適用於為了滿足資源管理的需求,而需要限制執行緒數量的應用場景。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 是使用單個執行緒的執行緒池。可以看到 corePoolSize 和 maximumPoolSize 被設定為1,其他引數與 FixedThreadPool 相同,所以所帶來的風險也和 FixedThreadPool 一致,就不贅述了。
SingleThreadExecutor 適用於需要保證順序的執行各個任務。
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool 是一個會根據需要建立新執行緒的執行緒池。可以看到 corePoolSize 被設定為 0,所以建立的執行緒都為空閒執行緒;maximumPoolSize 被設定為 Integer.MAX_VALUE(基本可認為無界),意味著可以建立無限數量的空閒執行緒;keepAliveTime 設定為60L,意味著空閒執行緒等待新任務的最長時間為60秒;使用沒有容量的 SynchronousQueue 作為執行緒池的工作佇列。
CachedThreadPool 所存在的問題是, 如果主執行緒提交任務的速度高於maximumPool 中執行緒處理任務的速度時,CachedThreadPool 會不斷建立新執行緒。極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU和記憶體資源。
CachedThreadPool 適用於執行很多的短期非同步任務的小程式,或者是負載較輕的伺服器。
三、 自建 ThreadPoolExecutor 執行緒池
鑑於上面提到的風險,我們更提倡使用 ThreadPoolExecutor 去建立執行緒池,而不用 Executors 工廠去建立。
以下是一個 ThreadPoolExecutor 建立執行緒池的 Demo 例項:
public class Pool {
static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pool-task-%d").build();
static ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
threadFactory, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 無返回值的任務執行 -> Runnable
executor.execute(() -> System.out.println("Hello World"));
// 2. 有返回值的任務執行 -> Callable
Future<String> future = executor.submit(() -> "Hello World");
// get 方法會阻塞執行緒執行等待返回結果
String result = future.get();
System.out.println(result);
// 3. 監控執行緒池
monitor();
// 4. 關閉執行緒池
shutdownAndAwaitTermination();
monitor();
}
private static void monitor() {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Pool.executor;
System.out.println("【執行緒池任務】執行緒池中曾經建立過的最大執行緒數:" + threadPoolExecutor.getLargestPoolSize());
System.out.println("【執行緒池任務】執行緒池中執行緒數:" + threadPoolExecutor.getPoolSize());
System.out.println("【執行緒池任務】執行緒池中活動的執行緒數:" + threadPoolExecutor.getActiveCount());
System.out.println("【執行緒池任務】佇列中等待執行的任務數:" + threadPoolExecutor.getQueue().size());
System.out.println("【執行緒池任務】執行緒池已執行完任務數:" + threadPoolExecutor.getCompletedTaskCount());
}
/**
* 關閉執行緒池
* 1. shutdown、shutdownNow 的原理都是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的 interrupt 方法來中斷執行緒。
* 2. shutdownNow:將執行緒池的狀態設定成 STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表。
* 3. shutdown:將執行緒池的狀態設定成 SHUTDOWN 狀態,然後中斷所有沒有正在執行任務的執行緒。
*/
private static void shutdownAndAwaitTermination() {
// 禁止提交新任務
executor.shutdown();
try {
// 等待現有任務終止
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 取消當前正在執行的任務
executor.shutdownNow();
// 等待一段時間讓任務響應被取消
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
// 如果當前執行緒也中斷,則取消
executor.shutdownNow();
// 保留中斷狀態
Thread.currentThread().interrupt();
}
}
}
建立執行緒池需要注意以下幾點:
- CPU 密集型任務應配置儘可能小的執行緒,如配置 Ncpu+1 個執行緒。
- IO 密集型任務(資料庫讀寫等)應配置儘可能多的執行緒,如配置 Ncpu*2 個執行緒。
- 優先順序不同的任務可以使用優先順序佇列 PriorityBlockingQueue 來處理。
- 建議使用有界佇列。可以避免建立數量非常多的執行緒,甚至拖垮系統。有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。