1. 程式人生 > >深度解讀 java 線程池設計思想及源碼實現

深度解讀 java 線程池設計思想及源碼實現

blog execute 進行 第一個 組合 main cit ria 等待

我相信大家都看過很多的關於線程池的文章,基本上也是面試必問的,好像我寫這篇文章其實是沒有什麽意義的,不過,我相信你也和我一樣,看了很多文章還是一知半解,甚至可能看了很多瞎說的文章。希望大家看過這篇文章以後,就可以完全掌握 Java 線程池了。

我發現好些人都是因為這篇文章來到本站的,希望這篇讓人留下第一眼印象的文章能給你帶來收獲。

本文一大重點是源碼解析,不過線程池設計思想以及作者實現過程中的一些巧妙用法是我想傳達給讀者的。本文還是會一行行關鍵代碼進行分析,目的是為了讓那些自己看源碼不是很理解的同學可以得到參考。

線程池是非常重要的工具,如果你要成為一個好的工程師,還是得比較好地掌握這個知識。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術水平。

本文略長,建議在 pc 上閱讀,邊看文章邊翻源碼(Java7 和 Java8 都一樣),建議想好好看的讀者抽出至少 15 至 30 分鐘的整塊時間來閱讀。當然,如果讀者僅為面試準備,可以直接滑到最後的總結部分。

目錄

總覽
Executor 接口
ExecutorService
FutureTask
AbstractExecutorService
ThreadPoolExecutor
Executors
總結
總覽
開篇來一些廢話。下圖是 java 線程池幾個相關類的繼承結構:

技術分享圖片

先簡單說說這個繼承結構,Executor 位於最頂層,也是最簡單的,就一個 execute(Runnable runnable) 接口方法定義。

ExecutorService 也是接口,在 Executor 接口的基礎上添加了很多的接口方法,所以一般來說我們會使用這個接口。

然後再下來一層是 AbstractExecutorService,從名字我們就知道,這是抽象類,這裏實現了非常有用的一些方法供子類直接使用,之後我們再細說。

然後才到我們的重點部分 ThreadPoolExecutor 類,這個類提供了關於線程池所需的非常豐富的功能。

另外,我們還涉及到下圖中的這些類:
技術分享圖片
同在並發包中的 Executors 類,類名中帶字母 s,我們猜到這個是工具類,裏面的方法都是靜態方法,如以下我們最常用的用於生成 ThreadPoolExecutor 的實例的一些方法:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
另外,由於線程池支持獲取線程執行的結果,所以,引入了 Future 接口,RunnableFuture 繼承自此接口,然後我們最需要關心的就是它的實現類 FutureTask。到這裏,記住這個概念,在線程池的使用過程中,我們是往線程池提交任務(task),使用過線程池的都知道,我們提交的每個任務是實現了 Runnable 接口的,其實就是先將 Runnable 的任務包裝成 FutureTask,然後再提交到線程池。這樣,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(Task),然後具有 Future 接口的語義,即可以在將來(Future)得到執行的結果。

當然,線程池中的 BlockingQueue 也是非常重要的概念,如果線程數達到 corePoolSize,我們的每個任務會提交到等待隊列中,等待線程池中的線程來取任務並執行。這裏的 BlockingQueue 通常我們使用其實現類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每個實現類都有不同的特征,使用場景之後會慢慢分析。想要詳細了解各個 BlockingQueue 的讀者,可以參考我的前面的一篇對 BlockingQueue 的各個實現類進行詳細分析的文章。

把事情說完整:除了上面說的這些類外,還有一個很重要的類,就是定時任務實現類 ScheduledThreadPoolExecutor,它繼承自本文要重點講解的 ThreadPoolExecutor,用於實現定時執行。不過本文不會介紹它的實現,我相信讀者看完本文後可以比較容易地看懂它的源碼。

以上就是本文要介紹的知識,廢話不多說,開始進入正文。

Executor 接口
/*

  • @since 1.5
  • @author Doug Lea
    */
    public interface Executor {
    void execute(Runnable command);
    }
    我們可以看到 Executor 接口非常簡單,就一個 void execute(Runnable command) 方法,代表提交一個任務。為了讓大家理解 java 線程池的整個設計方案,我會按照 Doug Lea 的設計思路來多說一些相關的東西。

我們經常這樣啟動一個線程:

new Thread(new Runnable(){
// do something
}).start();
用了線程池 Executor 後就可以像下面這麽使用:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
如果我們希望線程池同步執行每一個任務,我們可以這麽實現這個接口:

class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();// 這裏不是用的new Thread(r).start(),也就是說沒有啟動任何一個新的線程。
}
}
我們希望每個任務提交進來後,直接啟動一個新的線程來執行這個任務,我們可以這麽實現:

class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start(); // 每個任務都用一個新的線程來執行
}
}
我們再來看下怎麽組合兩個 Executor 來使用,下面這個實現是將所有的任務都加到一個 queue 中,然後從 queue 中取任務,交給真正的執行器執行,這裏采用 synchronized 進行並發控制:

class SerialExecutor implements Executor {
// 任務隊列
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
// 這個才是真正的執行器
final Executor executor;
// 當前正在執行的任務
Runnable active;

// 初始化的時候,指定執行器
SerialExecutor(Executor executor) {
    this.executor = executor;
}

// 添加任務到線程池: 將任務添加到任務隊列,scheduleNext 觸發執行器去任務隊列取任務
public synchronized void execute(final Runnable r) {
    tasks.offer(new Runnable() {
        public void run() {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        }
    });
    if (active == null) {
        scheduleNext();
    }
}

protected synchronized void scheduleNext() {
    if ((active = tasks.poll()) != null) {
        // 具體的執行轉給真正的執行器 executor
        executor.execute(active);
    }
}

}
當然了,Executor 這個接口只有提交任務的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執行結果、我們想知道當前線程池有多少個線程活著、已經完成了多少任務等等,這些都是這個接口的不足的地方。接下來我們要介紹的是繼承自 Executor 接口的 ExecutorService 接口,這個接口提供了比較豐富的功能,也是我們最常使用到的接口。

