1. 程式人生 > >【搞定Java併發程式設計】第28篇:Java中的執行緒池詳解

【搞定Java併發程式設計】第28篇:Java中的執行緒池詳解

上一篇:Java中的併發工具類之執行緒間交換資料的 Exchanger

本文目錄

1、執行緒池的實現原理 

2、執行緒池的使用

2.1、執行緒池的建立

2.2、向執行緒池提交任務

2.3、關閉執行緒池

2.4、合理地配置執行緒池

2.5、執行緒池的監控

3、執行緒池的原始碼解析

3.1、總覽

3.2、Executor 介面

3.3、ExecutorService 介面

3.4、FutureTask 類

3.5、AbstractExecutorService 抽象類

3.6、ThreadPoolExecutor 類【重要】

4、總結

【面試問題】


Java中的執行緒池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用執行緒池。在開發過程中,合理地使用執行緒池能夠帶來三個好處:

第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗;

第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行;

第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控。

1、執行緒池的實現原理 

當向執行緒池提交任務後,執行緒池會按下圖所示流程去處理這個任務:

執行緒池的主要處理流程

1、執行緒池判斷核心執行緒池裡的執行緒是否都在執行任務。如果不是,則建立一個新的工作執行緒來執行任務。如果核心執行緒池裡的執行緒都在執行任務,則進入下個流程。

2、執行緒池判斷工作佇列是否已經滿。如果工作佇列沒有滿,則將新提交的任務儲存在這個工作佇列裡。如果工作佇列滿了,則進入下個流程。

3、執行緒池判斷執行緒池的執行緒是否都處於工作狀態。如果沒有,則建立一個新的工作執行緒來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

ThreadPoolExecutor執行execute()方法的示意圖如下:

ThreadPoolExecutor執行示意圖

根據上圖可以看出,ThreadPoolExector 執行 execute 方法分以下4種情況。

1、如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟需要獲取全域性鎖)。

2、如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。

3、如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意,執行這一步驟需要獲取全域性鎖)。

4、如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。根據不同的拒絕策略去處理。

ThreadPoolExecutor採取上述步驟的總體設計思路,是為了在執行execute()方法時,儘可能地避免獲取全域性鎖(那將會是一個嚴重的可伸縮瓶頸)。

在ThreadPoolExecutor完成預熱之後(當前執行的執行緒數大於等於corePoolSize),幾乎所有的execute()方法呼叫都是執行步驟2,而步驟2不需要獲取全域性鎖。

工作執行緒:執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務後,還會迴圈獲取工作佇列裡的任務來執行。


2、執行緒池的使用

2.1、執行緒池的建立

我們可以通過 ThreadPoolExecutor 來建立一個執行緒池。ThreadPoolExecutor有4個建構函式,簡單的看下:

// 建構函式1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {

    // ... 省略
}

// 建構函式2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    // ... 省略      
}

// 建構函式3
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    // ... 省略         
}    

// 建構函式4
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // ... 省略  
}

可以看出,建立一個執行緒池需要傳入以下幾個引數:

1、corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,如果當前 poolSize < corePoolSize 時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有基本執行緒。

2、maximumPoolSize(執行緒池最大數量):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是,如果使用了無界的任務佇列這個引數就沒什麼效果。

3、keepAliveTime(執行緒活動保持時間)執行緒池的工作執行緒空閒後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高執行緒的利用率。

4、TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。

5、workQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列:

  • ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按FIFO(先進先出)原則對元素進行排序。
  • LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
  • PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。

6、threadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給執行緒池裡的執行緒設定有意義的名字,程式碼如下。

new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

7、RejectExecutionHandler(飽和策略):佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。在JDK 1.5中Java執行緒池框架提供了以下4種策略:

  1. AbortPolicy:直接丟擲異常。
  2. CallerRunsPolicy:只用呼叫者所線上程來執行任務。
  3. DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
  4. DiscardPolicy:不處理,丟棄掉。

當然,也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化儲存不能處理的任務。

2.2、向執行緒池提交任務

可以使用兩個方法向執行緒池提交任務,分別為 execute()submit() 方法。這兩個方法的區別是:execute用於提交不需要返回值的任務,submit()方法用於提交需要返回值的任務。

  • execute()

execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。通過以下程式碼可知execute()方法輸入的任務是一個Runnable類的例項。

threadsPool.execute(new Runnable() {
    @Override
    public void run() {
        // ...
    }
});
  • submit()

submit()方法用於提交需要返回值的任務。執行緒池會返回一個Future型別的物件,通過這個future物件可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用 get(long timeout,TimeUnit unit) 方法則會阻塞當前執行緒一段時間後立即返回,這時候有可能任務沒有執行完。

Future<Object> future = executor.submit(haveReturnValuetask);
    try {
        Object s = future.get();
    } catch (InterruptedException e) {
        // 處理中斷異常
    } catch (ExecutionException e) {
        // 處理無法執行任務異常
    } finally {
        // 關閉執行緒池
        executor.shutdown();
}

2.3、關閉執行緒池

可以通過呼叫執行緒池的 shutdown shutdownNow 方法來關閉執行緒池。它們的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的 interrupt 方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。

但是它們存在一定的區別,shutdownNow 首先將執行緒池的狀態設定成 STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表,而 shutdown 只是將執行緒池的狀態設定成 SHUTDOWN 狀態,然後中斷所有沒有正在執行任務的執行緒

只要呼叫了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。

至於應該呼叫哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常調shutdown方法來關閉執行緒池,如果任務不一定要執行完,則可以呼叫shutdownNow方法。

2.4、合理地配置執行緒池

要想合理地配置執行緒池,就必須首先分析其任務特性,可以從以下幾個角度來分析:

1、任務的性質:CPU密集型任務、IO密集型任務、混合型任務;

2、任務的優先順序:高、中、低;

3、任務的執行時間:長、中、短;

4、任務的依賴性:是否依賴其他系統資源,如資料庫連線。

性質不同的任務可以用不同規模的執行緒池分開處理。

CPU密集型任務(需要給CPU更多的計算時間)應配置儘可能小的執行緒。

IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒。

混合型的任務,如果可以將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是很大,那麼分解後執行的吞吐量將高於序列的吞吐量。

2.5、執行緒池的監控

如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現問題時,可以根據執行緒池的使用狀況快速定位問題。可以通過執行緒池提供的引數進行監控,在監控執行緒池的時候可以使用以下屬性:

1、taskCount:執行緒池需要執行的任務數量;

2、completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於taskCount;

3、largestPoolSize:執行緒池裡曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否曾經滿過;

4、getPoolSize():執行緒池的執行緒數量。如果執行緒池不銷燬的話,執行緒池裡的執行緒不會自動銷燬,所以這個大小隻增不減。

5、getActiveCount():獲取活動的執行緒數。


3、執行緒池的原始碼解析

3.1、總覽

下圖是 Java 執行緒池幾個相關類的繼承結構:

圖片來自於: https://blog.csdn.net/a724888/article/details/69526087

先簡單說說這個繼承結構:

Executor 位於最頂層,也是最簡單的,就一個 execute(Runnable runnable) 介面方法定義。

ExecutorService 也是介面,在 Executor 介面的基礎上添加了很多的介面方法,所以一般來說我們會使用這個介面。

然後再下來一層是 AbstractExecutorService,從名字我們就知道,這是抽象類,這裡實現了非常有用的一些方法供子類直接使用,之後我們再細說。

然後才到我們的重點部分 ThreadPoolExecutor 類,這個類提供了關於執行緒池所需的非常豐富的功能。

另外,我們還涉及到下圖中的這些類:

同在併發包中的 Executors 類,類名中帶字母 s,我們猜到這個是工具類,裡面的方法都是靜態方法,如以下我們最常用的用於生成 ThreadPoolExecutor 的例項的一些方法:

public class Executors {

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 
                                      Integer.MAX_VALUE,
                                      60L, 
                                      TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, 
                                      nThreads,
                                      0L, 
                                      TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    // ...
}

