1. 程式人生 > >Java併發程式設計的藝術之十----Executor框架

Java併發程式設計的藝術之十----Executor框架

1.Executor框架

1.1兩級排程模型

Java執行緒啟動時候會建立一個本地作業系統執行緒,當該java執行緒終止時,這個作業系統執行緒也會被回收。作業系統會排程所有執行緒並分配cpu。

上層,多執行緒程式通常把應用分解成若干個任務,然後Executor將任務對映為固定數量的執行緒,底層,系統核心將執行緒對映到cpu處理器上。應用程式通過Executor控制上層排程,而下層通過作業系統核心控制。

1.2Executor框架的結構及成員

①Executor的結構

任務:被執行任務需要實現的介面:Runnable介面或Callable介面

任務的執行:核心介面Executor

,以及繼承自ExecutorExecutorService介面。Executor框架有兩個關鍵類實現了ExecutorService介面(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

非同步計算的結果。包括介面Future和實現Future介面的FutureTask類。

 

·Executor是一個介面,它是Executor框架的基礎,它將任務的提交與任務的執行分離開來。

·ThreadPoolExecutor是執行緒池的核心實現類,用來執行被提交的任務

·ScheduledThreadPoolExecutor是一個實現類,可以在給定的延遲後

執行命令,或者定期執

行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大

·Future介面和實現Future介面的FutureTask類,代表非同步計算的結果。

·Runnable介面和Callable介面的實現類,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor執行。

 

主執行緒首先要建立實現Runnable或者Callable介面的任務物件。然後可以把Runnable物件直接交給ExecutorService執行(ExecutorService.execute

(Runnablecommand));或者也可以把Runnable物件或Callable物件提交給ExecutorService執行(Executor-Service.submit(Runnable task)或ExecutorService.submit(Callable<T>task))。如果執行ExecutorService.submit(…),ExecutorService將返回一個實現Future介面的物件(到目前為止的JDK中,返回的是FutureTask物件)。最後,主執行緒可以執行FutureTask.get()方法來等待任務執行完成。主執行緒也可以執行FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行

②Executor框架的主要成員

ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future介面、Runnable介面、Callable介面和Executors

(1)ThreadPoolExecutor

ThreadPoolExecutor通常使用工廠類Executors來建立。

1. FixedThreadPool,適用於為了滿足資源管理的需求,而需要限制當前執行緒數量的應用場

,它適用於負載比較重的伺服器。

2. SingleThreadExecutor。適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個執行緒是活動的應用場景。

3. CachedThreadPool。大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或者是負載較輕的伺服器。

(2)ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor通常使用工廠類Executors來建立。

1. ScheduledThreadPoolExecutor。適用於需要多個後臺執行緒執行週期任務,同時為了滿足資源管理的需求需要限制後臺執行緒的數量的應用場景

2. SingleThreadScheduledExecutor適用於需要單個後臺執行緒執行週期任務,同時需要保證順

序地執行各個任務的應用場景。

(3)Future介面

Future介面和實現Future介面的FutureTask類用來表示非同步計算的結果。

Runnable介面或Callable介面的實現類提交(submit)給ThreadPoolExecutor或ScheduledThreadPoolExecutor時,ThreadPoolExecutor或ScheduledThreadPoolExecutor會向我們返回一個FutureTask物件

(4)Runnable介面和Callable介面

Runnable介面和Callable介面的實現類,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor執行。它們之間的區別是Runnable不會返回結果,而Callable可以返回結果。

2. ThreadPoolExecutor詳解

2.1FixedThreadPool詳解

FixedThreadPool被稱為可重用固定執行緒數的執行緒池。

FixedThreadPool的corePoolSize和maximumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads,指定的容量。

當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime為多餘的空閒執行緒等待新任務的

最長時間,超過這個時間後多餘的執行緒將被終止。這裡把keepAliveTime設定為0L,意味著多餘的空閒執行緒會被立即終止

1)如果當前執行的執行緒數少於corePoolSize,則建立新執行緒來執行任務。

2)線上程池完成預熱之後(當前執行的執行緒數等於corePoolSize),將任務加入LinkedBlockingQueue。

3)執行緒執行完1中的任務後,會在迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列:

缺點:達到corepoolsize後,新任務在無界佇列中等待,執行緒池中的執行緒數不會超過corepoolsize。maximumPoolSize將是一個無效引數。FixedThreadPool不會進行飽和策略

2.2SingleThreadExecutor詳解

使用單個worker執行緒的Executor。

SingleThreadExecutor的corePoolSize和maximumPoolSize被設定為1。其他引數與FixedThreadPool相同。SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。

當前執行執行緒少於corePoolSize,建立一個新執行緒執行任務,如果已經有一個任務在執行,將任務加入無界佇列,執行完任務後,無限迴圈反覆從無界佇列獲取任務

2.3CachedThreadPool詳解

CachedThreadPoolcorePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximumPool是無界的。這裡把keepAliveTime設定為60L,意味著CachedThreadPool中的空閒執行緒等待新任務的最長時間為60秒,空閒執行緒超過60秒後將會被終止

CachedThreadPool使用沒有容量的SynchronousQueue作為執行緒池的工作佇列,但CachedThreadPool的maximumPool是無界的。這意味著,如果主執行緒提交任務的速度高於maximumPool中執行緒處理任務的速度時,CachedThreadPool會不斷建立新執行緒。極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU和記憶體資源

1)首先執行SynchronousQueue.offer。如果當前maximumPool中有空閒執行緒正在執行SynchronousQueue.poll,那麼主執行緒執行offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,execute()方法執行完成;否則執行下面的步驟2)。

