1. 程式人生 > >Java併發程式設計實戰:執行緒池的應用

Java併發程式設計實戰:執行緒池的應用

一、任務與執行策略間的隱形耦合

1、執行緒飢餓死鎖

當任務都是同類、獨立的時候,執行緒池才會有最好的工作表現。如果將耗時的短期的任務混合在一起,除非執行緒池很大,否則會有“塞車”的風險;如果提交的任務要依賴其他的任務,除非池是無限的,否則有產生死鎖的風險。如下程式碼所示,對於一個單執行緒化的Executor,一個任務將另一個任務提交到相同的Executor中,並等待新提交的任務的結果,第二個任務滯留在工作佇列中,直到第一個任務完成,但是第一個任務在等待第一個任務完成,這就引發了死鎖。在一個大的執行緒池中,如果所有執行緒執行的任務都阻塞線上程池中,等待著仍處於同一佇列中的其他任務,也會發生死鎖。

class TheadDeadlock{
	ExecutorService exec = Executors.newSingleThreadExecutor();
	class RenderPageTask implements Callable<String>{
		public String call() {
			Future<String> header,footer;
			header = exec.submit(new LoadFileTask("header.html"));
			footer = exec.submit(new LoadFileTask("footer.html"));
			String page = renderBody();
			return header.get() + page + footer.get();
		}
	}
}

2、耗時操作

如果任務由於過長的時間週期而阻塞,那麼即使不可能出現死鎖,執行緒池的響應性也會變得很差。耗時任務會造成執行緒池堵塞,還會延長服務時間。可以限定任務等待資源的時間來緩解耗時操作帶來的影響。

二、定製執行緒池的大小

執行緒池過大:執行緒對稀缺的CPU和記憶體資源的競爭,會導致記憶體的高使用量,還可能耗盡資源。

執行緒池過小:由於存在很多可用的處理器資源卻未在工作,會對吞吐量造成損失。

如果有不同類別的任務,他們擁有差別很大的行為,那麼要使用多個執行緒池,這樣每個執行緒池可以根據不同任務的工作負載進行調節。

對於計算密集型的任務,一個有N個處理器的系統通常使用N+1個執行緒的執行緒池來獲得最優的利用率。

對於包含了I/O和其他阻塞操作的任務,不是所有的執行緒都會在所有的時間被排程,因此需要一個更大的執行緒池。

給定下列定義:

N = CPU的數量

U = 目標CPU的使用率,0 <= U <= 1

W/C = 等待時間與計算時間的比率

為保持處理器達到期望的使用率,最優的池的大小等於

Num = N * U * (1 + W/C)

可以通過Runtime.getRuntime().availableProcessors();來獲得CPU的數目

CPU週期並不是唯一可以約束資源池大小的資源,其他還有:記憶體、檔案控制代碼、套接字控制代碼和資料庫連線等。計算這些型別資源池的大小:首先累加每一個任務需要的這些資源的總量,然後除以可用的總量,所得結果是池大小的上限。

三、配置ThreadPoolExecutor

1、執行緒的建立與銷燬

核心池大小:即執行緒池的實現試圖維護的池的大小,即使沒有任務執行,池的大小也等於核心池的大小,並且直到工作佇列充滿前,池都不會建立更多的執行緒。

最大池的大小:可同時活動的執行緒數的上限

存活時間:如果一個執行緒已經閒置的時間超過了存活時間,它將成為一個被回收的候選者,如果池的大小超過了核心池的大小,執行緒池會終止它。

newFixedThreadPool工廠為請求的池設定了核心池的大小和最大池的大小,而且存活時間是無限的

newCachedThreadPool工廠將最大池的大小設定為Integer.MAX_VALUE,核心池的大小設定為0,超時設定為1分鐘

2、管理佇列任務

當新請求到達的頻率超過了執行緒池能夠處理它們的速度,這些多出來的請求會在一個由Executor管理的Runnable佇列中等候。

ThreadPoolExecutor允許使用一個BlockingQueue來持有等待執行的任務。任務排隊有3種基本方法:無限佇列、有限佇列和同步移交。

newFixedThreadPool和newSingleThreadExecutor預設使用的是一個無限的LinkedBlockingQueue。如果執行緒池中的所有執行緒都處於忙碌狀態,任務將會在佇列中等候。如果等候的任務超出了佇列的長度,佇列也會無限制地增加。

newCachedThreadPool使用SynchronousQueue,將任務直接從生產者移交給工作者執行緒。為了把一個元素放入到SynchronousQueue中,必須有另一個執行緒正在等待接受移交的任務。如果沒有這樣一個執行緒,只要當前池的大小還小於最大值,ThreadPoolExecutor就會建立一個新的執行緒;否則根據飽和策略,任務會被拒絕。只有當池是無限的,或者可以接受任務被拒絕,SynchronousQueue才是一個好的選擇。