另外,由於執行緒池支援獲取執行緒執行的結果,所以,引入了 Future 介面,RunnableFuture 繼承自此介面,然後我們最需要關心的就是它的實現類 FutureTask。到這裡,記住這個概念,線上程池的使用過程中,我們是往執行緒池提交任務(task)。使用過執行緒池的都知道,我們提交的每個任務是實現了 Runnable 介面的,其實就是先將 實現了 Runnable 介面的任務包裝成 FutureTask,然後再提交到執行緒池。這樣,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(Task),然後具有 Future 介面的語義,即可以在將來(Future)得到執行的結果。

當然,執行緒池中的 BlockingQueue 也是非常重要的概念,如果執行緒數達到 corePoolSize,我們的每個任務會提交到等待佇列中,等待執行緒池中的執行緒來取任務並執行。這裡的 BlockingQueue 通常我們使用其實現類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每個實現類都有不同的特徵,使用場景之後會慢慢分析。想要詳細瞭解各個 BlockingQueue 的讀者,可以參考我的前面的一篇對 BlockingQueue 的各個實現類進行詳細分析的文章。

除了上面說的這些類外,還有一個很重要的類,就是定時任務實現類 ScheduledThreadPoolExecutor,它繼承自 ThreadPoolExecutor,用於實現定時執行。

3.2、Executor 介面

public interface Executor {
    void execute(Runnable command);
}

我們可以看到 Executor 介面非常簡單,就一個 void execute(Runnable command) 方法,代表提交一個任務。為了讓大家理解 Java 執行緒池的整個設計方案,我會按照 Doug Lea 的設計思路來多說一些相關的東西。

我們經常這樣啟動一個執行緒:

new Thread(new Runnable(){
  // do something
}).start();

用了執行緒池 Executor 後就可以像下面這麼使用:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

如果我們希望執行緒池同步執行每一個任務,我們可以這麼實現這個介面:

class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run(); // 這裡不是用的new Thread(r).start(),也就是說沒有啟動任何一個新的執行緒。
    }
}

我們希望每個任務提交進來後,直接啟動一個新的執行緒來執行這個任務,我們可以這麼實現:

class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();  // 每個任務都用一個新的執行緒來執行
    }
}

我們再來看下怎麼組合兩個 Executor 來使用,下面這個實現是將所有的任務都加到一個 queue 中,然後從 queue 中取任務,交給真正的執行器執行,這裡採用 synchronized 進行併發控制:

class SerialExecutor implements Executor {
    // 任務佇列
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    // 這個才是真正的執行器
    final Executor executor;
    // 當前正在執行的任務
    Runnable active;
 
    // 初始化的時候,指定執行器
    SerialExecutor(Executor executor) {
        this.executor = executor;
    }
 
    // 新增任務到執行緒池: 將任務新增到任務佇列,scheduleNext 觸發執行器去任務佇列取任務
    public synchronized void execute(final Runnable r) {
        tasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }
 
    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            // 具體的執行轉給真正的執行器 executor
            executor.execute(active);
        }
    }
}

當然了,Executor 這個介面只有提交任務的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執行結果、我們想知道當前執行緒池有多少個執行緒活著、已經完成了多少任務等等,這些都是這個介面的不足的地方。接下來我們要介紹的是繼承自 Executor 介面的 ExecutorService 介面,這個介面提供了比較豐富的功能,也是我們最常使用到的介面。

3.3、ExecutorService 介面

一般我們定義一個執行緒池的時候,往往都是使用這個介面:

ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);

因為這個介面中定義的一系列方法大部分情況下已經可以滿足我們的需要了。那麼我們簡單初略地來看一下這個介面中都有哪些方法:

public interface ExecutorService extends Executor {
 
    // 關閉執行緒池,已提交的任務繼續執行,不接受繼續提交新任務
    void shutdown();
 
    // 關閉執行緒池,嘗試停止正在執行的所有任務,不接受繼續提交新任務
    // 它和前面的方法相比,加了一個單詞“now”,區別在於它會去停止當前正在進行的任務
    List<Runnable> shutdownNow();
 
    // 執行緒池是否已關閉
    boolean isShutdown();
 
    // 如果呼叫了 shutdown() 或 shutdownNow() 方法後,所有任務結束了,那麼返回true
    // 這個方法必須在呼叫shutdown或shutdownNow方法之後呼叫才會返回true
    boolean isTerminated();
 