ExecutorService
一般我們定義一個線程池的時候,往往都是使用這個接口:

ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);
因為這個接口中定義的一系列方法大部分情況下已經可以滿足我們的需要了。

那麽我們簡單初略地來看一下這個接口中都有哪些方法:

public interface ExecutorService extends Executor {

// 關閉線程池,已提交的任務繼續執行,不接受繼續提交新任務
void shutdown();

// 關閉線程池,嘗試停止正在執行的所有任務,不接受繼續提交新任務
// 它和前面的方法相比,加了一個單詞“now”,區別在於它會去停止當前正在進行的任務
List<Runnable> shutdownNow();

// 線程池是否已關閉
boolean isShutdown();

// 如果調用了 shutdown() 或 shutdownNow() 方法後,所有任務結束了,那麽返回true
// 這個方法必須在調用shutdown或shutdownNow方法之後調用才會返回true
boolean isTerminated();

// 等待所有任務完成,並設置超時時間
// 我們這麽理解,實際應用中是,先調用 shutdown 或 shutdownNow,
// 然後再調這個方法等待所有的線程真正地完成,返回值意味著有沒有超時
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

// 提交一個 Callable 任務
<T> Future<T> submit(Callable<T> task);

// 提交一個 Runnable 任務,第二個參數將會放到 Future 中,作為返回值,
// 因為 Runnable 的 run 方法本身並不返回任何東西
<T> Future<T> submit(Runnable task, T result);

// 提交一個 Runnable 任務
Future<?> submit(Runnable task);

// 執行所有任務,返回 Future 類型的一個 list
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

// 也是執行所有任務,但是這裏設置了超時時間
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
        throws InterruptedException;

// 只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

// 同上一個方法,只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果,
// 不過這個帶超時,超過指定的時間,拋出 TimeoutException 異常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

}
這些方法都很好理解,一個簡單的線程池主要就是這些功能,能提交任務,能獲取結果,能關閉線程池,這也是為什麽我們經常用這個接口的原因。

FutureTask
在繼續往下層介紹 ExecutorService 的實現類之前,我們先來說說相關的類 FutureTask。

Future -> RunnableFuture -> FutureTask
Runnable -> RunnableFuture

FutureTask 通過 RunnableFuture 間接實現了 Runnable 接口,
所以每個 Runnable 通常都先包裝成 FutureTask,
然後調用 executor.execute(Runnable command) 將其提交給線程池
我們知道,Runnable 的 void run() 方法是沒有返回值的,所以,通常,如果我們需要的話,會在 submit 中指定第二個參數作為返回值:

<T> Future<T> submit(Runnable task, T result);
其實到時候會通過這兩個參數,將其包裝成 Callable。

Callable 也是因為線程池的需要,所以才有了這個接口。它和 Runnable 的區別在於 run() 沒有返回值,而 Callable 的 call() 方法有返回值,同時,如果運行出現異常,call() 方法會拋出異常。

public interface Callable<V> {

V call() throws Exception;

}
在這裏,就不展開說 FutureTask 類了,因為本文篇幅本來就夠大了,這裏我們需要知道怎麽用就行了。

下面,我們來看看 ExecutorService 的抽象實現 AbstractExecutorService 。

AbstractExecutorService
AbstractExecutorService 抽象類派生自 ExecutorService 接口,然後在其基礎上實現了幾個實用的方法,這些方法提供給子類進行調用。

這個抽象類實現了 invokeAny 方法和 invokeAll 方法,這裏的兩個 newTaskFor 方法也比較有用,用於將任務包裝成 FutureTask。定義於最上層接口 Executor中的 void execute(Runnable command) 由於不需要獲取結果,不會進行 FutureTask 的包裝。

需要獲取結果(FutureTask),用 submit 方法,不需要獲取結果,可以用 execute 方法。

下面,我將一行一行源碼地來分析這個類,跟著源碼來看看其實現吧:

Tips: invokeAny 和 invokeAll 方法占了這整個類的絕大多數篇幅,讀者可以選擇適當跳過,因為它們可能在你的實踐中使用的頻次比較低,而且它們不帶有承前啟後的作用,不用擔心會漏掉什麽導致看不懂後面的代碼。

public abstract class AbstractExecutorService implements ExecutorService {

// RunnableFuture 是用於獲取執行結果的,我們常用它的子類 FutureTask
// 下面兩個 newTaskFor 方法用於將我們的任務包裝成 FutureTask 提交到線程池中執行
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

// 提交任務
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 1. 將任務包裝成 FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    // 2. 交給執行器執行,execute 方法由具體的子類來實現
    // 前面也說了,FutureTask 間接實現了Runnable 接口。
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    // 1. 將任務包裝成 FutureTask
    RunnableFuture<T> ftask = newTaskFor(task, result);
    // 2. 交給執行器執行
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // 1. 將任務包裝成 FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    // 2. 交給執行器執行
    execute(ftask);
    return ftask;
}

