1. 程式人生 > >Java併發核心基礎——執行緒池使用及底層實現機制詳解

Java併發核心基礎——執行緒池使用及底層實現機制詳解

Java執行緒池概述: 從使用入手: java.util.concurrent.Executosr是執行緒池的靜態工廠,我們通常使用它方便地生產各種型別的執行緒池,主要的方法有三種: 1、newSingleThreadExecutor()——建立一個單執行緒的執行緒池 2、newFixedThreadPool(int n)——建立一個固定大小的執行緒池 3、newCachedThreadPool()——建立一個可快取的執行緒池 1、SingleThreadExecutor 特點:單執行緒序列工作,如果這個唯一的執行緒因為異常終止,則有一個新的執行緒來替代它。 2、FixedThreadPool
特點:固定大小的執行緒池,如果設定的所有執行緒都在執行,新任務會在任務佇列等待。 3、CachedThreadPool 特點:大小可伸縮的執行緒池。如果當前沒有可用執行緒,則建立一個執行緒。在執行結束後快取60s,如果不被呼叫則移除執行緒。呼叫execute()方法時可以重用快取中的執行緒。適用於很多短期非同步任務的環境,可以提高程式效能。 以CachedThreadPool為例:
public class MyCachedThreadPool {
	public static void main(String[] args){
		//建立Runnable物件,實現run()方法
		Runnable runnable = new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getName() + " running!");
				try {
					//為了體現出任務競爭資源,讓執行緒休眠1000ms
					Thread.sleep(1000L);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName() + " end!");
			}
		};
		//建立執行緒池,加入任務,執行任務
		ExecutorService myThreadPool = Executors.newCachedThreadPool();
		myThreadPool.execute(runnable);
		myThreadPool.execute(runnable);
		myThreadPool.execute(runnable);
		myThreadPool.execute(runnable);
		myThreadPool.execute(runnable);
		//關閉執行緒池
		myThreadPool.shutdown();
	}
}
測試結果:

