1. 程式人生 > >ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)

ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)

完全 系統 rod 大小 font 操作系統 重復 null 定義

工作原理

1、線程池剛創建時,裏面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列裏面有任務,線程池也不會馬上執行它們。

2、當調用 execute() 方法添加一個任務時,線程池會做如下判斷:

a. 如果正在運行的線程數量小於 corePoolSize,那麽馬上創建線程運行這個任務;

   b. 如果正在運行的線程數量大於或等於 corePoolSize,那麽將這個任務放入隊列。

   c. 如果這時候隊列滿了,而且正在運行的線程數量小於 maximumPoolSize,那麽還是要創建線程運行這個任務;

   d. 如果隊列滿了,而且正在運行的線程數量大於或等於 maximumPoolSize,那麽線程池會拋出異常,告訴調用者“我不能再接受任務了”。

3、當一個線程完成任務時,它會從隊列中取下一個任務來執行。

4、當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大於 corePoolSize,那麽這個線程就被停掉。所以線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。

   這樣的過程說明,並不是先加入任務就一定會先執行。假設隊列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那麽當加入 20 個任務時,執行的順序就是這樣的:首先執行任務 1、2、3,然後任務 4~13 被放入隊列。這時候隊列滿了,任務 14、15、16 會被馬上執行,而任務 17~20 則會拋出異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。下面是一個線程池使用的例子:

排隊

所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:

  • 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
  • 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
  • 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

排隊有三種通用策略:

  1. 直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
  2. 無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
  3. 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

被拒絕的任務當 Executor 已經關閉,並且 Executor 將有限邊界用於最大線程和工作隊列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將調用其RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預定義的處理程序策略:

  1. 在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時RejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運行該任務的execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
  3. ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
  4. ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然後重試執行程序(如果再次失敗,則重復此過程)。

定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。

公用的類ThreadPoolTask