// 此方法目的:將 tasks 集合中的任務提交到線程池執行,任意一個線程執行完後就可以結束了
// 第二個參數 timed 代表是否設置超時機制,超時時間為第三個參數,
// 如果 timed 為 true,同時超時了還沒有一個線程返回結果,那麽拋出 TimeoutException 異常
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                        boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    // 任務數
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    // 
    List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);

    // ExecutorCompletionService 不是一個真正的執行器,參數 this 才是真正的執行器
    // 它對執行器進行了包裝,每個任務結束後,將結果保存到內部的一個 completionQueue 隊列中
    // 這也是為什麽這個類的名字裏面有個 Completion 的原因吧。
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);
    try {
        // 用於保存異常信息,此方法如果沒有得到任何有效的結果,那麽我們可以拋出最後得到的一個異常
        ExecutionException ee = null;
        long lastTime = timed ? System.nanoTime() : 0;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // 首先先提交一個任務,後面的任務到下面的 for 循環一個個提交
        futures.add(ecs.submit(it.next()));
        // 提交了一個任務,所以任務數量減 1
        --ntasks;
        // 正在執行的任務數(提交的時候 +1,任務結束的時候 -1)
        int active = 1;

        for (;;) {
            // ecs 上面說了,其內部有一個 completionQueue 用於保存執行完成的結果
            // BlockingQueue 的 poll 方法不阻塞,返回 null 代表隊列為空
            Future<T> f = ecs.poll();
            // 為 null,說明剛剛提交的第一個線程還沒有執行完成
            // 在前面先提交一個任務,加上這裏做一次檢查,也是為了提高性能
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                // 這裏是 else if,不是 if。這裏說明,沒有任務了,同時 active 為 0 說明
                // 任務都執行完成了。其實我也沒理解為什麽這裏做一次 break?
                // 因為我認為 active 為 0 的情況,必然從下面的 f.get() 返回了

                // 2018-02-23 感謝讀者 newmicro 的 comment,
                //  這裏的 active == 0,說明所有的任務都執行失敗,那麽這裏是 for 循環出口
                else if (active == 0)
                    break;
                // 這裏也是 else if。這裏說的是,沒有任務了,但是設置了超時時間,這裏檢測是否超時
                else if (timed) {
                    // 帶等待的 poll 方法
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    // 如果已經超時,拋出 TimeoutException 異常,這整個方法就結束了
                    if (f == null)
                        throw new TimeoutException();
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
                // 這裏是 else。說明,沒有任務需要提交,但是池中的任務沒有完成,還沒有超時(如果設置了超時)
                // take() 方法會阻塞,直到有元素返回,說明有任務結束了
                else
                    f = ecs.take();
            }
            /*
             * 我感覺上面這一段並不是很好理解,這裏簡單說下。
             * 1. 首先,這在一個 for 循環中,我們設想每一個任務都沒那麽快結束,
             *     那麽,每一次都會進到第一個分支,進行提交任務,直到將所有的任務都提交了
             * 2. 任務都提交完成後,如果設置了超時,那麽 for 循環其實進入了“一直檢測是否超時”
                   這件事情上
             * 3. 如果沒有設置超時機制,那麽不必要檢測超時,那就會阻塞在 ecs.take() 方法上,
                   等待獲取第一個執行結果
             * 4. 如果所有的任務都執行失敗,也就是說 future 都返回了,
                   但是 f.get() 拋出異常,那麽從 active == 0 分支出去(感謝 newmicro 提出)
                     // 當然,這個需要看下面的 if 分支。
             */

            // 有任務結束了
            if (f != null) {
                --active;
                try {
                    // 返回執行結果,如果有異常,都包裝成 ExecutionException
                    return f.get();
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }// 註意看 for 循環的範圍,一直到這裏

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        // 方法退出之前,取消其他的任務
        for (Future<T> f : futures)
            f.cancel(true);
    }
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

// 執行所有的任務,返回任務結果。
// 先不要看這個方法,我們先想想,其實我們自己提交任務到線程池,也是想要線程池執行所有的任務
// 只不過,我們是每次 submit 一個任務,這裏以一個集合作為參數提交
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        // 這個很簡單
        for (Callable<T> t : tasks) {
            // 包裝成 FutureTask
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            // 提交任務
            execute(f);
        }
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    // 這是一個阻塞方法,直到獲取到值,或拋出了異常
                    // 這裏有個小細節,其實 get 方法簽名上是會拋出 InterruptedException 的
                    // 可是這裏沒有進行處理,而是拋給外層去了。此異常發生於還沒執行完的任務被取消了
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        // 這個方法返回,不像其他的場景,返回 List<Future>,其實執行結果還沒出來
        // 這個方法返回是真正的返回,任務都結束了
        return futures;
    } finally {
        // 為什麽要這個?就是上面說的有異常的情況
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true);
    }
}

// 帶超時的 invokeAll,我們找不同吧
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null || unit == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        long lastTime = System.nanoTime();

        Iterator<Future<T>> it = futures.iterator();
        // 提交一個任務,檢測一次是否超時
        while (it.hasNext()) {
            execute((Runnable)(it.next()));
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            // 超時
            if (nanos <= 0)
                return futures;
        }

        for (Future<T> f : futures) {
            if (!f.isDone()) {
                if (nanos <= 0)
                    return futures;
                try {
                    // 調用帶超時的 get 方法,這裏的參數 nanos 是剩余的時間,
                    // 因為上面其實已經用掉了一些時間了
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true);
    }
}

}
到這裏,我們發現,這個抽象類包裝了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啟線程來執行任務,它們都只是在方法內部調用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法還沒出現,需要等具體執行器來實現這個最重要的部分,這裏我們要說的就是 ThreadPoolExecutor 類了。

