1. 程式人生 > >多執行緒程式設計學習十一(ThreadPoolExecutor 詳解).

多執行緒程式設計學習十一(ThreadPoolExecutor 詳解).

一、ThreadPoolExecutor 引數說明

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:核心執行緒池的大小。當提交一個任務到執行緒池時,核心執行緒池會建立一個核心執行緒來執行任務,即使其他核心執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於核心執行緒池基本大小時就不再建立。如果呼叫了執行緒池的 prestartAllCoreThreads() 方法,核心執行緒池會提前建立並啟動所有核心執行緒。
  • workQueue:任務佇列。當核心執行緒池中沒有執行緒時,所提交的任務會被暫存在佇列中。Java 提供了多種阻塞佇列。
  • maximumPoolSize:執行緒池允許建立的最大執行緒數。如果佇列也滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的空閒執行緒執行任務。值得注意的是,如果使用了無界的任務佇列則這個引數不起作用。
  • keepAliveTime:當執行緒池中的執行緒數大於 corePoolSize 時,keepAliveTime 為多餘的空閒執行緒等待新任務的最長時間,超過這個時間後多餘的執行緒將被終止。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高執行緒的利用率。值得注意的是,如果使用了無界的任務佇列則這個引數不起作用。
  • TimeUnit:執行緒活動保持時間的單位。
  • threadFactory:建立執行緒的工廠。可以通過執行緒工廠給每個創建出來的執行緒設定符合業務的名字。

    // 依賴 guava
    new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build();
  • handler:飽和策略。當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。Java 提供了以下4種策略:

    • AbortPolicy:預設。直接丟擲異常。
    • CallerRunsPolicy:只用呼叫者所線上程來執行任務。
    • DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。

tips: 一般我們稱核心執行緒池中的執行緒為核心執行緒,這部分執行緒不會被回收;超過任務佇列後,建立的執行緒為空閒執行緒,這部分執行緒會被回收(回收時間即 keepAliveTime)

二、常見的 ThreadPoolExecutor 介紹

Executors 是建立 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的工廠類。

Java 提供了多種型別的 ThreadPoolExecutor,比較常見的有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool等。

FixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool 被稱為可重用固定執行緒數的執行緒池。可以看到 corePoolSize 和 maximumPoolSize 都被設定成了 nThreads;keepAliveTime設定為0L,意味著多餘的空閒執行緒會被立即終止;使用了阻塞佇列 LinkedBlockingQueue 作為執行緒的工作佇列(佇列的容量為 Integer.MAX_VALUE)。

FixedThreadPool 所存在的問題是,由於佇列的容量為 Integer.MAX_VALUE,基本可以認為是無界的,所以 maximumPoolSize 和 keepAliveTime 引數都不會生效,飽和拒絕策略也不會執行,會造成任務大量堆積在阻塞佇列中。

FixedThreadPool 適用於為了滿足資源管理的需求,而需要限制執行緒數量的應用場景。


SingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor 是使用單個執行緒的執行緒池。可以看到 corePoolSize 和 maximumPoolSize 被設定為1,其他引數與 FixedThreadPool 相同,所以所帶來的風險也和 FixedThreadPool 一致,就不贅述了。

SingleThreadExecutor 適用於需要保證順序的執行各個任務。


CachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool 是一個會根據需要建立新執行緒的執行緒池。可以看到 corePoolSize 被設定為 0,所以建立的執行緒都為空閒執行緒;maximumPoolSize 被設定為 Integer.MAX_VALUE(基本可認為無界),意味著可以建立無限數量的空閒執行緒;keepAliveTime 設定為60L,意味著空閒執行緒等待新任務的最長時間為60秒;使用沒有容量的 SynchronousQueue 作為執行緒池的工作佇列。

CachedThreadPool 所存在的問題是, 如果主執行緒提交任務的速度高於maximumPool 中執行緒處理任務的速度時,CachedThreadPool 會不斷建立新執行緒。極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU和記憶體資源。

CachedThreadPool 適用於執行很多的短期非同步任務的小程式,或者是負載較輕的伺服器。

三、 自建 ThreadPoolExecutor 執行緒池

鑑於上面提到的風險,我們更提倡使用 ThreadPoolExecutor 去建立執行緒池,而不用 Executors 工廠去建立。

以下是一個 ThreadPoolExecutor 建立執行緒池的 Demo 例項:

public class Pool {

    static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pool-task-%d").build();
    static ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
            200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
            threadFactory, new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 無返回值的任務執行 -> Runnable
        executor.execute(() -> System.out.println("Hello World"));
        // 2. 有返回值的任務執行 -> Callable
        Future<String> future = executor.submit(() -> "Hello World");
        // get 方法會阻塞執行緒執行等待返回結果
        String result = future.get();
        System.out.println(result);

        // 3. 監控執行緒池
        monitor();

        // 4. 關閉執行緒池
        shutdownAndAwaitTermination();

        monitor();
    }

    private static void monitor() {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Pool.executor;
        System.out.println("【執行緒池任務】執行緒池中曾經建立過的最大執行緒數:" + threadPoolExecutor.getLargestPoolSize());
        System.out.println("【執行緒池任務】執行緒池中執行緒數:" + threadPoolExecutor.getPoolSize());
        System.out.println("【執行緒池任務】執行緒池中活動的執行緒數:" + threadPoolExecutor.getActiveCount());
        System.out.println("【執行緒池任務】佇列中等待執行的任務數:" + threadPoolExecutor.getQueue().size());
        System.out.println("【執行緒池任務】執行緒池已執行完任務數:" + threadPoolExecutor.getCompletedTaskCount());
    }

    /**
     * 關閉執行緒池
     * 1. shutdown、shutdownNow 的原理都是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的 interrupt 方法來中斷執行緒。
     * 2. shutdownNow:將執行緒池的狀態設定成 STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表。
     * 3. shutdown:將執行緒池的狀態設定成 SHUTDOWN 狀態,然後中斷所有沒有正在執行任務的執行緒。
     */
    private static void shutdownAndAwaitTermination() {
        // 禁止提交新任務
        executor.shutdown();
        try {
            // 等待現有任務終止
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // 取消當前正在執行的任務
                executor.shutdownNow();
                // 等待一段時間讓任務響應被取消
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // 如果當前執行緒也中斷,則取消
            executor.shutdownNow();
            // 保留中斷狀態
            Thread.currentThread().interrupt();
        }
    }
}

建立執行緒池需要注意以下幾點:

  1. CPU 密集型任務應配置儘可能小的執行緒,如配置 Ncpu+1 個執行緒。
  2. IO 密集型任務(資料庫讀寫等)應配置儘可能多的執行緒,如配置 Ncpu*2 個執行緒。
  3. 優先順序不同的任務可以使用優先順序佇列 PriorityBlockingQueue 來處理。
  4. 建議使用有界佇列。可以避免建立數量非常多的執行緒,甚至拖垮系統。有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。