    // 等待所有任務完成,並設定超時時間
    // 我們這麼理解,實際應用中是,先呼叫 shutdown 或 shutdownNow,
    // 然後再調這個方法等待所有的執行緒真正地完成,返回值意味著有沒有超時
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
 
    // 提交一個 Callable 任務
    <T> Future<T> submit(Callable<T> task);
 
    // 提交一個 Runnable 任務,第二個引數將會放到 Future 中,作為返回值,
    // 因為 Runnable 的 run 方法本身並不返回任何東西
    <T> Future<T> submit(Runnable task, T result);
 
    // 提交一個 Runnable 任務
    Future<?> submit(Runnable task);
 
    // 執行所有任務,返回 Future 型別的一個 list
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
 
    // 也是執行所有任務,但是這裡設定了超時時間
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;
 
    // 只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
 
    // 同上一個方法,只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果,
    // 不過這個帶超時,超過指定的時間,丟擲 TimeoutException 異常
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

這些方法都很好理解,一個簡單的執行緒池主要就是這些功能,能提交任務,能獲取結果,能關閉執行緒池,這也是為什麼我們經常用這個介面的原因。

3.4、FutureTask 類

在繼續往下層介紹 ExecutorService 的實現類之前,我們先來說說相關的類 FutureTask。

Future        ->   RunnableFuture   -> FutureTask
Runnable   ->   RunnableFuture

FutureTask 通過 RunnableFuture 間接實現了 Runnable 介面,所以每個 Runnable 通常都先包裝成 FutureTask,然後呼叫 executor.execute(Runnable command) 將其提交給執行緒池。

我們知道,Runnable 的 void run() 方法是沒有返回值的,所以,通常,如果我們需要的話,會在 submit 中指定第二個引數作為返回值:

<T> Future<T> submit(Runnable task, T result);

其實到時候會通過這兩個引數,將其包裝成 Callable。Callable 也是因為執行緒池的需要,所以才有了這個介面。它和 Runnable 的區別在於:Runnable的 run() 沒有返回值,而 Callable 的 call() 方法有返回值,同時,如果執行出現異常,call() 方法會丟擲異常。

public interface Callable<V> {
 
    V call() throws Exception;
}

在這裡,就不展開說 FutureTask 類了,因為本文篇幅本來就夠大了,這裡我們需要知道怎麼用就行了。下面,我們來看看 ExecutorService 的抽象實現 AbstractExecutorService 

3.5、AbstractExecutorService 抽象類

AbstractExecutorService 抽象類派生自 ExecutorService 介面,然後在其基礎上實現了幾個實用的方法,這些方法提供給子類進行呼叫。

這個抽象類實現了 invokeAny 方法和 invokeAll 方法,這裡的兩個 newTaskFor() 方法也比較有用,用於將任務包裝成 FutureTask。定義於最上層介面 Executor 中的 void execute(Runnable command) ,由於不需要獲取結果,不會進行 FutureTask 的包裝。

需要獲取結果(FutureTask),用 submit 方法,不需要獲取結果,可以用 execute 方法。

下面,我將一行一行原始碼地來分析這個類,跟著原始碼來看看其實現吧:

Tips: invokeAny 和 invokeAll 方法佔了這整個類的絕大多數篇幅,讀者可以選擇適當跳過,因為它們可能在你的實踐中使用的頻次比較低,而且它們不帶有承前啟後的作用,不用擔心會漏掉什麼導致看不懂後面的程式碼。

public abstract class AbstractExecutorService implements ExecutorService {
 
    // RunnableFuture 是用於獲取執行結果的,我們常用它的子類 FutureTask
    // 下面兩個 newTaskFor 方法用於將我們的任務包裝成 FutureTask 提交到執行緒池中執行
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
 
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
 
    // 提交任務
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 1. 將任務包裝成 FutureTask
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 2. 交給執行器執行,execute 方法由具體的子類來實現
        // 前面也說了,FutureTask 間接實現了Runnable 介面。
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // 1. 將任務包裝成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task, result);
        // 2. 交給執行器執行
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 1. 將任務包裝成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        // 2. 交給執行器執行
        execute(ftask);
        return ftask;
    }
 
