1. 程式人生 > >java執行緒池之ThreadPoolExecutor(二):任務入佇列和任務丟棄

java執行緒池之ThreadPoolExecutor(二):任務入佇列和任務丟棄

一、關於任務入佇列

在上一篇【java執行緒池之ThreadPoolExecutor(一)】中,

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
定義執行緒池用的是ArrayBlockingQueue,該佇列是有容量限制的。譬如這裡,佇列裡面同時只能存在5個任務排隊等候。

其官方解釋如下:

ArrayBlockingQueue<E>:一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是在佇列中存在時間最長的元素。佇列的尾部 是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素。 
這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。 

除了ArrayBlockingQueue佇列,java還為我們提供了其它的佇列:


LinkedBlockingQueue<E>  :一個基於已連結節點的、範圍任意的 blocking queue。此佇列按 FIFO(先進先出)排序元素。佇列的頭部 是在佇列中時間最長的元素。佇列的尾部 是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列獲取操作會獲得位於佇列頭部的元素。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。

PriorityBlockingQueue<E>:一個無界阻塞佇列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞獲取操作。雖然此佇列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗(導致 OutOfMemoryError)。此類不允許使用 null 元素。依賴自然順序的優先順序佇列也不允許插入不可比較的物件(這樣做會導致丟擲 ClassCastException)。 

DelayQueue<E extends Delayed>:Delayed 元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即使無法使用 take 或 poll 移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此佇列不允許使用 null 元素。 

......

等等不一一列舉,感興趣的同學自己去檢視api文件。

現在把上一篇【java執行緒池之ThreadPoolExecutor(一)】中的建立執行緒池的程式碼修改如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
執行結果如下:
正在執行task==2
正在執行task==4
正在執行task==1
正在執行task==3
正在執行task==5
=========第1次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:5
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:10
已經執行完成的任務數目:0
=========第1次檢測end===========
=========第2次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:5
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:10
已經執行完成的任務數目:0
=========第2次檢測end===========
=========第3次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:5
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:10
已經執行完成的任務數目:0
=========第3次檢測end===========
執行緒:task==2執行完畢
正在執行task==6
=========第4次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:5
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:9
已經執行完成的任務數目:1
=========第4次檢測end===========
=========第5次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:5
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:9
已經執行完成的任務數目:1
=========第5次檢測end===========
執行緒:task==3執行完畢
正在執行task==7

........

執行緒:task==9執行完畢
正在執行task==13
執行緒:task==11執行完畢
正在執行task==14
執行緒:task==7執行完畢
正在執行task==15
=========第11次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:5
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:0
已經執行完成的任務數目:10
=========第11次檢測end===========
執行緒:task==13執行完畢
=========第12次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:4
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:0
已經執行完成的任務數目:11
=========第12次檢測end===========
執行緒:task==14執行完畢
=========第13次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:3
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:0
已經執行完成的任務數目:12
=========第13次檢測end===========
執行緒:task==6執行完畢
執行緒:task==12執行完畢
=========第14次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:1
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:0
已經執行完成的任務數目:14
=========第14次檢測end===========

......

=========第20次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:1
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:0
已經執行完成的任務數目:14
=========第20次檢測end===========
執行緒:task==15執行完畢
=========第21次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:0
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:0
已經執行完成的任務數目:15
=========第21次檢測end===========

你會發現活躍執行緒數始終不會超過5個,因為這時候佇列的容量沒有限制,不管多少任務來了,排隊便是,執行緒池中始終由5個核心執行緒重複執行。

二. 執行緒池中的任務丟棄

對於任務丟棄,ThreadPoolExecutor以內部類的形式實現了4個策略。分別是:
CallerRunsPolicy。提交任務的執行緒自己負責執行這個任務。
AbortPolicy。使Executor丟擲異常,通過異常做處理。
DiscardPolicy。丟棄提交的任務。
DiscardOldestPolicy。丟棄掉佇列中最早加入的任務。

在呼叫構造方法時,引數中未指定RejectedExecutionHandler情況下,預設採用AbortPolicy。

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
		 try {
			for(int i=1;i<=20;i++){//執行20個任務,但第16個任務會報異常,從而被拒絕。
			     MyTask myTask = new MyTask("task=="+i);
			     executor.execute(myTask);
			 }
		} catch (Exception e) {
			e.printStackTrace();
		}
		 //該執行緒用於對執行緒池的檢測
		 new Thread(new CheckTask(executor)).start();
		 executor.shutdown();
執行結果如下所示:
java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
	at com.thread.pool.ThreadPoolTest2.main(ThreadPoolTest2.java:32)
正在執行task==2
正在執行task==4
正在執行task==11
正在執行task==13
正在執行task==15
正在執行task==1
正在執行task==3
正在執行task==5
正在執行task==12
正在執行task==14
執行緒:task==1執行完畢
正在執行task==6
執行緒:task==11執行完畢
正在執行task==7
=========第1次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:10
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:3
已經執行完成的任務數目:2
=========第1次檢測end===========
=========第2次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:10
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:3
已經執行完成的任務數目:2
=========第2次檢測end===========
執行緒:task==15執行完畢
正在執行task==8
執行緒:task==6執行完畢
正在執行task==9
=========第3次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:10
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:1
已經執行完成的任務數目:4
=========第3次檢測end===========
從第16個任務開始就報異常被中止了。

當然我們在建立執行緒池的時候可以指明直接丟掉過多的任務,如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),new ThreadPoolExecutor.DiscardPolicy());
		 try {
			for(int i=1;i<=20;i++){//執行20個任務,但第16個任務開始就會被丟棄。
			     MyTask myTask = new MyTask("task=="+i);
			     executor.execute(myTask);
			 }
		} catch (Exception e) {
			e.printStackTrace();
		}
		 //該執行緒用於對執行緒池的檢測
		 new Thread(new CheckTask(executor)).start();
		 executor.shutdown();
請注意,執行緒池的構造方法現在多出了一個引數new ThreadPoolExecutor.DiscardPolicy()。執行結果如下:
正在執行task==2
正在執行task==4
正在執行task==11
正在執行task==13
正在執行task==15
正在執行task==1
正在執行task==3
正在執行task==5
正在執行task==12
正在執行task==14
執行緒:task==14執行完畢
正在執行task==6
=========第1次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:10
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:4
已經執行完成的任務數目:1
=========第1次檢測end===========
執行緒:task==3執行完畢
正在執行task==7
執行緒:task==15執行完畢
正在執行task==8
=========第2次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:10
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:2
已經執行完成的任務數目:3
=========第2次檢測end===========
=========第3次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:10
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:2
已經執行完成的任務數目:3
=========第3次檢測end===========
.......

執行緒:task==7執行完畢
=========第12次檢測start===========
執行緒池中核心執行緒數:5
執行緒池中活躍執行緒數目:0
執行緒池中允許的最大執行緒數目:10
佇列中等待執行的任務數目:0
已經執行完成的任務數目:15
=========第12次檢測end===========
注意:這裡找不到第16個任務執行情況,也就是說從第16個任務開始就被丟棄了,也不會報錯。
還有其它任務丟棄策略,請自行檢視api文件研究!