其中,我們不光會使用excute()方法執行任務,還會使用submit()方法,submit方法主要適用於使用Callable的情況,區別主要有如下幾點: 1、接收引數不一樣,excute()方法需要一個Runnable型別引數,而submit方法需要一個Callable<T>型別引數。 2、返回值不同,excute()方法沒有返回值,submit方法會返回Future<T>型別返回值。 3、submit方法適合處理異常。執行的任務裡會丟擲checked或者unchecked exception,而你又希望外面的呼叫者能夠感知這些exception並做出及時的處理,那麼就需要用到submit,通過捕獲Future.get丟擲的異常。
使用執行緒池的好處: 1、降低資源消耗。重複利用已建立執行緒,降低執行緒建立與銷燬的資源消耗。 2、提高響應效率。任務到達時,不需等待建立執行緒就能立即執行。 3、提高執行緒可管理性。 4、防止伺服器過載。記憶體溢位、CPU耗盡。 執行緒池應用範圍: 1、需要大量的執行緒來完成任務,並且完成所需時間較短。如Web伺服器完成網頁請求。 2、對效能有苛刻要求。如伺服器實時響應。 3、突發性大量請求,且不至於在伺服器產生大量執行緒。 介紹執行緒池實現機制之前: 關於執行緒池一些比較重要的類或介面: (1)ExecutorService是真正的執行緒池介面,所以我們在通過Excutors建立各種執行緒時,一般採用如下程式碼: ExecutorService threadPool = Executors.newXXX(); 先宣告一個執行緒池介面,執行時動態繫結具體的執行緒池物件,典型的介面的用法(雖然ExecutorService名字起的不像個介面……)。 從這段程式碼還可以看出,(2)Executors是靜態工廠的功能(雖然名字也不像個工廠……),生產各種型別執行緒池。 需要注意的是,要區分(3)Executor與Executors,Executor是執行緒池的頂級介面,但它只是一個執行執行緒的工具,真正的執行緒池介面是ExecutorService。 (4)AbstractExecutorService實現了ExecutorService介面,實現了其中大部分的方法(有沒有實現的,所以被宣告為Abstract)。 (5)ThreadPoolExecutor,繼承了AbstractExecutorService,是ExecutorService的預設實現,也是一會將要介紹的重點。 這五個類或介面實現了從執行緒池頂層介面到底層實現的整個架構。 其它的帶有Schedule關鍵字的類或介面與實現週期性重複性工作相關,不是本篇考慮的重點。類圖如下:
除了繼承Thread、實現Runnable、Callable三種建立執行緒方式外的第四種建立方式: 實現java.util.concurrent.ThreadFactory介面,實現newThread(Runnable r)方法 這種方式應用於這樣一種場景:我們需要一個執行緒池,並且對於執行緒池中的執行緒物件、賦予統一的名字、優先順序,以及一些其他統一操作,使用這樣的工廠方式就是優秀程式設計師應該使用的最好的方法。 執行緒池工作機制及原理: 先看看Excutors建立各種執行緒池的原始碼: 1、newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {   
	return new FinalizableDelegatedExecutorService   
		(new ThreadPoolExecutor(1, 1,   
                                0L, TimeUnit.MILLISECONDS,   
                                new LinkedBlockingQueue<Runnable>()));   
}
2、newFixedThreadPool(int n)
public static ExecutorService newFixedThreadPool(int nThreads) {   
	return new ThreadPoolExecutor(nThreads, nThreads,   
								  0L, TimeUnit.MILLISECONDS,   
                                  new LinkedBlockingQueue<Runnable>());   
}
3、newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {   
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   
                                  60L, TimeUnit.SECONDS,   
                                  new SynchronousQueue<Runnable>());   
}
發現所有實現都是建立了一個ThreadPoolExecutor例項(ThreadPoolExecutor是ExecutorServiec預設實現類,加深印象) 其中ThreadPoolExecutor的建構函式中有幾個引數,現在介紹這些引數,是理解執行緒池工作原理的重要方式: 1、第一個引數:int corePoolSIze,核心池大小,也就是執行緒池中會維持不被釋放的執行緒數量。我們可以看到FixedThreadPool中這個引數值就是設定的執行緒數量,而SingleThreadExcutor中就是1,newCachedThreadPool中就是0,不會維持,只會快取60L。但需要注意的是,線上程池剛建立時,裡面並沒有建好的執行緒,只有當有任務來的時候才會建立(除非呼叫方法prestartAllCoreThreads()與prestartCoreThread()方法),在corePoolSize數量範圍的執行緒在完成任務後不會被回收。 2、第二個引數:int maximumPoolSize,執行緒池的最大執行緒數,代表著執行緒池中能建立多少執行緒池。超出corePoolSize,小於maximumPoolSize的執行緒會在執行任務結束後被釋放。此配置在CatchedThreadPool中有效。 3、第三個引數:long keepAliveTime,剛剛說到的會被釋放的執行緒快取的時間。我們可以看到,正如我們所說的,在CachedThreadPool()構造過程中,會被設定快取時間為60s(時間單位由第四個引數控制)。 4、第四個引數:TimeUnit unit,設定第三個引數keepAliveTime的時間單位。 5、第五個引數:儲存等待執行任務的阻塞佇列,有多種選擇,分別介紹: SynchronousQueue——直接提交策略,適用於CachedThreadPool。它將任務直接提交給執行緒而不保持它們。如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求最大的 maximumPoolSize 以避免拒絕新提交的任務(正如CachedThreadPool這個引數的值為Integer.MAX_VALUE)。當任務以超過佇列所能處理的量、連續到達時,此策略允許執行緒具有增長的可能性。吞吐量較高。 LinkedBlockingQueue——無界佇列,適用於FixedThreadPool與SingleThreadExcutor。基於連結串列的阻塞佇列,建立的執行緒數不會超過corePoolSizes(maximumPoolSize值與其一致),當執行緒正忙時,任務進入佇列等待。按照FIFO原則對元素進行排序,吞吐量高於ArrayBlockingQueue。 ArrayListBlockingQueue——有界佇列,有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。 ThreadPoolExcutor的構造方式不僅有這一種,總共有四種,還可以在最後加入一個引數以控制執行緒池任務超額處理策略: 當用來快取待處理任務的佇列已滿時,又加入了新的任務,那麼這時候就該考慮如何處理這個任務。 可以通過實現RejectedExceptionHandler介面,實現rejectedException(ThreadPoolExecutor e, Runnable r)方法自定義操作。但通常我們使用JDK提供了4種處理策略,在ThreadPoolExecutor構造時以引數傳入: ThreadPoolExcutor.AbortPolicy()——直接丟擲異常,預設操作 ThreadPoolExcutor.CallerRunsPolicy()——只用呼叫者所線上程來執行任務 ThreadPoolExcutor.DiscardOldersPolicy()——丟棄佇列裡最近的一個任務,並執行當前任務 ThreadPoolExcutor.DiscardPolicy()——不處理,直接丟掉 關於excute()方法,它的執行實際上分了三步: 1、當少量的執行緒在執行,執行緒的數量還沒有達到corePoolSize,那麼啟用新的執行緒來執行新任務。 2、如果執行緒數量已經達到了corePoolSize,那麼嘗試把任務快取起來,然後再次檢查執行緒池的狀態,看這個時候是否能新增一個額外的執行緒,來執行這個任務。如果這個檢查到執行緒池關閉了,就拒絕任務。 3、如果我們沒法快取這個任務,那麼我們就嘗試去新增執行緒去執行這個任務,如果失敗,可能任務已被取消或者任務佇列已經飽和,就拒絕掉這個任務。 可以參考原始碼理解:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
其中,執行緒池會把每個執行緒封裝成一個Worker物件,由addWorker(Runnable firstTask, boolean core)方法控制,firstTask代表執行緒池首要執行的任務,core代表是否使用corePoolSize引數作為執行緒池最大標記。 參考資料: 《Java併發程式設計從入門到精通》 http://www.cnblogs.com/wanqieddy/p/3853863.html