鑒於本文的篇幅,我覺得看到這裏的讀者應該已經不多了,快餐文化使然啊!我寫的每篇文章都力求讓讀者可以通過我的一篇文章而記住所有的相關知識點,所以篇幅不免長了些。其實,工作了很多年的話,會有一個感覺,比如說線程池,即使看了 20 篇各種總結,也不如一篇長文實實在在講解清楚每一個知識點,有點少即是多,多即是少的意味了。

ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中的線程池實現,這個類實現了一個線程池需要的各個方法,它實現了任務提交、線程管理、監控等等方法。

我們可以基於它來進行業務上的擴展,以實現我們需要的其他功能,比如實現定時任務的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor。當然,這不是本文關註的重點,下面,還是趕緊進行源碼分析吧。

首先,我們來看看線程池實現中的幾個概念和處理流程。

我們先回顧下提交任務的幾個方法:

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
一個最基本的概念是,submit 方法中,參數是 Runnable 類型(也有Callable 類型),這個參數不是用於 new Thread(runnable).start() 中的,此處的這個參數不是用於啟動線程的,這裏指的是任務,任務要做的事情是 run() 方法裏面定義的或 Callable 中的 call() 方法裏面定義的。

初學者往往會搞混這個,因為 Runnable 總是在各個地方出現,經常把一個 Runnable 包到另一個 Runnable 中。請把它想象成有個 Task 接口,這個接口裏面有一個 run() 方法(我想作者只是不想因為這個再定義一個完全可以用 Runnable 來代替的接口,Callable 的出現,完全是因為 Runnable 不能滿足需要)。

我們回過神來繼續往下看,我畫了一個簡單的示意圖來描述線程池中的一些主要的構件:
技術分享圖片

當然,上圖沒有考慮隊列是否有界,提交任務時隊列滿了怎麽辦?什麽情況下會創建新的線程?提交任務時線程池滿了怎麽辦?空閑線程怎麽關掉?這些問題下面我們會一一解決。

我們經常會使用 Executors 這個工具類來快速構造一個線程池,對於初學者而言,這種工具類是很有用的,開發者不需要關註太多的細節,只要知道自己需要一個線程池,僅僅提供必需的參數就可以了,其他參數都采用作者提供的默認值。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
這裏先不說有什麽區別,它們最終都會導向這個構造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 這幾個參數都是必須要有的
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();

    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

基本上,上面的構造方法中列出了我們最需要關心的幾個屬性了,下面逐個介紹下構造方法中出現的這幾個屬性:

corePoolSize

核心線程數,不要摳字眼,反正先記著有這麽個屬性就可以了。

maximumPoolSize

?最大線程數,線程池允許創建的最大線程數。

workQueue

