1. 程式人生 > >Java線程池Executor框架詳解

Java線程池Executor框架詳解

keepalive cto 拒絕 cache task 委托 工作單元 shu keepaliv

Java的線程既是工作單元,也是執行機制。從JDK 5開始,把工作單元與執行機制分離開來。工作單元包括Runnable和Callable,而執行機制由Executor框架提供。

Executor框架簡介
在HotSpot VM的線程模型中,Java線程(java.lang.Thread)被一對一映射為本地操作系統線程。Java線程啟動時會創建一個本地操作系統線程;當該Java線程終止時,這個操作系統線程也會被回收。操作系統會調度所有線程並將它們分配給可用的CPU。
在上層,Java多線程程序通常把應用分解為若幹個任務,然後使用用戶級的調度器(Executor框架)將這些任務映射為固定數量的線程;在底層,操作系統內核將這些線程映射到硬件處理器上。

技術分享圖片

任務:包括被執行任務需要實現的接口:Runnable接口或Callable接口。
任務的執行:包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關鍵類實現了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
異步計算的結果:包括接口Future和實現Future接口的FutureTask類。

技術分享圖片
技術分享圖片

ThreadPoolExecutor
1、FixedThreadPool:稱為可重用固定線程數的線程池。下面是FixedThreadPool的源代碼實

現。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool的corePoolSize和maximumPoolSize都被設置為創建FixedThreadPool時指定的參數nThreads。
當線程池中的線程數大於corePoolSize時,keepAliveTime為多余的空閑線程等待新任務的最長時間,超過這個時間後多余的線程將被終止。這裏把keepAliveTime設置為0L,意味著多余的空閑線程會被立即終止。

FixedThreadPoo``l使用×××隊列LinkedBlockingQueue作為線程池的工作隊列(隊列的容量為Integer.MAX_VALUE)。使用×××隊列作為工作隊列會對線程池帶來如下影響。
1)當線程池中的線程數達到corePoolSize後,新任務將在×××隊列中等待,因此線程池中的線程數不會超過corePoolSize。
2)由於1,使用×××隊列時maximumPoolSize將是一個無效參數。
3)由於1和2,使用×××隊列時keepAliveTime將是一個無效參數。
4)由於使用×××隊列,運行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution方法)。

2、SingleThreadExecutor:使用單個worker線程的Executor。下面是SingleThreadExecutor的源代碼實現。

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
3、CachedThreadPool:是一個會根據需要創建新線程的線程池。下面是創建CachedThread-
Pool的源代碼。

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被設置為0,即corePool為空;maximumPoolSize被設置為Integer.MAX_VALUE,即maximumPool是×××的。這裏把keepAliveTime設置為60L,意味著CachedThreadPool中的空閑線程等待新任務的最長時間為60秒,空閑線程超過60秒後將會被終止。

CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊列,但CachedThreadPool的maximumPool是×××的。這意味著,如果主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU和內存資源。

ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲之後運行任務,或者定期執行任務。

技術分享圖片

ScheduledThreadPoolExecutor會把待調度的任務(ScheduledFutureTask)放到一個DelayQueue中。
ScheduledFutureTask主要包含3個成員變量,如下:
long型成員變量time,表示這個任務將要被執行的具體時間。
long型成員變量sequenceNumber,表示這個任務被添加到ScheduledThreadPoolExecutor中
的序號。
long型成員變量period,表示任務執行的間隔周期。

DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對隊列中的ScheduledFutureTask進行排序。排序時,time小的排在前面(時間早的任務將被先執行)。如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面(也就是說,如果兩個任務的執行時間相同,那麽先提交的任務將被先執行)。

FutureTask
Future接口和實現Future接口的FutureTask類,代表異步計算的結果。FutureTask可以處於下面3種狀態:
1)未啟動。FutureTask.run()方法還沒有被執行之前,FutureTask處於未啟動狀態。當創建一個FutureTask,且沒有執行FutureTask.run()方法之前,這個FutureTask處於未啟動狀態。
2)已啟動。FutureTask.run()方法被執行的過程中,FutureTask處於已啟動狀態。
3)已完成。FutureTask.run()方法執行完後正常結束,或被取消(FutureTask.cancel(…)),或執行FutureTask.run()方法時拋出異常而異常結束,FutureTask處於已完成狀態。

當FutureTask處於未啟動或已啟動狀態時,執行FutureTask.get()方法將導致調用線程阻塞;
當FutureTask處於已完成狀態時,執行FutureTask.get()方法將導致調用線程立即返回結果或拋出異常。
當FutureTask處於未啟動狀態時,執行FutureTask.cancel()方法將導致此任務永遠不會被執行;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務線程的方式來試圖停止任務;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(false)方法將不會對正在執行此任務的線程產生影響(讓正在執行的任務運行完成);當FutureTask處於已完成狀態時,執行FutureTask.cancel(…)方法將返回false。
技術分享圖片

FutureTask的實現也是基於AbstractQueuedSynchronizer(AQS)。

技術分享圖片

Sync是FutureTask的內部私有類,它繼承自AQS。創建FutureTask時會創建內部私有的成員對象Sync,FutureTask所有的的公有方法都直接委托給了內部私有的Sync。

Java線程池Executor框架詳解