[java] view plain copy
  1. public class ThreadPoolTask implements Runnable {
  2. // 保存任務所需要的數據
  3. private Object threadPoolTaskData;
  4. private static int consumerTaskSleepTime = 2000;
  5. ThreadPoolTask(Object tasks) {
  6. this.threadPoolTaskData = tasks;
  7. }
  8. public void run() {
  9. // 處理一個任務,這裏的處理方式太簡單了,僅僅是一個打印語句
  10. System.out.println("start .." + threadPoolTaskData);
  11. try {
  12. //便於觀察,等待一段時間
  13. Thread.sleep(consumerTaskSleepTime);
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println("finish " + threadPoolTaskData);
  18. threadPoolTaskData = null;
  19. }
  20. }



ThreadPoolExecutor.AbortPolicy

[java] view plain copy
  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.LinkedBlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class ThreadPool {
  6. //讓可執行程序休息一下
  7. private static int executePrograms = 0;
  8. private static int produceTaskMaxNumber = 10;
  9. public static void main(String[] args) {
  10. // 構造一個線程池
  11. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
  12. TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
  13. new ThreadPoolExecutor.CallerRunsPolicy());
  14. for (int i = 1; i <= produceTaskMaxNumber; i++) {
  15. try {
  16. String task = "task@ " + i;
  17. System.out.println("put " + task);
  18. threadPool.execute(new ThreadPoolTask(task));
  19. // 便於觀察,等待一段時間
  20. Thread.sleep(executePrograms);
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }

運行結果

[java] view plain copy
  1. put task@ 1
  2. put task@ 2
  3. start ..task@ 1
  4. put task@ 3
  5. put task@ 4
  6. start ..task@ 2
  7. put task@ 5
  8. put task@ 6
  9. put task@ 7
  10. start ..task@ 6
  11. put task@ 8
  12. start ..task@ 7
  13. java.util.concurrent.RejectedExecutionException
  14. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
  15. at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
  16. at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
  17. at ThreadPool.main(ThreadPool.java:22)
  18. put task@ 9
  19. java.util.concurrent.RejectedExecutionException
  20. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
  21. at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
  22. at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
  23. at ThreadPool.main(ThreadPool.java:22)
  24. put task@ 10
  25. java.util.concurrent.RejectedExecutionException
  26. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
  27. at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
  28. at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
  29. at ThreadPool.main(ThreadPool.java:22)
  30. finish task@ 2
  31. finish task@ 6
  32. finish task@ 7
  33. start ..task@ 4
  34. start ..task@ 3
  35. finish task@ 1
  36. start ..task@ 5
  37. finish task@ 4
  38. finish task@ 3
  39. finish task@ 5



ThreadPoolExecutor.CallerRunsPolicy

[java] view plain copy
  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.LinkedBlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class ThreadPool {
  6. //讓可執行程序休息一下
  7. private static int executePrograms = 0;
  8. private static int produceTaskMaxNumber = 10;
  9. public static void main(String[] args) {
  10. // 構造一個線程池
  11. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
  12. TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
  13. new ThreadPoolExecutor.CallerRunsPolicy());
  14. for (int i = 1; i <= produceTaskMaxNumber; i++) {
  15. try {
  16. String task = "task@ " + i;
  17. System.out.println("put " + task);
  18. threadPool.execute(new ThreadPoolTask(task));
  19. // 便於觀察,等待一段時間
  20. Thread.sleep(executePrograms);
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }

運行結果

[java] view plain copy
  1. put task@ 1
  2. put task@ 2
  3. start ..task@ 1
  4. put task@ 3
  5. start ..task@ 2
  6. put task@ 4
  7. put task@ 5
  8. put task@ 6
  9. put task@ 7
  10. start ..task@ 6
  11. put task@ 8
  12. start ..task@ 8
  13. start ..task@ 7
  14. finish task@ 2
  15. finish task@ 1
  16. start ..task@ 3
  17. start ..task@ 4
  18. finish task@ 6
  19. start ..task@ 5
  20. finish task@ 7
  21. finish task@ 8
  22. put task@ 9
  23. put task@ 10
  24. start ..task@ 9
  25. finish task@ 4
  26. finish task@ 3
  27. start ..task@ 10
  28. finish task@ 9
  29. finish task@ 5
  30. finish task@ 10



ThreadPoolExecutor.DiscardPolicy

[java] view plain copy
  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.LinkedBlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class ThreadPool {
  6. //讓可執行程序休息一下
  7. private static int executePrograms = 0;
  8. private static int produceTaskMaxNumber = 10;
  9. public static void main(String[] args) {
  10. // 構造一個線程池
  11. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
  12. TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
  13. new ThreadPoolExecutor.CallerRunsPolicy());
  14. for (int i = 1; i <= produceTaskMaxNumber; i++) {
  15. try {
  16. String task = "task@ " + i;
  17. System.out.println("put " + task);
  18. threadPool.execute(new ThreadPoolTask(task));
  19. // 便於觀察,等待一段時間
  20. Thread.sleep(executePrograms);
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }

運行結果

[java] view plain copy
  1. put task@ 1
  2. put task@ 2
  3. start ..task@ 1
  4. put task@ 3
  5. start ..task@ 2
  6. put task@ 4
  7. put task@ 5
  8. put task@ 6
  9. put task@ 7
  10. start ..task@ 6
  11. put task@ 8
  12. put task@ 9
  13. start ..task@ 7
  14. put task@ 10
  15. finish task@ 2
  16. finish task@ 1
  17. start ..task@ 3
  18. start ..task@ 4
  19. finish task@ 7
  20. finish task@ 6
  21. start ..task@ 5
  22. finish task@ 3
  23. finish task@ 4
  24. finish task@ 5



ThreadPoolExecutor.DiscardOldestPolicy

[java] view plain copy
  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.LinkedBlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class ThreadPool {
  6. //讓可執行程序休息一下
  7. private static int executePrograms = 0;
  8. private static int produceTaskMaxNumber = 10;
  9. public static void main(String[] args) {
  10. // 構造一個線程池
  11. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3,
  12. TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
  13. new ThreadPoolExecutor.CallerRunsPolicy());
  14. for (int i = 1; i <= produceTaskMaxNumber; i++) {
  15. try {
  16. String task = "task@ " + i;
  17. System.out.println("put " + task);
  18. threadPool.execute(new ThreadPoolTask(task));
  19. // 便於觀察,等待一段時間
  20. Thread.sleep(executePrograms);
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }
  26. }

運行結果:

[java] view plain copy
  1. put task@ 1
  2. put task@ 2
  3. start ..task@ 1
  4. put task@ 3
  5. start ..task@ 2
  6. put task@ 4
  7. put task@ 5
  8. put task@ 6
  9. put task@ 7
  10. start ..task@ 6
  11. put task@ 8
  12. start ..task@ 7
  13. put task@ 9
  14. put task@ 10
  15. finish task@ 6
  16. finish task@ 7
  17. start ..task@ 8
  18. finish task@ 1
  19. start ..task@ 9
  20. finish task@ 2
  21. start ..task@ 10
  22. finish task@ 8
  23. finish task@ 9
  24. finish task@ 10



ThreadPoolExecutor中策略的選擇與工作隊列的選擇(java線程池)