任務隊列,BlockingQueue 接口的某個實現(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。

keepAliveTime

空閑線程的保活時間,如果某線程的空閑時間超過這個值都沒有任務給它做,那麽可以被關閉了。註意這個值並不會對所有線程起作用,如果線程池中的線程數少於等於核心線程數 corePoolSize,那麽這些線程不會因為空閑太長時間而被關閉,當然,也可以通過調用 allowCoreThreadTimeOut(true)使核心線程數內的線程也可以被回收。

threadFactory

用於生成線程,一般我們可以用默認的就可以了。通常,我們可以通過它將我們的線程的名字設置得比較可讀一些,如 Message-Thread-1, Message-Thread-2 類似這樣。

handler:

當線程池已經滿了,但是又有新的任務提交的時候,該采取什麽策略由這個來指定。有幾種方式可供選擇,像拋出異常、直接拒絕然後返回等,也可以自己實現相應的接口實現自己的邏輯,這個之後再說。

除了上面幾個屬性外,我們再看看其他重要的屬性。

Doug Lea 采用一個 32 位的整數來存放線程池的狀態和當前池中的線程數,其中高 3 位用於存放線程池狀態,低 29 位表示線程數(即使只有 29 位,也已經不小了,大概 5 億多,現在還沒有哪個機器能起這麽多線程的吧)。我們知道,java 語言在整數編碼上是統一的,都是采用補碼的形式,下面是簡單的移位操作和布爾操作,都是挺簡單的。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 這裏 COUNT_BITS 設置為 29(32-3),意味著前三位用於存放線程狀態,後29位用於存放線程數
// 很多初學者很喜歡在自己的代碼中寫很多 29 這種數字,或者某個特殊的字符串,然後分布在各個地方,這是非常糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111
// 這裏得到的是 29 個 1,也就是說線程池的最大線程數是 2^29-1=536870911
// 以我們現在計算機的實際情況,這個數量還是夠用的
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 我們說了,線程池的狀態存放在高 3 位中
// 運算結果為 111跟29個0:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;

// 將整數 c 的低 29 位修改為 0,就得到了線程池的狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 將整數 c 的高 3 為修改為 0,就得到了線程池中的線程數
private static int workerCountOf(int c) { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

/*

  • Bit field accessors that don‘t require unpacking ctl.
  • These depend on the bit layout and on workerCount being never negative.
    */

private static boolean runStateLessThan(int c, int s) {
return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
上面就是對一個整數的簡單的位操作,幾個操作方法將會在後面的源碼中一直出現,所以讀者最好把方法名字和其代表的功能記住,看源碼的時候也就不需要來來回回翻了。

在這裏,介紹下線程池中的各個狀態和狀態變化的轉換過程:

RUNNING:這個沒什麽好說的,這是最正常的狀態:接受新的任務,處理等待隊列中的任務
SHUTDOWN:不接受新的任務提交,但是會繼續處理等待隊列中的任務
STOP:不接受新的任務提交,不再處理等待隊列中的任務,中斷正在執行任務的線程
TIDYING:所有的任務都銷毀了,workCount 為 0。線程池的狀態在轉換為 TIDYING 狀態時,會執行鉤子方法 terminated()
TERMINATED:terminated() 方法結束後,線程池的狀態就會變成這個
RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,所以等於 0 的時候不能提交任務,大於 0 的話,連正在執行的任務也需要中斷。

看了這幾種狀態的介紹,讀者大體也可以猜到十之八九的狀態轉換了,各個狀態的轉換過程有以下幾種:

RUNNING -> SHUTDOWN:當調用了 shutdown() 後,會發生這個狀態轉換,這也是最重要的
(RUNNING or SHUTDOWN) -> STOP:當調用 shutdownNow() 後,會發生這個狀態轉換,這下要清楚 shutDown() 和 shutDownNow() 的區別了
SHUTDOWN -> TIDYING:當任務隊列和線程池都清空後,會由 SHUTDOWN 轉換為 TIDYING
STOP -> TIDYING:當任務隊列清空後,發生這個轉換
TIDYING -> TERMINATED:這個前面說了,當 terminated() 方法結束後
上面的幾個記住核心的就可以了,尤其第一個和第二個。

另外,我們還要看看一個內部類 Worker,因為 Doug Lea 把線程池中的線程包裝成了一個個 Worker,翻譯成工人,就是線程池中做任務的線程。所以到這裏,我們知道任務是 Runnable(內部叫 task 或 command),線程是 Worker。

Worker 這裏又用到了抽象類 AbstractQueuedSynchronizer。題外話,AQS 在並發中真的是到處出現,而且非常容易使用,寫少量的代碼就能實現自己需要的同步方式(對 AQS 源碼感興趣的讀者請參看我之前寫的幾篇文章)。

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;

// 這個是真正的線程,任務靠你啦
final Thread thread;

// 前面說了,這裏的 Runnable 是任務。為什麽叫 firstTask?因為在創建線程的時候,如果同時指定了
// 這個線程起來以後需要執行的第一個任務,那麽第一個任務就是存放在這裏的(線程可不止執行這一個任務)
// 當然了,也可以為 null,這樣線程起來了,自己到任務隊列(BlockingQueue)中取任務(getTask 方法)就行了
Runnable firstTask;

// 用於存放此線程完全的任務數,註意了,這裏用了 volatile,保證可見性
volatile long completedTasks;

// Worker 只有這一個構造方法,傳入 firstTask,也可以傳 null
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 調用 ThreadFactory 來創建一個新的線程
    this.thread = getThreadFactory().newThread(this);
}

// 這裏調用了外部類的 runWorker 方法
public void run() {
    runWorker(this);
}

...// 其他幾個方法沒什麽好看的,就是用 AQS 操作,來獲取這個線程的執行權,用了獨占鎖

}
前面雖然啰嗦,但是簡單。有了上面的這些基礎後,我們終於可以看看 ThreadPoolExecutor 的 execute 方法了,前面源碼分析的時候也說了,各種方法都最終依賴於 execute 方法:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

// 前面說的那個表示 “線程池狀態” 和 “線程數” 的整數
int c = ctl.get();

// 如果當前線程數少於核心線程數,那麽直接添加一個 worker 來執行任務,
// 創建一個新的線程,並把當前任務 command 作為這個線程的第一個任務(firstTask)
if (workerCountOf(c) < corePoolSize) {
    // 添加任務成功,那麽就結束了。提交任務嘛,線程池已經接受了這個任務,這個方法也就可以返回了
    // 至於執行的結果,到時候會包裝到 FutureTask 中。
    // 返回 false 代表線程池不允許提交任務
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// 到這裏說明,要麽當前線程數大於等於核心線程數,要麽剛剛 addWorker 失敗了

// 如果線程池處於 RUNNING 狀態,把這個任務添加到任務隊列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
    /* 這裏面說的是,如果任務進入了 workQueue,我們是否需要開啟新的線程
     * 因為線程數在 [0, corePoolSize) 是無條件開啟新的線程
     * 如果線程數已經大於等於 corePoolSize,那麽將任務添加到隊列中,然後進到這裏
     */
    int recheck = ctl.get();
    // 如果線程池已不處於 RUNNING 狀態,那麽移除已經入隊的這個任務,並且執行拒絕策略
    if (! isRunning(recheck) && remove(command))
        reject(command);
    // 如果線程池還是 RUNNING 的,並且線程數為 0,那麽開啟新的線程
    // 到這裏,我們知道了,這塊代碼的真正意圖是:擔心任務提交到隊列中了,但是線程都關閉了
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 如果 workQueue 隊列滿了,那麽進入到這個分支
// 以 maximumPoolSize 為界創建新的 worker,
// 如果失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略
else if (!addWorker(command, false))
    reject(command);

}
對創建線程的錯誤理解:如果線程數少於 corePoolSize,創建一個線程,如果線程數在 [corePoolSize, maximumPoolSize] 之間那麽可以創建線程或復用空閑線程,keepAliveTime 對這個區間的線程有效。

從上面的幾個分支,我們就可以看出,上面的這段話是錯誤的。

上面這些一時半會也不可能全部消化搞定,我們先繼續往下吧,到時候再回頭看幾遍。

這個方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我們看看它是怎麽創建新的線程的:

// 第一個參數是準備提交給這個線程執行的任務,之前說了,可以為 null
// 第二個參數為 true 代表使用核心線程數 corePoolSize 作為創建線程的界線,也就說創建這個線程的時候,
// 如果線程池中的線程總數已經達到 corePoolSize,那麽不能響應這次創建線程的請求
// 如果是 false,代表使用最大線程數 maximumPoolSize 作為界線
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

    // 這個非常不好理解
    // 如果線程池已關閉,並滿足以下條件之一,那麽不創建新的 worker:
    // 1. 線程池狀態大於 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED
    // 2. firstTask != null
    // 3. workQueue.isEmpty()
    // 簡單分析下:
    // 還是狀態控制的問題,當線程池處於 SHUTDOWN 的時候,不允許提交任務,但是已有的任務繼續執行
    // 當狀態大於 SHUTDOWN 時,不允許提交任務,且中斷正在執行的任務
    // 多說一句:如果線程池處於 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那麽是允許創建 worker 的
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        // 如果成功,那麽就是所有創建線程前的條件校驗都滿足了,準備創建線程執行任務了
        // 這裏失敗的話,說明有其他線程也在嘗試往線程池中創建線程
        if (compareAndIncrementWorkerCount(c))
            break retry;
        // 由於有並發,重新再讀取一下 ctl
        c = ctl.get();
        // 正常如果是 CAS 失敗的話,進到下一個裏層的for循環就可以了
        // 可是如果是因為其他線程的操作,導致線程池的狀態發生了變更,如有其他線程關閉了這個線程池
        // 那麽需要回到外層的for循環
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

/* 
 * 到這裏,我們認為在當前這個時刻,可以開始創建線程來執行任務了,
 * 因為該校驗的都校驗了,至於以後會發生什麽,那是以後的事,至少當前是滿足條件的
 */

// worker 是否已經啟動
boolean workerStarted = false;
// 是否已將這個 worker 添加到 workers 這個 HashSet 中
boolean workerAdded = false;
Worker w = null;
try {
    final ReentrantLock mainLock = this.mainLock;
    // 把 firstTask 傳給 worker 的構造方法
    w = new Worker(firstTask);
    // 取 worker 中的線程對象,之前說了,Worker的構造方法會調用 ThreadFactory 來創建一個新的線程
    final Thread t = w.thread;
    if (t != null) {
        // 這個是整個類的全局鎖,持有這個鎖才能讓下面的操作“順理成章”,
        // 因為關閉一個線程池需要這個鎖,至少我持有鎖的期間,線程池不會被關閉
        mainLock.lock();
        try {

            int c = ctl.get();
            int rs = runStateOf(c);

            // 小於 SHUTTDOWN 那就是 RUNNING,這個自不必說,是最正常的情況
            // 如果等於 SHUTDOWN,前面說了,不接受新的任務,但是會繼續執行等待隊列中的任務
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                // worker 裏面的 thread 可不能是已經啟動的
                if (t.isAlive())
                    throw new IllegalThreadStateException();
                // 加到 workers 這個 HashSet 中
                workers.add(w);
                int s = workers.size();
                // largestPoolSize 用於記錄 workers 中的個數的最大值
                // 因為 workers 是不斷增加減少的,通過這個值可以知道線程池的大小曾經達到的最大值
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        // 添加成功的話,啟動這個線程
        if (workerAdded) {
            // 啟動線程
            t.start();
            workerStarted = true;
        }
    }
} finally {
    // 如果線程沒有啟動,需要做一些清理工作,如前面 workCount 加了 1,將其減掉
    if (! workerStarted)
        addWorkerFailed(w);
}
// 返回線程是否啟動成功
return workerStarted;

}
簡單看下 addWorkFailed 的處理:

// workers 中刪除掉相應的 worker
// workCount 減 1
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
// rechecks for termination, in case the existence of this worker was holding up termination
tryTerminate();
} finally {
mainLock.unlock();
}
}
回過頭來,繼續往下走。我們知道,worker 中的線程 start 後,其 run 方法會調用 runWorker 方法:

