ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)
工作原理
1、線程池剛創建時,裏面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列裏面有任務,線程池也不會馬上執行它們。
2、當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
a. 如果正在運行的線程數量小於 corePoolSize,那麽馬上創建線程運行這個任務;
b. 如果正在運行的線程數量大於或等於 corePoolSize,那麽將這個任務放入隊列。
c. 如果這時候隊列滿了,而且正在運行的線程數量小於 maximumPoolSize,那麽還是要創建線程運行這個任務;
d. 如果隊列滿了,而且正在運行的線程數量大於或等於 maximumPoolSize,那麽線程池會拋出異常,告訴調用者“我不能再接受任務了”。
3、當一個線程完成任務時,它會從隊列中取下一個任務來執行。
4、當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大於 corePoolSize,那麽這個線程就被停掉。所以線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
這樣的過程說明,並不是先加入任務就一定會先執行。假設隊列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那麽當加入 20 個任務時,執行的順序就是這樣的:首先執行任務 1、2、3,然後任務 4~13 被放入隊列。這時候隊列滿了,任務 14、15、16 會被馬上執行,而任務 17~20 則會拋出異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。下面是一個線程池使用的例子:
排隊
所有 BlockingQueue
都可用於傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:
- 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
- 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
- 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
排隊有三種通用策略:
- 直接提交。工作隊列的默認選項是
SynchronousQueue
,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。 - 無界隊列。使用無界隊列(例如,不具有預定義容量的
LinkedBlockingQueue
)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。 - 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如
ArrayBlockingQueue
)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。
被拒絕的任務當 Executor 已經關閉,並且 Executor 將有限邊界用於最大線程和工作隊列容量,且已經飽和時,在方法 execute(java.lang.Runnable)
中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將調用其RejectedExecutionHandler
的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
方法。下面提供了四種預定義的處理程序策略:
- 在默認的
ThreadPoolExecutor.AbortPolicy
中,處理程序遭到拒絕將拋出運行時RejectedExecutionException
。 - 在
ThreadPoolExecutor.CallerRunsPolicy
中,線程調用運行該任務的execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。 - 在
ThreadPoolExecutor.DiscardPolicy
中,不能執行的任務將被刪除。 - 在
ThreadPoolExecutor.DiscardOldestPolicy
中,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然後重試執行程序(如果再次失敗,則重復此過程)。
定義和使用其他種類的 RejectedExecutionHandler
類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。
公用的類ThreadPoolTask
[java] view plain copy
- public class ThreadPoolTask implements Runnable {
- // 保存任務所需要的數據
- private Object threadPoolTaskData;
- private static int consumerTaskSleepTime = 2000;
- ThreadPoolTask(Object tasks) {
- this.threadPoolTaskData = tasks;
- }
- public void run() {
- // 處理一個任務,這裏的處理方式太簡單了,僅僅是一個打印語句
- System.out.println("start .." + threadPoolTaskData);
- try {
- //便於觀察,等待一段時間
- Thread.sleep(consumerTaskSleepTime);
- } catch (Exception e) {
- e.printStackTrace();
- }
- System.out.println("finish " + threadPoolTaskData);
- threadPoolTaskData = null;
- }
- }
ThreadPoolExecutor.AbortPolicy
[java] view plain copy
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class ThreadPool {
- //讓可執行程序休息一下
- private static int executePrograms = 0;
- private static int produceTaskMaxNumber = 10;
- public static void main(String[] args) {
- // 構造一個線程池
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
- TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
- new ThreadPoolExecutor.CallerRunsPolicy());
- for (int i = 1; i <= produceTaskMaxNumber; i++) {
- try {
- String task = "task@ " + i;
- System.out.println("put " + task);
- threadPool.execute(new ThreadPoolTask(task));
- // 便於觀察,等待一段時間
- Thread.sleep(executePrograms);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
運行結果
[java] view plain copy
- put task@ 1
- put task@ 2
- start ..task@ 1
- put task@ 3
- put task@ 4
- start ..task@ 2
- put task@ 5
- put task@ 6
- put task@ 7
- start ..task@ 6
- put task@ 8
- start ..task@ 7
- java.util.concurrent.RejectedExecutionException
- at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
- at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
- at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
- at ThreadPool.main(ThreadPool.java:22)
- put task@ 9
- java.util.concurrent.RejectedExecutionException
- at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
- at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
- at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
- at ThreadPool.main(ThreadPool.java:22)
- put task@ 10
- java.util.concurrent.RejectedExecutionException
- at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
- at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
- at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
- at ThreadPool.main(ThreadPool.java:22)
- finish task@ 2
- finish task@ 6
- finish task@ 7
- start ..task@ 4
- start ..task@ 3
- finish task@ 1
- start ..task@ 5
- finish task@ 4
- finish task@ 3
- finish task@ 5
ThreadPoolExecutor.CallerRunsPolicy
[java] view plain copy
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class ThreadPool {
- //讓可執行程序休息一下
- private static int executePrograms = 0;
- private static int produceTaskMaxNumber = 10;
- public static void main(String[] args) {
- // 構造一個線程池
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
- TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
- new ThreadPoolExecutor.CallerRunsPolicy());
- for (int i = 1; i <= produceTaskMaxNumber; i++) {
- try {
- String task = "task@ " + i;
- System.out.println("put " + task);
- threadPool.execute(new ThreadPoolTask(task));
- // 便於觀察,等待一段時間
- Thread.sleep(executePrograms);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
運行結果
[java] view plain copy- put task@ 1
- put task@ 2
- start ..task@ 1
- put task@ 3
- start ..task@ 2
- put task@ 4
- put task@ 5
- put task@ 6
- put task@ 7
- start ..task@ 6
- put task@ 8
- start ..task@ 8
- start ..task@ 7
- finish task@ 2
- finish task@ 1
- start ..task@ 3
- start ..task@ 4
- finish task@ 6
- start ..task@ 5
- finish task@ 7
- finish task@ 8
- put task@ 9
- put task@ 10
- start ..task@ 9
- finish task@ 4
- finish task@ 3
- start ..task@ 10
- finish task@ 9
- finish task@ 5
- finish task@ 10
ThreadPoolExecutor.DiscardPolicy
[java] view plain copy
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class ThreadPool {
- //讓可執行程序休息一下
- private static int executePrograms = 0;
- private static int produceTaskMaxNumber = 10;
- public static void main(String[] args) {
- // 構造一個線程池
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
- TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
- new ThreadPoolExecutor.CallerRunsPolicy());
- for (int i = 1; i <= produceTaskMaxNumber; i++) {
- try {
- String task = "task@ " + i;
- System.out.println("put " + task);
- threadPool.execute(new ThreadPoolTask(task));
- // 便於觀察,等待一段時間
- Thread.sleep(executePrograms);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
運行結果
[java] view plain copy
- put task@ 1
- put task@ 2
- start ..task@ 1
- put task@ 3
- start ..task@ 2
- put task@ 4
- put task@ 5
- put task@ 6
- put task@ 7
- start ..task@ 6
- put task@ 8
- put task@ 9
- start ..task@ 7
- put task@ 10
- finish task@ 2
- finish task@ 1
- start ..task@ 3
- start ..task@ 4
- finish task@ 7
- finish task@ 6
- start ..task@ 5
- finish task@ 3
- finish task@ 4
- finish task@ 5
ThreadPoolExecutor.DiscardOldestPolicy
[java] view plain copy
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class ThreadPool {
- //讓可執行程序休息一下
- private static int executePrograms = 0;
- private static int produceTaskMaxNumber = 10;
- public static void main(String[] args) {
- // 構造一個線程池
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
- TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
- new ThreadPoolExecutor.CallerRunsPolicy());
- for (int i = 1; i <= produceTaskMaxNumber; i++) {
- try {
- String task = "task@ " + i;
- System.out.println("put " + task);
- threadPool.execute(new ThreadPoolTask(task));
- // 便於觀察,等待一段時間
- Thread.sleep(executePrograms);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
運行結果:
[java] view plain copy
- put task@ 1
- put task@ 2
- start ..task@ 1
- put task@ 3
- start ..task@ 2
- put task@ 4
- put task@ 5
- put task@ 6
- put task@ 7
- start ..task@ 6
- put task@ 8
- start ..task@ 7
- put task@ 9
- put task@ 10
- finish task@ 6
- finish task@ 7
- start ..task@ 8
- finish task@ 1
- start ..task@ 9
- finish task@ 2
- start ..task@ 10
- finish task@ 8
- finish task@ 9
- finish task@ 10
ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)