另外,使用有限佇列比如ArrayBlockingQueue或者有限的LinkedBlockingQueue以及PriorityBlockingQueue有助於避免資源耗盡的情況發生,但是它又引入了新問題:當佇列已滿後,新的任務怎麼辦?有很多飽和策略可以處理這個問題。對於一個有界佇列,佇列的長度與吃的長度必須一起調節。一個大佇列加一個小池,可以控制對記憶體和CPU的使用,還可以減少上下文切換。

當任務彼此獨立時,有限執行緒池或者有限工作佇列的使用是合理的。倘若任務之間相互依賴,有限的執行緒池或佇列就可能引起執行緒飢餓死鎖,就要使用一個無限的池配置。

3、飽和策略

當有限的等待佇列填滿後,飽和策略開始起作用。ThreadPoolExecutor的飽和策略可以通過呼叫setRejectedExecutionHandler來修改。JDK提供了幾種不同的RejectedExecutionHandler的實現:

(1)AbortPolicy:預設的“中止”策略會引起execute丟擲未檢查的RejectedExecutionException,呼叫者可以捕獲這個異常,然後編寫能滿足自己需求的處理程式碼。

(2)DiscardPolicy:“遺棄”策略會放棄這個任務

(3)DiscardOldestPolicy:“遺棄最舊的”策略選擇丟棄的任務是本應該接下來就執行的任務,然後嘗試重新提交新任務。如果工作佇列是優先順序佇列,那麼它選擇丟棄的是優先順序最高的任務,所以優先順序佇列和“遺棄最舊的”策略不能一塊使用

(4)CallerRunsPolicy:“呼叫者執行”策略會把一下任務推回到呼叫者那裡,以此減緩新任務流。當所有執行緒都被佔用,工作佇列已充滿後,下一個任務會在主執行緒中執行。主執行緒呼叫execute執行這個任務。因為這將花費一些時間,所以主執行緒在一段時間內不能提交任何任務。同時這也給了工作者執行緒時間來追趕進度。這期間主執行緒也不會呼叫accept,所以外來的請求不會出現在應用程式中,而會在TCP層的佇列中等候。如果持續高負載的話,最終會由TCP層判斷它的連線請求佇列是否已經排滿,如果已滿就開始丟棄請求任務。

(5)使用Semaphore來遏制任務的提交,使用一個非受限佇列,設定Semaphore的限制範圍等於池的大小加上你希望允許可排隊的任務數量。如下程式碼所示:

class BoundedExecutor{
	private final Executor exec;
	private final Semaphore semaphore;
	public BoundedExecutor(Executor exec,int bound){
		this.exec = exec;
		this.semaphore = new Semaphore(bound);
	}
	public void submitTask(final Runnable task) throws InterruptedException{
		semaphore.acquire();
		try {
			exec.execute(new Runnable() {
				@Override
				public void run() {
					// TODO Auto-generated method stub
					try{
						task.run();
					}finally{
						semaphore.release();
					}
				}
			});
		} catch (RejectedExecutionException e) {
			// TODO Auto-generated catch block
			semaphore.release();
		}
	}
}

四、擴充套件ThreadPoolExecutor

ThreadPoolExecutor提供了幾個函式讓子類去覆寫:beforeExecute、afterExecute和terminate。執行任務的執行緒會呼叫函式beforeExecute、afterExecute,用於新增日誌、時序、監視器或統計資訊收集的功能。無論任務是正常從run返回,還是丟擲一個異常,afterExecute都會被呼叫。如果beforeExecute丟擲一個RuntimeException,任務將不被執行,afterExecute也不會被呼叫。但所有的任務都已完成且所有工作者執行緒也已經關閉後,會執行terminate,terminate可以用來釋放Executor在生命週期裡分配到的資源,還可以發出通知、記錄日誌或者完成統計資訊。如下程式碼可以使用這三個函式提供日誌和計時功能

class TimingThreadPool extends ThreadPoolExecutor{
	private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
	private final Logger log = Logger.getLogger("TimingThreadPool");
	private final AtomicLong numTasks = new AtomicLong();
	private final AtomicLong totalTime = new AtomicLong();
	
	protected void beforeExecute(Thread t,Runnable r){
		super.beforeExecute(t,r);
		log.fine(String.format("Thread %s: start %s",t,r));
		startTime.set(System.nanoTime());
	}
	protected void afterExecute(Runnable r,Throwable t){
		try{
			long endTime = System.nanoTime();
			long taskTime = endTime - startTime.get();
			numTasks.incrementAndGet();
			totalTime.addAndGet(taskTime);
			log.fine(String.format("Thread %s: end %s, time=%dns",t,r,taskTime));
		}finally{
			super.afterExecute(r,t);
		}
	}
	protected void terminated(){
		try{
			log.info(String.format("Terminated: time=%dns",totalTime.get()/numTasks.get()));
		}finally{
			super.terminated();
		}
	}
}