執行緒池很容易理解的
- 執行緒池介紹
- 併發佇列
- 執行緒池原理分析
- 自定義執行緒池
文中部分程式碼使用 lambda 表示式以簡化程式碼。
執行緒池
什麼是執行緒池?
Java中的執行緒池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用執行緒池。在開發過程中,合理地使用執行緒池能夠帶來3個好處。
- 降低資源消耗 。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
- 提高響應速度 。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
- 提高執行緒的可管理性 。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控
執行緒狀態

執行緒生命週期
新建狀態
當用new操作符建立一個執行緒時, 例如 new Thread(r) ,執行緒還沒有開始執行,此時執行緒處在新建狀態。
new Thread(() -> System.out.println("run 任務執行"))
就緒狀態
一個新建立的執行緒並不自動開始執行,要執行執行緒,必須呼叫執行緒的 start() 方法。處於就緒狀態的執行緒並不一定立即執行run()方法,執行緒還必須同其他執行緒競爭CPU時間,只有獲得CPU時間才可以執行執行緒。
new Thread(() -> System.out.println("run 任務執行")).start();
執行狀態
當執行緒獲得CPU時間後,它才進入執行狀態,真正開始執行run()方法。也就是 System.out.println("run 任務執行") 。
run 任務執行
阻塞狀態
執行緒執行過程中,可能由於各種原因進入阻塞狀態 :
-
執行緒執行了Thread.sleep(int n)方法,執行緒放棄CPU,睡眠n毫秒,然後恢復執行。
-
執行緒要執行一段同步程式碼,由於無法獲得相關的同步鎖,只好進入阻塞狀態,等到獲得了同步鎖,才能恢復執行。
-
執行緒執行了一個物件的wait()方法,進入阻塞狀態,只有等到其他執行緒執行了該物件的notify()或notifyAll()方法,才可能將其喚醒。
ofollow,noindex">更多原因參考
死亡狀態
有兩個原因會導致執行緒死亡:
- run方法正常退出而自然死亡
- 一個未捕獲的異常終止了run方法而使執行緒死亡
使用 isAlive() 方法可判斷執行緒在當前是否存活著
併發佇列
在併發佇列上JDK提供了兩套實現, 一個是以ConcurrentLinkedQueue為代表的高效能佇列非阻塞佇列,一個是以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承自Queue 。
ConcurrentLinkedDeque 非阻塞式佇列
- 無界執行緒安全佇列
- 先進先出的原則
- 不允許null元素
重要方法:
- add() 和 offer() 都是加入元素的方法
- poll() 和 peek() 都是取頭元素節點,區別在於 前者會刪除元素,後者不會
適用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的高效能,通常ConcurrentLinkedQueue效能好於BlockingQueue.它是一個基於連結節點的無界執行緒安全佇列。該佇列的元素遵循先進先出的原則。頭是最先
加入的,尾是最近加入的,該佇列不允許null元素
public static void main(String[] args) { // 非阻塞式用法 無界佇列 - 先進先出的原則 ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue<String>(); concurrentLinkedQueue.offer("第一個進佇列"); concurrentLinkedQueue.offer("第二個進佇列"); // 獲取單個佇列資訊 peek 獲取佇列 System.out.println(concurrentLinkedQueue.peek()); System.out.println(concurrentLinkedQueue.size()); // 獲取單個佇列資訊 poll 獲取佇列之後 會刪除佇列資訊 移除 System.out.println(concurrentLinkedQueue.poll()); System.out.println(concurrentLinkedQueue.size()); }
第一個進佇列 2 第一個進佇列 1
BlockingQueue 阻塞式佇列
被阻塞的情況主要有如下兩種:
- 當佇列滿了的時候進行入佇列操作
- 當佇列空了的時候進行出佇列操作
以下簡單介紹 **BlockingQueue ** 成員
ArrayBlockingQueue
有邊界的阻塞佇列,它的內部實現是一個數組,先進先出原則。 使用阻塞必須加加超時時間
public static void main(String[] args) throws InterruptedException { // 阻塞式佇列 ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3); // 沒有加超時時間都是為非阻塞式 arrayBlockingQueue.offer("張三"); // 獲取單個佇列資訊,並且會刪除該佇列 System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS)); System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS)); System.out.println(arrayBlockingQueue.size()); }
new ArrayBlockingQueue<>(3) 設定(佇列)有界大小3。
offer("張三") - “張三“ 進入佇列
第一個 poll("張三") 取出資料,“張三”從佇列中移除
第二個 poll("張三") ,此時佇列已經沒有資料了會阻塞等待3秒,如果佇列在這3秒內有資料入佇列會立即取出資料並結束阻塞,3秒阻塞超時就返回null
張三 null 0
以上示例是 當佇列空了的時候進行出佇列阻塞
public static void main(String[] args) throws InterruptedException { // 阻塞式佇列 ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3); // 沒有加超時時間都是為非阻塞式 arrayBlockingQueue.offer("張一", 3, TimeUnit.SECONDS); arrayBlockingQueue.offer("張二", 3, TimeUnit.SECONDS); arrayBlockingQueue.offer("張三", 3, TimeUnit.SECONDS); arrayBlockingQueue.offer("張四", 3, TimeUnit.SECONDS); // 獲取單個佇列資訊,並且會刪除該佇列 System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS)); System.out.println(arrayBlockingQueue.size()); }
new ArrayBlockingQueue<>(3) 設定(佇列)有界大小3。
offer("張一") - “張一“ 進入佇列
offer("張二") - “張二“ 進入佇列
offer("張三") - “張三“ 進入佇列 , 此時佇列已滿
offer("張四",3, TimeUnit.SECOND) 此時佇列已滿 , 這裡給了3秒阻塞時間等待資料出列才有位置,期間如果有資料出列立即新增“張四”入佇列並結束阻塞,如果沒資料出列阻塞超時會廢棄當前資料“張四”
張一 2
以上示例是 當佇列滿了的時候進行入佇列阻塞
常見的阻塞式佇列如下:
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
有興趣可以去實踐下
ThreadPoolExecutor
- Executor - execute 方法介面
- Executors - 工廠類
- ThreadPoolExecutor - Executor 框架的最頂層實現類
Executor框架的最頂層實現是ThreadPoolExecutor類, Executors 工廠類中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool 靜態方法其實也只是 ThreadPoolExecutor 的建構函式引數不同而已。通過傳入不同的引數,就可以構造出適用於不同應用場景下的執行緒池,來看下構造的引數。
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { ... }
- corePoolSize 執行緒池核心執行緒數,預設情況下,核心執行緒會線上程池中一直存活,即使它們處於閒置狀態。如果ThreadPoolExecutor的allowCoreThreadTimeOut屬性設定為true,那麼閒置的核心執行緒在等待新任務到來時會有超時策略,超過keepAliveTime 時間後,核心執行緒就會被終止。
- maximumPoolSize
執行緒池所能容納的最大執行緒數,當活動執行緒達到這個數值後,後續的新任務將被阻塞。 - keepAliveTime
非核心執行緒超過這個時長就會被回收。當ThreadPoolExecutor的allowCoreThreadTimeOut屬性設定為true時,keepAliveTime同樣會作用於核心執行緒 - unit
keepAliveTime 引數的時間單位 - workQueue
執行前用於保持任務的佇列, 此佇列僅由 execute 方法提交的 Runnable 任務。
newCachedThreadPool
這裡用 可快取的 newCachedThreadPool 執行緒池來分析
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool的corePoolSize被設定為0, maximumPoolSize被設定為Integer.MAX_VALUE, 即maximum是無界的。這裡keepAliveTime設定為60秒,意味著空閒的執行緒最多可以等待任務60秒,否則將被回收。
newCachedThreadPool使用沒有容量的SynchronousQueue作為主執行緒池的工作佇列,它是一個不儲存元素阻塞佇列。
執行緒池中的執行緒是被執行緒池快取了的,也就是說,執行緒沒有任務要執行時,便處於空閒狀態,處於空閒狀態的執行緒並不會被立即銷燬(會被快取住),只有當空閒時間超出一段時間(預設為60s)後,執行緒池才會銷燬該執行緒(相當於清除過時的快取)。新任務到達後,執行緒池首先會讓被快取住的執行緒(空閒狀態)去執行任務,如果沒有可用執行緒(無空閒執行緒),便會建立新的執行緒。
根據特性可知 :
- 長時間不提交任務的CachedThreadPool不會佔用系統資源
- 極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU資源及記憶體
程式碼示例
public static void main(String[] args) { // 建立執行緒(四種方式) 1.可快取、定長、定時、單例 ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 100; i++) { final int temp = i; // execute方法作用: 執行任務 newCachedThreadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + ",i:" + temp); }); } }
newCachedThreadPool 執行緒池執行 100 次任務
pool-1-thread-3,i:2 pool-1-thread-1,i:0 pool-1-thread-2,i:1 pool-1-thread-4,i:3 pool-1-thread-5,i:4 pool-1-thread-7,i:6 pool-1-thread-6,i:5 pool-1-thread-9,i:8 pool-1-thread-10,i:9 pool-1-thread-8,i:7 pool-1-thread-11,i:10 pool-1-thread-1,i:21 pool-1-thread-3,i:20
可以看到執行緒 pool-1-thread-3、pool-1-thread-1 被重複使用了
newFixedThreadPool
定長的newFixedThreadPool 執行緒池
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
可以看到 newFixedThreadPool 的核心執行緒和最大的執行緒都是一樣的,最多3個執行緒將處於活動狀態。如果提交了3個以上的執行緒,那麼它們將保持在佇列中,直到執行緒可用。
程式碼示例
public static void main(String[] args) { // 可固定長度的執行緒池 核心執行緒數 為3 最多建立3個執行緒 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int temp = i; // execute方法作用: 執行任務 newFixedThreadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + ",i:" + temp); }); } }
pool-1-thread-3,i:2 pool-1-thread-2,i:1 pool-1-thread-1,i:0 pool-1-thread-3,i:3 pool-1-thread-2,i:5 pool-1-thread-3,i:6 pool-1-thread-1,i:4 pool-1-thread-3,i:8 pool-1-thread-2,i:7 pool-1-thread-1,i:9
可以看到始終只有pool-1-thread-1、pool-1-thread-2、pool-1-thread-3 三個執行緒
newScheduledThreadPool
可定時的 newScheduledThreadPool 執行緒池
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
newScheduledThreadPool 的核心執行緒為 傳入的corePoolSize,最大執行緒為Integer.MAX_VALUE , DelayedWorkQueue佇列實現BlockingQueue介面,所以使用阻塞式的BlockingQueue佇列。
程式碼示例
public static void main(String[] args) { // 可定時執行緒池 3核心執行緒數 ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(3); newScheduledThreadPool.scheduleAtFixedRate( // Runnable 任務 () -> System.out.println(Thread.currentThread().getName() + "run "), 1000,// 初次執行需等待時間 100, TimeUnit.MILLISECONDS);// 週期執行時間(3個執行緒搶cpu時間) }
pool-1-thread-1run pool-1-thread-2run pool-1-thread-3run pool-1-thread-3run pool-1-thread-2run
每 100 毫秒執行一次
newSingleThreadExecutor
單例的 newSingleThreadExecutor 執行緒池
程式碼示例
public static void main(String[] args) { // 單線執行緒 ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int temp = i; // execute方法作用: 執行任務 newSingleThreadExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ",i:" + temp)); } }
pool-1-thread-1,i:0 pool-1-thread-1,i:1 pool-1-thread-1,i:2 pool-1-thread-1,i:3 pool-1-thread-1,i:4 pool-1-thread-1,i:5 pool-1-thread-1,i:6 pool-1-thread-1,i:7 pool-1-thread-1,i:8 pool-1-thread-1,i:9
始終只有一個執行緒執行任務
執行緒池原理剖析
提交一個任務到執行緒池中,執行緒池的處理流程如下:
-
1 .判斷執行緒池裡的核心執行緒是否都在執行任務,如果不是(核心執行緒空閒或者還有核心執行緒沒有被建立)則建立一個新的工作執行緒來執行任務。如果核心執行緒都在執行任務,則進入下個流程。
-
2 .執行緒池判斷工作佇列是否已滿,如果工作佇列沒有滿,則將新提交的任務儲存在這個工作佇列裡。如果工作佇列滿了,則進入下個流程。
-
3 . 判斷執行緒池裡的執行緒是否都處於工作狀態,如果沒有,則建立一個新的工作執行緒來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。
自定義執行緒執行緒池
上面列舉到 newCachedThreadPool、newFixedThreadPool ... 底層都是對 ThreadPoolExecutor 的構造器進行包裝使用。在來看下
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
所以咱們如果要實現一個自定義執行緒池就 直接 newThreadPoolExecutor(...) 就就行,現在來自定義一個。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));
這裡自定義執行緒池 :核心執行緒1個,最大建立2個執行緒,60s等待超時銷燬,使用阻塞式有界ArrayBlockingQueue 佇列,最多快取4個任務。
public class CustomThread { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4)); for (int i = 1; i <= 6; i++) { TaskThred t1 = new TaskThred("任務" + i); executor.execute(t1); } executor.shutdown(); } } class TaskThred implements Runnable { private String taskName; public TaskThred(String taskName) { this.taskName = taskName; } @Override public void run() { System.out.println(Thread.currentThread().getName() + taskName); } }
execute 執行6個任務
pool-1-thread-1任務1 pool-1-thread-2任務6 pool-1-thread-2任務2 pool-1-thread-1任務4 pool-1-thread-1任務5 pool-1-thread-2任務3
相關的異常資訊
RejectedExecutionException
原因:
- 提交任務量大, 佇列快取較小
- 確保不要在shutdown()之後在執行任務
java.util.concurrent.RejectedExecutionException: Task com.snail.demo.TaskThred@74a14482 rejected from java.util.concurrent.ThreadPoolExecutor@1540e19d[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
要出現這個異常只需將上邊 new ArrayBlockingQueue<>(4) 大小 4 改為 2 ,如下
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));