// Worker 類的 run() 方法
public void run() {
runWorker(this);
}
繼續往下看 runWorker 方法:

// 此方法由 worker 線程啟動後調用,這裏用一個 while 循環來不斷地從等待隊列中獲取任務並執行
// 前面說了,worker 在初始化的時候,可以指定 firstTask,那麽第一個任務也就可以不需要從隊列中獲取
final void runWorker(Worker w) {
//
Thread wt = Thread.currentThread();
// 該線程的第一個任務(如果有的話)
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循環調用 getTask 獲取任務
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果線程池狀態大於等於 STOP,那麽意味著該線程也要中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 這是一個鉤子方法,留給需要的子類實現
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到這裏終於可以執行任務了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 這裏不允許拋出 Throwable,所以轉換為 Error
thrown = x; throw new Error(x);
} finally {
// 也是一個鉤子方法,將 task 和異常作為參數,留給需要的子類實現
afterExecute(task, thrown);
}
} finally {
// 置空 task,準備 getTask 獲取下一個任務
task = null;
// 累加完成的任務數
w.completedTasks++;
// 釋放掉 worker 的獨占鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 如果到這裏,需要執行線程關閉:
// 1. 說明 getTask 返回 null,也就是說,這個 worker 的使命結束了,執行關閉
// 2. 任務執行過程中發生了異常
// 第一種情況,已經在代碼處理了將 workCount 減 1,這個在 getTask 方法分析中會說
// 第二種情況,workCount 沒有進行處理,所以需要在 processWorkerExit 中處理
// 限於篇幅,我不準備分析這個方法了,感興趣的讀者請自行分析源碼
processWorkerExit(w, completedAbruptly);
}
}
我們看看 getTask() 是怎麽獲取任務的,這個方法寫得真的很好,每一行都很簡單,組合起來卻所有的情況都想好了:

