1. 程式人生 > >Java多執行緒之Executor框架

Java多執行緒之Executor框架

在前面的這篇文章中介紹了執行緒池的相關知識,現在我們來看一下跟執行緒池相關的框架--Executor。

一.什麼是Executor

1.Executor框架的兩級排程模型

在HotSpot VM的執行緒模型中,Java執行緒(java.lang.Thread)被一對一對映為本地作業系統執行緒。Java執行緒啟動時會建立一個本地作業系統執行緒;當該Java執行緒終止時,這個作業系統執行緒也會被回收。作業系統會排程所有執行緒並將它們分配給可用的CPU。
在上層,Java多執行緒程式通常把應用分解為若干個任務,然後使用使用者級的排程器(Executor框架)將這些任務對映為固定數量的執行緒;在底層,作業系統核心將這些執行緒對映到硬體處理器上。

這種兩級排程模型的示意圖如圖10-1所示。從圖中可以看出,應用程式通過Executor框架控制上層的排程;而下層的排程由作業系統核心控制,下層的排程不受應用程式的控制。

2.Executor框架的結構與成員

1)Executor框架的結構
主要由3大部分組成:1.任務,包括被執行任務需要實現的介面:Runnable介面或Callable介面;2.任務的執行,包括任務執行機制的核心介面Executor,以及繼承自Executor的ExecutorService介面。Executor框架有兩個關鍵類實現ExecutorService介面(ThreadPoolExecutor和ScheduledThreadPoolExecutor);3.非同步計算的結果,包括介面Future和實現Future介面的FutureTask類。
Executor是一個介面,它是Executor框架的基礎,它將任務的提交與任務的執行分離開來。


ThreadPoolExecutor是執行緒池的核心實現類,用來執行被提交的任務。(可參照這篇)
ScheduledThreadPoolExecutor是一個實現類,可以在給定的延遲後執行命令,或者定期執行命令。它比Timer更靈活,功能更強大。
Future介面和實現Future介面的FutureTask類,代表非同步計算的結果。
Runnable介面和Callable介面的實現類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。

2).使用說明:主執行緒首先要建立實現Runnable或者Callable介面的任務物件。工具類Executors可以把一個Runnable物件封裝為一個Callable物件

(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。
然後可以把Runnable物件直接交給ExecutorService執行(ExecutorService.execute(Runnable command));或者也可以把Runnable物件或Callable物件提交給ExecutorService執行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable<T>task))。
如果執行ExecutorService.submit(…),ExecutorService將返回一個實現Future介面的物件(到目前為止的JDK中,返回的是FutureTask物件)。由於FutureTask實現了Runnable,程式設計師也可以建立FutureTask,然後直接交給ExecutorService執行。
最後,主執行緒可以執行FutureTask.get()方法來等待任務執行完成。主執行緒也可以執行FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行。

2.Executor框架的成員介紹:

1).ThreadPoolExecutor,Executors可以建立三種類型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。下面分別介紹各自的特性和Executor框架各自建立的API。


SingleThreadExecutor:適用於需要保證順序地執行各個任務,並且在任意時間點,不會有多個執行緒的應用場景,即保證只有一個執行緒。

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

 FixedThreadPool:適用於為了滿足資源管理的需求,而需要限制當前執行緒數量的應用場景,它適用於負載比較重的伺服器。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

CachedThreadPool:是大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或者是負載較輕的伺服器。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

2)ScheduledThreadPoolExecutor
Executors可以建立兩種型別的ScheduledThreadPoolExecutor,建立的相關API和它們的介紹主要如下:

ScheduledThreadPoolExecutor。包含若干個執行緒的ScheduledThreadPoolExecutor。適用於需要多個後臺執行緒執行週期任務,同時為了滿足資源管理的需求而需要限制後臺執行緒的數量的應用場景。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }


SingleThreadScheduledExecutor。只包含一個執行緒的ScheduledThreadPoolExecutor;適用於單個後臺執行緒執行週期任務,同時需要保證順序地執行各個任務的應用場景。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

3)Future介面

Future介面和實現Future介面的FutureTask類用來表示非同步計算的結果。當我們把Runnable介面或Callable介面的實現類提交(submit)給ThreadPoolExecutor或ScheduledThreadPoolExecutor時,ThreadPoolExecutor或ScheduledThreadPoolExecutor會向我們返回一個FutureTask物件。下面是對應的API。

<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)