2)沒有執行緒執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟1)將失敗。此時CachedThreadPool會建立一個新執行緒執行任務,execute()方法執行完成。

3)在步驟2)中新建立的執行緒將任務執行完後,會執行SynchronousQueue.poll。這個poll操作會讓空閒執行緒最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主執行緒提交了一個新任務(主執行緒執行步驟1)),那麼這個空閒執行緒將執行主執行緒提交的新任務;否則,這個空閒執行緒將終止。由於空閒60秒的空閒執行緒會被終止,因此長時間保持空閒的CachedThreadPool不會使用任何資源。

3. ScheduledThreadPoolExecutor詳解

-------------------以後整理

4. FutureTask詳解

Future介面和實現Future介面的FutureTask類,代表非同步計算的結果

4.1FutureTask簡介

FutureTask除了實現Future介面外,還實現了Runnable介面。因此,FutureTask可以交給Executor執行,也可以由呼叫執行緒直接執行(FutureTask.run())

FutureTask的3種狀態

1)未啟動。FutureTask.run()方法還沒有被執行之前,FutureTask處於未啟動狀態。

2)已啟動。FutureTask.run()方法被執行的過程中,FutureTask處於已啟動狀態

3)已完成。FutureTask.run()方法執行完後正常結束,或被取消(FutureTask.cancel(…)),或執行FutureTask.run()方法時丟擲異常而異常結束,FutureTask處於已完成狀態

當FutureTask處於未啟動或已啟動狀態時,執行FutureTask.get()方法將導致呼叫執行緒阻塞;當FutureTask處於已完成狀態時,執行FutureTask.get()方法將導致呼叫執行緒立即返回結果或丟擲異常。

4.2FutureTask的使用

可以把FutureTask交給Executor執行;也可以通過ExecutorService.submit(…)方法返回一個FutureTask,然後執行FutureTask.get()方法或FutureTask.cancel(…)方法。當一個執行緒需要等待另一個執行緒把某個任務執行完後它才能繼續執行,此時可以使用FutureTask。假設有多個執行緒執行若干任務,每個任務最多隻能被執行一次當多個執行緒試圖同時執行同一個任務時,只允許一個執行緒執行任務,其他執行緒需要等待這個任務執行完後才能繼續執行。

4.3FutureTask的實現

基於AbstractQueuedSynchronizer(以下簡稱為AQS)它提供通用機制來原子性管理同步狀態、阻塞和喚醒執行緒,以及維護被阻塞執行緒的佇列。

每一個基於AQS實現的同步器都會包含兩種型別的操作

·至少一個acquire操作。這個操作阻塞呼叫執行緒,除非/直到AQS的狀態允許這個執行緒繼續

執行。FutureTask的acquire操作為get()/get(long timeout,TimeUnit unit)方法呼叫。

·至少一個release操作。這個操作改變AQS的狀態,改變後的狀態可允許一個或多個阻塞

執行緒被解除阻塞FutureTask的release操作包括run()方法和cancel(…)方法。

FutureTask包含內部類sync,sync實現了AQS的tryAquireShared(int)方法和tryReleaseShared(int)方法,Sync通過這兩個方法來檢查和更新同步狀態。

FutureTask.get()方法會呼叫AQS.acquireSharedInterruptibly(int arg)方法:

1)呼叫AQS.acquireSharedInterruptibly(int arg)方法,這個方法首先會回撥在子類Sync中實現的tryAcquireShared()方法來判斷acquire操作是否可以成功。acquire操作可以成功的條件為:state為執行完成狀態RAN或已取消狀態CANCELLED且runner不為null。

2)如果成功則get()方法立即返回。如果失敗則到執行緒等待佇列中去等待其他執行緒執行release操作

3)當其他執行緒執行release操作(比如FutureTask.run()或FutureTask.cancel(…))喚醒當前執行緒後,當前執行緒再次執行tryAcquireShared()將返回正值1,當前執行緒將離開執行緒等待佇列並喚醒它的後繼執行緒(這裡會產生級聯喚醒的效果,後面會介紹)。

4)最後返回計算的結果或丟擲異常

 

FutureTask.run()的執行過程如下

1)執行在建構函式中指定的任務(Callable.call())。

2)以原子方式來更新同步狀態(呼叫AQS.compareAndSetState(int expect,int update),設定state為執行完成狀態RAN)。如果這個原子操作成功,就設定代表計算結果的變數result的值為Callable.call()的返回值,然後呼叫AQS.releaseShared(int arg)

3)AQS.releaseShared(int arg)首先會回撥在子類Sync中實現的tryReleaseShared(arg)來執行release操作(設定執行任務的執行緒runner為null,然會返回true);AQS.releaseShared(int arg),然後喚醒執行緒等待佇列中的第一個執行緒

4)呼叫FutureTask.cancel()。

當執行FutureTask.get()方法時,如果FutureTask不是處於執行完成狀態RAN或已取消狀態

CANCELLED,當前執行執行緒將到AQS的執行緒等待佇列中等待(見下圖的執行緒A、B、C和D)。當某個執行緒執行FutureTask.run()方法或FutureTask.cancel(...)方法時,會喚醒執行緒等待佇列的第一個執行緒(見圖10-16所示的執行緒E喚醒執行緒A)。

當執行緒E執行run()方法時,會喚醒佇列中的第一個執行緒A。執行緒A被喚醒後,首先把自己從佇列中刪除,然後喚醒它的後繼執行緒B,最後執行緒A從get()方法返回。執行緒B、C和D重複A執行緒的處理流程。最終,在佇列中等待的所有執行緒都被級聯喚醒並從get()方法返回。