// 此方法有三種可能:
// 1. 阻塞直到獲取到任務返回。我們知道,默認 corePoolSize 之內的線程是不會被回收的,
// 它們會一直等待任務
// 2. 超時退出。keepAliveTime 起作用的時候,也就是如果這麽多時間內都沒有任務,那麽應該執行關閉
// 3. 如果發生了以下條件,此方法必須返回 null:
// - 池中有大於 maximumPoolSize 個 workers 存在(通過調用 setMaximumPoolSize 進行設置)
// - 線程池處於 SHUTDOWN,而且 workQueue 是空的,前面說了,這種不再接受新的任務
// - 線程池處於 STOP,不僅不接受新的線程,連 workQueue 中的線程也不再執行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);
    // 兩種可能
    // 1. rs == SHUTDOWN && workQueue.isEmpty()
    // 2. rs >= STOP
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        // CAS 操作,減少工作線程數
        decrementWorkerCount();
        return null;
    }

    boolean timed;      // Are workers subject to culling?
    for (;;) {
        int wc = workerCountOf(c);
        // 允許核心線程數內的線程回收,或當前線程數超過了核心線程數,那麽有可能發生超時關閉
        timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 這裏 break,是為了不往下執行後一個 if (compareAndDecrementWorkerCount(c))
        // 兩個 if 一起看:如果當前線程數 wc > maximumPoolSize,或者超時,都返回 null
        // 那這裏的問題來了,wc > maximumPoolSize 的情況,為什麽要返回 null?
        //    換句話說,返回 null 意味著關閉線程。
        // 那是因為有可能開發者調用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調小了
        if (wc <= maximumPoolSize && ! (timedOut && timed))
            break;
        if (compareAndDecrementWorkerCount(c))
            return null;
        c = ctl.get();  // Re-read ctl
        // compareAndDecrementWorkerCount(c) 失敗,線程池中的線程數發生了改變
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
    // wc <= maximumPoolSize 同時沒有超時
    try {
        // 到 workQueue 中獲取任務
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        // 如果此 worker 發生了中斷,采取的方案是重試
        // 解釋下為什麽會發生中斷,這個讀者要去看 setMaximumPoolSize 方法,
        // 如果開發者將 maximumPoolSize 調小了,導致其小於當前的 workers 數量,
        // 那麽意味著超出的部分線程要被關閉。重新進入 for 循環,自然會有部分線程會返回 null
        timedOut = false;
    }
}

}
到這裏,基本上也說完了整個流程,讀者這個時候應該回到 execute(Runnable command) 方法,看看各個分支,我把代碼貼過來一下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

// 前面說的那個表示 “線程池狀態” 和 “線程數” 的整數
int c = ctl.get();

// 如果當前線程數少於核心線程數,那麽直接添加一個 worker 來執行任務,
// 創建一個新的線程,並把當前任務 command 作為這個線程的第一個任務(firstTask)
if (workerCountOf(c) < corePoolSize) {
    // 添加任務成功,那麽就結束了。提交任務嘛,線程池已經接受了這個任務,這個方法也就可以返回了
    // 至於執行的結果,到時候會包裝到 FutureTask 中。
    // 返回 false 代表線程池不允許提交任務
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// 到這裏說明,要麽當前線程數大於等於核心線程數,要麽剛剛 addWorker 失敗了

// 如果線程池處於 RUNNING 狀態,把這個任務添加到任務隊列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
    /* 這裏面說的是,如果任務進入了 workQueue,我們是否需要開啟新的線程
     * 因為線程數在 [0, corePoolSize) 是無條件開啟新的線程
     * 如果線程數已經大於等於 corePoolSize,那麽將任務添加到隊列中,然後進到這裏
     */
    int recheck = ctl.get();
    // 如果線程池已不處於 RUNNING 狀態,那麽移除已經入隊的這個任務,並且執行拒絕策略
    if (! isRunning(recheck) && remove(command))
        reject(command);
    // 如果線程池還是 RUNNING 的,並且線程數為 0,那麽開啟新的線程
    // 到這裏,我們知道了,這塊代碼的真正意圖是:擔心任務提交到隊列中了,但是線程都關閉了
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 如果 workQueue 隊列滿了,那麽進入到這個分支
// 以 maximumPoolSize 為界創建新的 worker,
// 如果失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略
else if (!addWorker(command, false))
    reject(command);

}
上面各個分支中,有兩種情況會調用 reject(command) 來處理任務,因為按照正常的流程,線程池此時不能接受這個任務,所以需要執行我們的拒絕策略。接下來,我們說一說 ThreadPoolExecutor 中的拒絕策略。

final void reject(Runnable command) {
// 執行拒絕策略
handler.rejectedExecution(command, this);
}
此處的 handler 我們需要在構造線程池的時候就傳入這個參數,它是 RejectedExecutionHandler 的實例。

RejectedExecutionHandler 在 ThreadPoolExecutor 中有四個已經定義好的實現類可供我們直接使用,當然,我們也可以實現自己的策略,不過一般也沒有必要。

// 只要線程池沒有被關閉,那麽由提交任務的線程自己來執行這個任務。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

// 不管怎樣,直接拋出 RejectedExecutionException 異常
// 這個是默認的策略,如果我們構造線程池的時候不傳相應的 handler 的話,那就會指定使用這個
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

// 不做任何處理,直接忽略掉這個任務
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

// 這個相對霸道一點,如果線程池沒有被關閉的話,
// 把隊列隊頭的任務(也就是等待了最長時間的)直接扔掉,然後提交這個任務到等待隊列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
到這裏,ThreadPoolExecutor 的源碼算是分析結束了。單純從源碼的難易程度來說,ThreadPoolExecutor 的源碼還算是比較簡單的,只是需要我們靜下心來好好看看罷了。

