1. 程式人生 > >Java Executor併發框架(二)RejectedExecutionHandler介紹

Java Executor併發框架(二)RejectedExecutionHandler介紹

一、介紹

當Executor已經關閉(即執行了executorService.shutdown()方法後),並且Executor將有限邊界用於最大執行緒數量和工作佇列容量,且已經飽和時,在方法execute()中提交的新任務將被拒絕。

在以上述情況下,execute 方法將呼叫其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。

首先我們來看下RejectedExecutionHandler 的定義:


下面提供了四種預定義的處理程式策略:

(1) 預設的ThreadPoolExecutor.AbortPolicy   處理程式遭到拒絕將丟擲執行時RejectedExecutionException;
(2) ThreadPoolExecutor.CallerRunsPolicy 執行緒呼叫執行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度

(3) ThreadPoolExecutor.DiscardPolicy  不能執行的任務將被刪除;

(4) ThreadPoolExecutor.DiscardOldestPolicy  如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。

執行緒池預設會採用的是defaultHandler策略。首先看defaultHandler的定義:


再看看具體的AbortPolicy:


看一下其他拒絕策略的具體實現:

CallerRunsPolicy實現類: 


DiscardPolicy實現類: 


DiscardOldestPolicy實現類: 


通過以上分析,我們都知道:這四個RejectedExecutionHandler的實現類都是ThreadPoolExecutor的靜態內部類。

二、測試案例

寫一個task:
package com.npf.thread.test;

public class Task implements Runnable {

	protected String name;
	
	public Task(String name) {
		super();
		this.name = name;
	}

	@Override
	public void run() {
		try {
			System.out.println(this.name + " is running.");  
			Thread.sleep(500); 
		} catch (Exception e) {
			
		}
	}

}

1. AbortPolicy 示例
package com.npf.thread.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AbortPolicyDemo {
	
	public static void main(String[] args) {
		// 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1,"執行緒池"的阻塞佇列容量為1。  
		ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
		
		// 設定執行緒池的拒絕策略為AbortPolicy  
		pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
		
		try {
			// 新建10個任務,並將它們新增到執行緒池中  
	        for (int i = 0; i < 10; i++) {  
	            Runnable myTask = new Task("task-"+i);  
	            pool.execute(myTask);  
	        }  
		} catch (RejectedExecutionException e) {
			e.printStackTrace();
			 // 關閉執行緒池  
	        pool.shutdown();
		}
	}
}
某一次執行結果,直接已異常的形式丟擲:


2. CallerRunsPolicy 示例 

package com.npf.thread.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsPolicyDemo {
	
	public static void main(String[] args) {
		// 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1,"執行緒池"的阻塞佇列容量為1。  
		ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
		
		// 設定執行緒池的拒絕策略為CallerRunsPolicy  
		pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

		// 新建10個任務,並將它們新增到執行緒池中  
       for (int i = 0; i < 10; i++) {  
           Runnable myTask = new Task("task-"+i);  
           pool.execute(myTask);  
        }  

		 // 關閉執行緒池  
        pool.shutdown();
	}
}

某一次執行結果,當有新任務新增到執行緒池被拒絕時,執行緒池會將被拒絕的任務新增到"執行緒池正在執行的執行緒"中去執行


3. DiscardPolicy 示例

package com.npf.thread.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardPolicyDemo {
	
	public static void main(String[] args) {
		// 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1,"執行緒池"的阻塞佇列容量為1。  
		ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
		
		// 設定執行緒池的拒絕策略為DiscardPolicy  
		pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

		// 新建10個任務,並將它們新增到執行緒池中  
        for (int i = 0; i < 10; i++) {  
            Runnable myTask = new Task("task-"+i);  
           pool.execute(myTask);  
        }
        
		// 關閉執行緒池  
        pool.shutdown();
	}
}

某一次執行結果,執行緒池pool的"最大池大小"和"核心池大小"都為1,這意味著"執行緒池能同時執行的任務數量最大隻能是1"。執行緒池pool的阻塞佇列是ArrayBlockingQueue,ArrayBlockingQueue是一個有界的阻塞佇列,ArrayBlockingQueue的容量為1。這也意味著執行緒池的阻塞佇列只能有一個執行緒池阻塞等待。由此可知,執行緒池中共運行了2個任務。第1個任務直接放到Worker中,通過執行緒去執行;第2個任務放到阻塞佇列中等待。其他的任務都被丟棄了。


4. DiscardOldestPolicy 示例

package com.npf.thread.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardOldestPolicyDemo {
	
	public static void main(String[] args) {
		// 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1,"執行緒池"的阻塞佇列容量為1。  
		ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
		
		// 設定執行緒池的拒絕策略為DiscardOldestPolicy  
		pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

		// 新建10個任務,並將它們新增到執行緒池中  
        for (int i = 0; i < 10; i++) {  
            Runnable myTask = new Task("task-"+i);  
            pool.execute(myTask);  
        }
        
		// 關閉執行緒池  
        pool.shutdown();
	}
}

某一次執行結果,當有新任務新增到執行緒池被拒絕時,執行緒池會丟棄阻塞佇列中末尾的任務,然後將被拒絕的任務新增到末尾。