    // 此方法目的:將 tasks 集合中的任務提交到執行緒池執行,任意一個執行緒執行完後就可以結束了
    // 第二個引數 timed 代表是否設定超時機制,超時時間為第三個引數,
    // 如果 timed 為 true,同時超時了還沒有一個執行緒返回結果,那麼丟擲 TimeoutException 異常
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        // 任務數
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // 
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
 
        // ExecutorCompletionService 不是一個真正的執行器,引數 this 才是真正的執行器
        // 它對執行器進行了包裝,每個任務結束後,將結果儲存到內部的一個 completionQueue 佇列中
        // 這也是為什麼這個類的名字裡面有個 Completion 的原因吧。
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        try {
            // 用於儲存異常資訊,此方法如果沒有得到任何有效的結果,那麼我們可以丟擲最後得到的一個異常
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();
 
            // 首先先提交一個任務,後面的任務到下面的 for 迴圈一個個提交
            futures.add(ecs.submit(it.next()));
            // 提交了一個任務,所以任務數量減 1
            --ntasks;
            // 正在執行的任務數(提交的時候 +1,任務結束的時候 -1)
            int active = 1;
 
            for (;;) {
                // ecs 上面說了,其內部有一個 completionQueue 用於儲存執行完成的結果
                // BlockingQueue 的 poll 方法不阻塞,返回 null 代表隊列為空
                Future<T> f = ecs.poll();
                // 為 null,說明剛剛提交的第一個執行緒還沒有執行完成
                // 在前面先提交一個任務,加上這裡做一次檢查,也是為了提高效能
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 這裡是 else if,不是 if。這裡說明,沒有任務了,同時 active 為 0 說明
                    // 任務都執行完成了。其實我也沒理解為什麼這裡做一次 break?
                    // 因為我認為 active 為 0 的情況,必然從下面的 f.get() 返回了
 
                    // 2018-02-23 感謝讀者 newmicro 的 comment,
                    //  這裡的 active == 0,說明所有的任務都執行失敗,那麼這裡是 for 迴圈出口
                    else if (active == 0)
                        break;
                    // 這裡也是 else if。這裡說的是,沒有任務了,但是設定了超時時間,這裡檢測是否超時
                    else if (timed) {
                        // 帶等待的 poll 方法
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        // 如果已經超時,丟擲 TimeoutException 異常,這整個方法就結束了
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    // 這裡是 else。說明,沒有任務需要提交,但是池中的任務沒有完成,還沒有超時(如果設定了超時)
                    // take() 方法會阻塞,直到有元素返回,說明有任務結束了
                    else
                        f = ecs.take();
                }
                /*
                 * 我感覺上面這一段並不是很好理解,這裡簡單說下。
                 * 1. 首先,這在一個 for 迴圈中,我們設想每一個任務都沒那麼快結束,
                 *     那麼,每一次都會進到第一個分支,進行提交任務,直到將所有的任務都提交了
                 * 2. 任務都提交完成後,如果設定了超時,那麼 for 迴圈其實進入了“一直檢測是否超時”
                       這件事情上
                 * 3. 如果沒有設定超時機制,那麼不必要檢測超時,那就會阻塞在 ecs.take() 方法上,
                       等待獲取第一個執行結果
                 * 4. 如果所有的任務都執行失敗,也就是說 future 都返回了,
                       但是 f.get() 丟擲異常,那麼從 active == 0 分支出去(感謝 newmicro 提出)
                         // 當然,這個需要看下面的 if 分支。
                 */
 
 
 
                // 有任務結束了
                if (f != null) {
                    --active;
                    try {
                        // 返回執行結果,如果有異常,都包裝成 ExecutionException
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }// 注意看 for 迴圈的範圍,一直到這裡
 
            if (ee == null)
                ee = new ExecutionException();
            throw ee;
 
        } finally {
            // 方法退出之前,取消其他的任務
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }
 
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
 
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
 
    // 執行所有的任務,返回任務結果。
    // 先不要看這個方法,我們先想想,其實我們自己提交任務到執行緒池,也是想要執行緒池執行所有的任務
    // 只不過,我們是每次 submit 一個任務,這裡以一個集合作為引數提交
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // 這個很簡單
            for (Callable<T> t : tasks) {
                // 包裝成 FutureTask
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                // 提交任務
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        // 這是一個阻塞方法,直到獲取到值,或丟擲了異常
                        // 這裡有個小細節,其實 get 方法簽名上是會丟擲 InterruptedException 的
                        // 可是這裡沒有進行處理,而是拋給外層去了。此異常發生於還沒執行完的任務被取消了
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            // 這個方法返回,不像其他的場景,返回 List<Future>,其實執行結果還沒出來
            // 這個方法返回是真正的返回,任務都結束了
            return futures;
        } finally {
            // 為什麼要這個?就是上面說的有異常的情況
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
 
    // 帶超時的 invokeAll,我們找不同吧
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
 
            long lastTime = System.nanoTime();
 
            Iterator<Future<T>> it = futures.iterator();
            // 提交一個任務,檢測一次是否超時
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                // 超時
                if (nanos <= 0)
                    return futures;
            }
 
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        // 呼叫帶超時的 get 方法,這裡的引數 nanos 是剩餘的時間,
                        // 因為上面其實已經用掉了一些時間了
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
 
}

到這裡,我們發現,這個抽象類包裝了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啟執行緒來執行任務,它們都只是在方法內部呼叫了 execute 方法,所以最重要的 execute(Runnable runnable) 方法還沒出現,需要等具體執行器來實現這個最重要的部分,這裡我們要說的就是 ThreadPoolExecutor 類了。

3.6、ThreadPoolExecutor 類

ThreadPoolExecutor 繼承了 AbstractExecutorService 抽象類。 ThreadPoolExecutor 是 JDK 中的執行緒池實現,這個類實現了一個執行緒池需要的各個方法,它實現了任務提交、執行緒管理、監控等等方法。

我們可以基於它來進行業務上的擴充套件,以實現我們需要的其他功能,比如實現定時任務的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor。當然,這不是本文關注的重點,下面,還是趕緊進行原始碼分析吧。

首先,我們來看看執行緒池實現中的幾個概念和處理流程。

我們先回顧下提交任務的幾個方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

一個最基本的概念是,submit 方法中,引數是 Runnable 型別(也有Callable 型別),這個引數不是用於 new Thread(runnable).start() 中的,此處的這個引數不是用於啟動執行緒的,這裡指的是任務,任務要做的事情是 run() 方法裡面定義的或 Callable 中的 call() 方法裡面定義的。

初學者往往會搞混這個,因為 Runnable 總是在各個地方出現,經常把一個 Runnable 包到另一個 Runnable 中。請把它想象成有個 Task 介面,這個接口裡面有一個 run() 方法(我想作者只是不想因為這個再定義一個完全可以用 Runnable 來代替的介面,Callable 的出現,完全是因為 Runnable 不能滿足需要)。

我們回過神來繼續往下看,我畫了一個簡單的示意圖來描述執行緒池中的一些主要的構件:

當然,上圖沒有考慮佇列是否有界,提交任務時佇列滿了怎麼辦?什麼情況下會建立新的執行緒?提交任務時執行緒池滿了怎麼辦?空閒執行緒怎麼關掉?這些問題下面我們會一一解決。

我們經常會使用 Executors 這個工具類來快速構造一個執行緒池,對於初學者而言,這種工具類是很有用的,開發者不需要關注太多的細節,只要知道自己需要一個執行緒池,僅僅提供必需的引數就可以了,其他引數都採用作者提供的預設值。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, 
                                  nThreads,
                                  0L, 
                                  TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 
                                  Integer.MAX_VALUE,
                                  60L, 
                                  TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

這裡先不說有什麼區別,它們最終都會導向這個 ThreadPoolExecutor 構造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 這幾個引數都是必須要有的
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
 
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

基本上,上面的構造方法中列出了我們最需要關心的幾個屬性了,下面逐個介紹下構造方法中出現的這幾個屬性:

  • corePoolSize

核心執行緒數,不要摳字眼,反正先記著有這麼個屬性就可以了。

  • maximumPoolSize

​最大執行緒數,執行緒池允許建立的最大執行緒數。

  • workQueue

任務佇列,BlockingQueue 介面的某個實現(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。

  • keepAliveTime

空閒執行緒的保活時間,如果某執行緒的空閒時間超過這個值都沒有任務給它做,那麼可以被關閉了。注意這個值並不會對所有執行緒起作用,如果執行緒池中的執行緒數少於等於核心執行緒數 corePoolSize,那麼這些執行緒不會因為空閒太長時間而被關閉,當然,也可以通過呼叫 allowCoreThreadTimeOut(true)使核心執行緒數內的執行緒也可以被回收。

  • threadFactory

用於生成執行緒,一般我們可以用預設的就可以了。通常,我們可以通過它將我們的執行緒的名字設定得比較可讀一些,如 Message-Thread-1, Message-Thread-2 類似這樣。

  • handler:

當執行緒池已經滿了,但是又有新的任務提交的時候,該採取什麼策略由這個來指定。有幾種方式可供選擇,像丟擲異常、直接拒絕然後返回等,也可以自己實現相應的介面實現自己的邏輯,這個之後再說。

除了上面幾個屬性外,我們再看看其他重要的屬性。

Doug Lea 採用一個 32 位的整數來存放執行緒池的狀態和當前池中的執行緒數,其中高 3 位用於存放執行緒池狀態,低 29 位表示執行緒數(即使只有 29 位,也已經不小了,大概 5 億多,現在還沒有哪個機器能起這麼多執行緒的吧)。我們知道,Java 語言在整數編碼上是統一的,都是採用補碼的形式,下面是簡單的移位操作和布林操作,都是挺簡單的。

 
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 
// 這裡 COUNT_BITS 設定為 29(32-3),意味著前三位用於存放執行緒狀態,後29位用於存放執行緒數
// 很多初學者很喜歡在自己的程式碼中寫很多 29 這種數字,或者某個特殊的字串,然後分佈在各個地方,這是非常糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;
 
// 000 11111111111111111111111111111
// 這裡得到的是 29 個 1,也就是說執行緒池的最大執行緒數是 2^29-1=536870911
// 以我們現在計算機的實際情況,這個數量還是夠用的
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
// 我們說了,執行緒池的狀態存放在高 3 位中
// 運算結果為 111跟29個0:111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;
 
// 將整數 c 的低 29 位修改為 0,就得到了執行緒池的狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 將整數 c 的高 3 為修改為 0,就得到了執行緒池中的執行緒數
private static int workerCountOf(int c)  { return c & CAPACITY; }
 
private static int ctlOf(int rs, int wc) { return rs | wc; }
 
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
 
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
 
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

上面就是對一個整數的簡單的位操作,幾個操作方法將會在後面的原始碼中一直出現,所以讀者最好把方法名字和其代表的功能記住,看原始碼的時候也就不需要來來回回翻了。

在這裡,介紹下執行緒池中的各個狀態和狀態變化的轉換過程:

  • RUNNING:這個沒什麼好說的,這是最正常的狀態:接受新的任務,處理等待佇列中的任務;
  • SHUTDOWN:不接受新的任務提交,但是會繼續處理等待佇列中的任務;
  • STOP:不接受新的任務提交,不再處理等待佇列中的任務,中斷正在執行任務的執行緒;
  • TIDYING:所有的任務都銷燬了,workCount 為 0。執行緒池的狀態在轉換為 TIDYING 狀態時,會執行鉤子方法 terminated();
  • TERMINATED:terminated() 方法結束後,執行緒池的狀態就會變成這個。

RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,所以等於 0 的時候不能提交任務,大於 0 的話,連正在執行的任務也需要中斷。

看了這幾種狀態的介紹,讀者大體也可以猜到十之八九的狀態轉換了,各個狀態的轉換過程有以下幾種:

  • RUNNING -> SHUTDOWN:當呼叫了 shutdown() 後,會發生這個狀態轉換,這也是最重要的;
  • (RUNNING or SHUTDOWN) -> STOP:當呼叫 shutdownNow() 後,會發生這個狀態轉換,要清楚 shutDown() 和 shutDownNow() 的區別;
  • SHUTDOWN -> TIDYING:當任務佇列和執行緒池都清空後,會由 SHUTDOWN 轉換為 TIDYING;
  • STOP -> TIDYING:當任務佇列清空後,發生這個轉換;
  • TIDYING -> TERMINATED:這個前面說了,當 terminated() 方法結束後。

上面的幾個記住核心的就可以了,尤其第一個和第二個。

另外,我們還要看看一個內部類 Worker,因為 Doug Lea 把執行緒池中的執行緒包裝成了一個個 Worker,翻譯成工人,就是執行緒池中做任務的執行緒。所以到這裡,我們知道任務是 Runnable(內部叫 task 或 command),執行緒是 Worker。

Worker 這裡又用到了抽象類 AbstractQueuedSynchronizer。題外話,AQS 在併發中真的是到處出現,而且非常容易使用,寫少量的程式碼就能實現自己需要的同步方式(對 AQS 原始碼感興趣的讀者請參看我之前寫的幾篇文章)。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;
 
    // 這個是真正的執行緒,任務靠你啦
    final Thread thread;
 
    // 前面說了,這裡的 Runnable 是任務。為什麼叫 firstTask?因為在建立執行緒的時候,如果同時指定了
    // 這個執行緒起來以後需要執行的第一個任務,那麼第一個任務就是存放在這裡的(執行緒可不止執行這一個任務)
    // 當然了,也可以為 null,這樣執行緒起來了,自己到任務佇列(BlockingQueue)中取任務(getTask 方法)就行了
    Runnable firstTask;
 
    // 用於存放此執行緒完全的任務數,注意了,這裡用了 volatile,保證可見性
    volatile long completedTasks;
 
    // Worker 只有這一個構造方法,傳入 firstTask,也可以傳 null
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 呼叫 ThreadFactory 來建立一個新的執行緒
        this.thread = getThreadFactory().newThread(this);
    }
 
    // 這裡呼叫了外部類的 runWorker 方法
    public void run() {
        runWorker(this);
    }
 
    ...// 其他幾個方法沒什麼好看的,就是用 AQS 操作,來獲取這個執行緒的執行權,用了獨佔鎖
}

前面雖然囉嗦,但是簡單。有了上面的這些基礎後,我們終於可以看看 ThreadPoolExecutor 的 execute 方法了,前面原始碼分析的時候也說了,各種方法都最終依賴於 execute 方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    // 前面說的那個表示 “執行緒池狀態” 和 “執行緒數” 的整數
    int c = ctl.get();
 