Executors
這節其實也不是分析 Executors 這個類,因為它僅僅是工具類,它的所有方法都是 static 的。

生成一個固定大小的線程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
最大線程數設置為與核心線程數相等,此時 keepAliveTime 設置為 0(因為這裏它是沒用的,即使不為 0,線程池默認也不會回收 corePoolSize 內的線程),任務隊列采用 LinkedBlockingQueue,×××隊列。

過程分析:剛開始,每提交一個任務都創建一個 worker,當 worker 的數量達到 nThreads 後,不再創建新的線程,而是把任務提交到 LinkedBlockingQueue 中,而且之後線程數始終為 nThreads。

生成只有一個線程的固定線程池,這個更簡單,和上面的一樣,只要設置線程數為 1 就可以了:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
生成一個需要的時候就創建新的線程,同時可以復用之前創建的線程(如果這個線程當前沒有任務)的線程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心線程數為 0,最大線程數為 Integer.MAX_VALUE,keepAliveTime 為 60 秒,任務隊列采用 SynchronousQueue。

這種線程池對於任務可以比較快速地完成的情況有比較好的性能。如果線程空閑了 60 秒都沒有任務,那麽將關閉此線程並從線程池中移除。所以如果線程池空閑了很長時間也不會有問題,因為隨著所有的線程都會被關閉,整個線程池不會占用任何的系統資源。

過程分析:我把 execute 方法的主體黏貼過來,讓大家看得明白些。鑒於 corePoolSize 是 0,那麽提交任務的時候,直接將任務提交到隊列中,由於采用了 SynchronousQueue,所以如果是第一個任務提交的時候,offer 方法肯定會返回 false,因為此時沒有任何 worker 對這個任務進行接收,那麽將進入到最後一個分支來創建第一個 worker。之後再提交任務的話,取決於是否有空閑下來的線程對任務進行接收,如果有,會進入到第二個 if 語句塊中,否則就是和第一個任務一樣,進到最後的 else if 分支。

int c = ctl.get();
// corePoolSize 為 0,所以不會進到這個 if 分支
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// offer 如果有空閑線程剛好可以接收此任務,那麽返回 true,否則返回 false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
SynchronousQueue 是一個比較特殊的 BlockingQueue,其本身不儲存任何元素,它有一個虛擬隊列(或虛擬棧),不管讀操作還是寫操作,如果當前隊列中存儲的是與當前操作相同模式的線程,那麽當前操作也進入隊列中等待;如果是相反模式,則配對成功,從當前隊列中取隊頭節點。具體的信息,可以看我的另一篇關於 BlockingQueue 的文章。

總結
我一向不喜歡寫總結,因為我把所有需要表達的都寫在正文中了,寫小篇幅的總結並不能真正將話說清楚,本文的總結部分為準備面試的讀者而寫,希望能幫到面試者或者沒有足夠的時間看完全文的讀者。

java 線程池有哪些關鍵屬性?

corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler

corePoolSize 到 maximumPoolSize 之間的線程會被回收,當然 corePoolSize 的線程也可以通過設置而得到回收(allowCoreThreadTimeOut(true))。

workQueue 用於存放任務,添加任務的時候,如果當前線程數超過了 corePoolSize,那麽往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。

keepAliveTime 用於設置空閑時間,如果線程數超出了 corePoolSize,並且有些線程的空閑時間超過了這個值,會執行關閉這些線程的操作

rejectedExecutionHandler 用於處理當線程池不能執行此任務時的情況,默認有拋出 RejectedExecutionException 異常、忽略任務、使用提交任務的線程來執行此任務和將隊列中等待最久的任務刪除,然後提交此任務這四種策略,默認為拋出異常。

說說線程池中的線程創建時機?

如果當前線程數少於 corePoolSize,那麽提交任務的時候創建一個新的線程,並由這個線程執行這個任務;
如果當前線程數已經達到 corePoolSize,那麽將提交的任務添加到隊列中,等待線程池中的線程去隊列中取任務;
如果隊列已滿,那麽創建新的線程來執行任務,需要保證池中的線程數不會超過 maximumPoolSize,如果此時線程數超過了 maximumPoolSize,那麽執行拒絕策略。

  • 註意:如果將隊列設置為×××隊列,那麽線程數達到 corePoolSize 後,其實線程數就不會再增長了。

Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 構造出來的線程池有什麽差別?

細說太長,往上滑一點點,在 Executors 的小節進行了詳盡的描述。

任務執行過程中發生異常怎麽處理?

如果某個任務執行出現異常,那麽執行任務的線程會被關閉,而不是繼續接收其他任務。然後會啟動一個新的線程來代替它。

什麽時候會執行拒絕策略?

workers 的數量達到了 corePoolSize(任務此時需要進入任務隊列),任務入隊成功,與此同時線程池被關閉了,而且關閉線程池並沒有將這個任務出隊,那麽執行拒絕策略。這裏說的是非常邊界的問題,入隊和關閉線程池並發執行,讀者仔細看看 execute 方法是怎麽進到第一個 reject(command) 裏面的。
workers 的數量大於等於 corePoolSize,將任務加入到任務隊列,可是隊列滿了,任務入隊失敗,那麽準備開啟新的線程,可是線程數已經達到 maximumPoolSize,那麽執行拒絕策略。
因為本文實在太長了,所以我沒有說執行結果是怎麽獲取的,也沒有說關閉線程池相關的部分,這個就留給讀者吧。

本文篇幅是有點長,如果讀者發現什麽不對的地方,或者有需要補充的地方,請不吝提出,謝謝。

深度解讀 java 線程池設計思想及源碼實現