這裡要注意:到目前最新的JDK8為止,Java通過上述API返回的是一個FutureTask物件。但從API可以看到,Java僅僅保證返回的是一個實現了Future介面的物件。在將來的JDK實現中,返回的可能不一定是FutureTask。
 4)Runnable介面和Callable介面

Runnable介面和Callable介面的實現類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。它們之間的區別是Runnable不會返回結果,而Callable可以返回結果。Executors類還提供了一個可以把Runnable包裝成一個Callable的API。

//將Runnable 包裝為Callable
public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<Object>(task, null);
    }

//將Runnable和待返回結果 包裝為Callable
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

前面講過,當我們把一個Callable物件(比如上面的Callable1或Callable2)提交給ThreadPoolExecutor或ScheduledThreadPoolExecutor執行時,submit(…)會向我們返回一個FutureTask物件。我們可以執行FutureTask.get()方法來等待任務執行完成。當任務成功完成後FutureTask.get()將返回該任務的結果。例如,如果提交的是物件Callable1,FutureTask.get()方法將返回null;如果提交的是物件Callable2,FutureTask.get()方法將返回result物件。

二.Executor主要類詳細介紹

下面主要來介紹一下ThreadPoolExecutor,ScheduledThreadPoolExecutor和FutureTask。

1.ThreadPoolExecutor:參照這個

2.ScheduledThreadPoolExecutor:

繼承自ThreadPoolExecutor,主要用來在給定的延遲之後執行任務,或者定期執行任務。ScheduledThreadPoolExecutor的功能與Timer類似,但ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後臺執行緒,而ScheduledThreadPoolExecutor可以在建構函式中指定多個對應的後臺執行緒數。

(1)ScheduledThreadPoolExecutor執行機制:

1)當呼叫ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue新增一個實現了RunnableScheduledFutur介面的ScheduledFutureTask。
2)執行緒池中的執行緒從DelayQueue中獲取ScheduledFutureTask,然後執行任務。

2)ScheduledThreadPoolExecutor的實現 

前面我們提到過,ScheduledThreadPoolExecutor會把待排程的任務(ScheduledFutureTask)放到一個DelayQueue中。
ScheduledFutureTask主要包含3個成員變數,如下。
·long型成員變數time,表示這個任務將要被執行的具體時間。
·long型成員變數sequenceNumber,表示這個任務被新增到ScheduledThreadPoolExecutor中的序號。
·long型成員變數period,表示任務執行的間隔週期。
DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對佇列中的ScheduledFutureTask進行排序。排序時,time小的排在前面(時間早的任務將被先執行)。如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面(也就是說,如果兩個任務的執行時間相同,那麼先提交的任務將被先執行)。

首先,執行緒1從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大於等於當前時間;然後,執行緒1執行這個ScheduledFutureTask;接著,執行緒1修改ScheduledFutureTask的time變數為下次將要被執行的時間;最後,執行緒1把這個修改time之後的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。


3.FutureTask

首先,要知道:Future介面和實現Future介面的FutureTask類,代表非同步計算的結果。
1.FutureTask簡介

FutureTask除了實現Future介面外,還實現了Runnable介面。因此,FutureTask可以交給Executor執行,也可以由呼叫執行緒直接執行(FutureTask.run())。根據FutureTask.run()方法被執行的時機,FutureTask可以處於下面3種狀態。
1)未啟動。FutureTask.run()方法還沒有被執行之前,FutureTask處於未啟動狀態。當建立一個FutureTask,且沒有執行FutureTask.run()方法之前,這個FutureTask處於未啟動狀態。
2)已啟動。FutureTask.run()方法被執行的過程中,FutureTask處於已啟動狀態。
3)已完成。FutureTask.run()方法執行完後正常結束,或被取消(FutureTask.cancel(…)),或執行FutureTask.run()方法時丟擲異常而異常結束,FutureTask處於已完成狀態。
當FutureTask處於未啟動或已啟動狀態時,執行FutureTask.get()方法將導致呼叫執行緒阻塞;當FutureTask處於已完成狀態時,執行FutureTask.get()方法將導致呼叫執行緒立即返回結果或丟擲異常。
當FutureTask處於未啟動狀態時,執行FutureTask.cancel()方法將導致此任務永遠不會被執行;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(true)方法將以中斷執行此任務執行緒的方式來試圖停止任務;當FutureTask處於已啟動狀態時,執行FutureTask.cancel(false)方法將不會對正在執行此任務的執行緒產生影響(讓正在執行的任務執行完成);當FutureTask處於已完成狀態時,執行FutureTask.cancel(…)方法將返回false。

 

 

《Java併發程式設計的藝術》