    // 如果當前執行緒數少於核心執行緒數,那麼直接新增一個 worker 來執行任務,
    // 建立一個新的執行緒,並把當前任務 command 作為這個執行緒的第一個任務(firstTask)
    if (workerCountOf(c) < corePoolSize) {
        // 新增任務成功,那麼就結束了。提交任務嘛,執行緒池已經接受了這個任務,這個方法也就可以返回了
        // 至於執行的結果,到時候會包裝到 FutureTask 中。
        // 返回 false 代表執行緒池不允許提交任務
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到這裡說明,要麼當前執行緒數大於等於核心執行緒數,要麼剛剛 addWorker 失敗了
 
    // 如果執行緒池處於 RUNNING 狀態,把這個任務新增到任務佇列 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
        /* 這裡面說的是,如果任務進入了 workQueue,我們是否需要開啟新的執行緒
         * 因為執行緒數在 [0, corePoolSize) 是無條件開啟新的執行緒
         * 如果執行緒數已經大於等於 corePoolSize,那麼將任務新增到佇列中,然後進到這裡
         */
        int recheck = ctl.get();
        // 如果執行緒池已不處於 RUNNING 狀態,那麼移除已經入隊的這個任務,並且執行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果執行緒池還是 RUNNING 的,並且執行緒數為 0,那麼開啟新的執行緒
        // 到這裡,我們知道了,這塊程式碼的真正意圖是:擔心任務提交到佇列中了,但是執行緒都關閉了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 佇列滿了,那麼進入到這個分支
    // 以 maximumPoolSize 為界建立新的 worker,
    // 如果失敗,說明當前執行緒數已經達到 maximumPoolSize,執行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

注意:

對建立執行緒的錯誤理解:如果執行緒數少於 corePoolSize,建立一個執行緒,如果執行緒數在 [corePoolSize, maximumPoolSize] 之間那麼可以建立執行緒或複用空閒執行緒,keepAliveTime 對這個區間的執行緒有效。

從上面的幾個分支,我們