1. 程式人生 > >深入理解 Java 執行緒池

深入理解 Java 執行緒池

一、簡介

什麼是執行緒池

執行緒池是一種多執行緒處理形式,處理過程中將任務新增到佇列,然後在建立執行緒後自動啟動這些任務。

為什麼要用執行緒池

如果併發請求數量很多,但每個執行緒執行的時間很短,就會出現頻繁的建立和銷燬執行緒。如此一來,會大大降低系統的效率,可能頻繁建立和銷燬執行緒的時間、資源開銷要大於實際工作的所需。

正是由於這個問題,所以有必要引入執行緒池。使用 執行緒池的好處 有以下幾點:

  • 降低資源消耗 - 通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
  • 提高響應速度 - 當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
  • 提高執行緒的可管理性 - 執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。但是要做到合理的利用執行緒池,必須對其原理了如指掌。

二、Executor 框架

Executor 框架是一個根據一組執行策略呼叫,排程,執行和控制的非同步任務的框架,目的是提供一種將”任務提交”與”任務如何執行”分離開來的機制。

核心 API 概述

Executor 框架核心 API 如下:

  • Executor - 執行任務的簡單介面。
  • ExecutorService - 擴充套件了 Executor 介面。擴充套件能力:
    • 支援有返回值的執行緒;
    • 支援管理執行緒的生命週期。
  • ScheduledExecutorService - 擴充套件了 ExecutorService 介面。擴充套件能力:支援定期執行任務。
  • AbstractExecutorService
    - ExecutorService 介面的預設實現。
  • ThreadPoolExecutor - Executor 框架最核心的類,它繼承了 AbstractExecutorService 類。
  • ScheduledThreadPoolExecutor - ScheduledExecutorService 介面的實現,一個可定時排程任務的執行緒池。
  • Executors - 可以通過呼叫 Executors 的靜態工廠方法來建立執行緒池並返回一個 ExecutorService 物件。

Executor

Executor 介面中只定義了一個 execute 方法,用於接收一個 Runnable

物件。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService

ExecutorService 介面繼承了 Executor 介面,它還提供了 invokeAllinvokeAnyshutdownsubmit 等方法。

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

從其支援的方法定義,不難看出:相比於 Executor 介面,ExecutorService 介面主要的擴充套件是:

  • 支援有返回值的執行緒 - sumbitinvokeAllinvokeAny 方法中都支援傳入Callable 物件。
  • 支援管理執行緒生命週期 - shutdownshutdownNowisShutdown 等方法。

ScheduledExecutorService

ScheduledExecutorService 介面擴充套件了 ExecutorService 介面。

它除了支援前面兩個介面的所有能力以外,還支援定時排程執行緒。

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

其擴充套件的介面提供以下能力:

  • schedule 方法可以在指定的延時後執行一個 Runnable 或者 Callable 任務。
  • scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法可以按照指定時間間隔,定期執行任務。

三、ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 類是 Executor 框架中最核心的類。所以,本文將著重講述一下這個類。

重要欄位

ThreadPoolExecutor 有以下重要欄位:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

引數說明:

  • ctl - 用於控制執行緒池的執行狀態和執行緒池中的有效執行緒數量。它包含兩部分的資訊:
    • 執行緒池的執行狀態 (runState)
    • 執行緒池內有效執行緒的數量 (workerCount)
    • 可以看到,ctl 使用了 Integer 型別來儲存,高 3 位儲存 runState,低 29 位儲存 workerCountCOUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位減 1(29 個 1),這個常量表示 workerCount 的上限值,大約是 5 億。
  • 執行狀態 - 執行緒池一共有五種執行狀態:
    • RUNNING - 執行狀態。接受新任務,並且也能處理阻塞佇列中的任務。
    • SHUTDOWN - 關閉狀態。不接受新任務,但可以處理阻塞佇列中的任務。
      • 線上程池處於 RUNNING 狀態時,呼叫 shutdown 方法會使執行緒池進入到該狀態。
      • finalize 方法在執行過程中也會呼叫 shutdown 方法進入該狀態。
    • STOP - 停止狀態。不接受新任務,也不處理佇列中的任務。會中斷正在處理任務的執行緒。線上程池處於 RUNNINGSHUTDOWN 狀態時,呼叫 shutdownNow 方法會使執行緒池進入到該狀態。
    • TIDYING - 整理狀態。如果所有的任務都已終止了,workerCount (有效執行緒數) 為 0,執行緒池進入該狀態後會呼叫 terminated 方法進入 TERMINATED 狀態。
    • TERMINATED - 已終止狀態。在 terminated 方法執行完後進入該狀態。預設 terminated 方法中什麼也沒有做。進入 TERMINATED 的條件如下:
      • 執行緒池不是 RUNNING 狀態;
      • 執行緒池狀態不是 TIDYING 狀態或 TERMINATED 狀態;
      • 如果執行緒池狀態是 SHUTDOWN 並且 workerQueue 為空;
      • workerCount 為 0;
      • 設定 TIDYING 狀態成功。

構造方法

ThreadPoolExecutor 有四個構造方法,前三個都是基於第四個實現。第四個構造方法定義如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

引數說明:

  • corePoolSize - 核心執行緒數量。當有新任務通過 execute 方法提交時 ,執行緒池會執行以下判斷:
    • 如果執行的執行緒數少於 corePoolSize,則建立新執行緒來處理任務,即使執行緒池中的其他執行緒是空閒的。
    • 如果執行緒池中的執行緒數量大於等於 corePoolSize 且小於 maximumPoolSize,則只有當 workQueue 滿時才建立新的執行緒去處理任務;
    • 如果設定的 corePoolSizemaximumPoolSize 相同,則建立的執行緒池的大小是固定的。這時如果有新任務提交,若 workQueue 未滿,則將請求放入 workQueue 中,等待有空閒的執行緒去從 workQueue 中取任務並處理;
    • 如果執行的執行緒數量大於等於 maximumPoolSize,這時如果 workQueue 已經滿了,則使用 handler 所指定的策略來處理任務;
    • 所以,任務提交時,判斷的順序為 corePoolSize => workQueue => maximumPoolSize
  • maximumPoolSize - 最大執行緒數量。
    • 如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。
    • 值得注意的是:如果使用了無界的任務佇列這個引數就沒什麼效果。
  • keepAliveTime:執行緒保持活動的時間。
    • 當執行緒池中的執行緒數量大於 corePoolSize 的時候,如果這時沒有新的任務提交,核心執行緒外的執行緒不會立即銷燬,而是會等待,直到等待的時間超過了 keepAliveTime
    • 所以,如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。
  • unit - keepAliveTime 的時間單位。有 7 種取值。可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  • workQueue - 等待執行的任務佇列。用於儲存等待執行的任務的阻塞佇列。 可以選擇以下幾個阻塞佇列。
    • ArrayBlockingQueue - 有界阻塞佇列。
      • 此佇列是基於陣列的先進先出佇列(FIFO)。
      • 此佇列建立時必須指定大小。
    • LinkedBlockingQueue - 無界阻塞佇列。
      • 此佇列是基於連結串列的先進先出佇列(FIFO)。
      • 如果建立時沒有指定此佇列大小,則預設為 Integer.MAX_VALUE
      • 吞吐量通常要高於 ArrayBlockingQueue
      • 使用 LinkedBlockingQueue 意味著: maximumPoolSize 將不起作用,執行緒池能建立的最大執行緒數為 corePoolSize,因為任務等待佇列是無界佇列。
      • Executors.newFixedThreadPool 使用了這個佇列。
    • SynchronousQueue - 不會儲存提交的任務,而是將直接新建一個執行緒來執行新來的任務。
      • 每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態。
      • 吞吐量通常要高於 LinkedBlockingQueue
      • Executors.newCachedThreadPool 使用了這個佇列。
    • PriorityBlockingQueue - 具有優先順序的無界阻塞佇列。
  • threadFactory - 執行緒工廠。可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。
  • handler - 飽和策略。它是 RejectedExecutionHandler 型別的變數。當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。執行緒池支援以下策略:
    • AbortPolicy - 丟棄任務並丟擲異常。這也是預設策略。
    • DiscardPolicy - 丟棄任務,但不丟擲異常。
    • DiscardOldestPolicy - 丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)。
    • CallerRunsPolicy - 只用呼叫者所在的執行緒來執行任務。
    • 如果以上策略都不能滿足需要,也可以通過實現 RejectedExecutionHandler 介面來定製處理策略。如記錄日誌或持久化不能處理的任務。

execute 方法

預設情況下,建立執行緒池之後,執行緒池中是沒有執行緒的,需要提交任務之後才會建立執行緒。

提交任務可以使用 execute 方法,它是 ThreadPoolExecutor 的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行。

execute 方法工作流程如下:

  1. 如果 workerCount < corePoolSize,則建立並啟動一個執行緒來執行新提交的任務;
  2. 如果 workerCount >= corePoolSize,且執行緒池內的阻塞佇列未滿,則將任務新增到該阻塞佇列中;
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且執行緒池內的阻塞佇列已滿,則建立並啟動一個執行緒來執行新提交的任務;
  4. 如果workerCount >= maximumPoolSize,並且執行緒池內的阻塞佇列已滿,則根據拒絕策略來處理該任務, 預設的處理方式是直接拋異常。

其他重要方法

ThreadPoolExecutor 類中還有一些重要的方法:

  • submit - 類似於 execute,但是針對的是有返回值的執行緒。submit 方法是在 ExecutorService 中宣告的方法,在 AbstractExecutorService 就已經有了具體的實現。ThreadPoolExecutor 直接複用 AbstractExecutorServicesubmit 方法。
  • shutdown - 不會立即終止執行緒池,而是要等所有任務快取佇列中的任務都執行完後才終止,但再也不會接受新的任務。
    • 將執行緒池切換到 SHUTDOWN 狀態;
    • 並呼叫 interruptIdleWorkers 方法請求中斷所有空閒的 worker;
    • 最後呼叫 tryTerminate 嘗試結束執行緒池。
  • shutdownNow - 立即終止執行緒池,並嘗試打斷正在執行的任務,並且清空任務快取佇列,返回尚未執行的任務。與 shutdown 方法類似,不同的地方在於:
    • 設定狀態為 STOP
    • 中斷所有工作執行緒,無論是否是空閒的;
    • 取出阻塞佇列中沒有被執行的任務並返回。
  • isShutdown - 呼叫了 shutdownshutdownNow 方法後,isShutdown 方法就會返回 true。
  • isTerminaed - 當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫 isTerminaed 方法會返回 true。
  • setCorePoolSize - 設定核心執行緒數大小。
  • setMaximumPoolSize - 設定最大執行緒數大小。
  • getTaskCount - 執行緒池已經執行的和未執行的任務總數;
  • getCompletedTaskCount - 執行緒池已完成的任務數量,該值小於等於 taskCount
  • getLargestPoolSize - 執行緒池曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否滿過,也就是達到了 maximumPoolSize
  • getPoolSize - 執行緒池當前的執行緒數量;
  • getActiveCount - 當前執行緒池中正在執行任務的執行緒數量。

使用示例

public class ThreadPoolExecutorDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 100; i++) {
            threadPoolExecutor.execute(new MyThread());
            String info = String.format("執行緒池中執行緒數目:%s,佇列中等待執行的任務數目:%s,已執行玩別的任務數目:%s",
                threadPoolExecutor.getPoolSize(),
                threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getCompletedTaskCount());
            System.out.println(info);
        }
        threadPoolExecutor.shutdown();
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 執行");
        }

    }

}

四、Executors

JDK 的 Executors 類中提供了幾種具有代表性的執行緒池,這些執行緒池 都是基於 ThreadPoolExecutor 的定製化實現。

在實際使用執行緒池的場景中,我們往往不是直接使用 ThreadPoolExecutor ,而是使用 JDK 中提供的具有代表性的執行緒池例項。

newSingleThreadExecutor

建立一個單執行緒的執行緒池。

只會建立唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。 如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它 。

單工作執行緒最大的特點是:可保證順序地執行各個任務。

示例:

public class SingleThreadExecutorDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            });
        }
        executorService.shutdown();
    }

}

newFixedThreadPool

建立一個固定大小的執行緒池。

每次提交一個任務就會新建立一個工作執行緒,如果工作執行緒數量達到執行緒池最大執行緒數,則將提交的任務存入到阻塞佇列中。

FixedThreadPool 是一個典型且優秀的執行緒池,它具有執行緒池提高程式效率和節省建立執行緒時所耗的開銷的優點。但是,線上程池空閒時,即執行緒池中沒有可執行任務時,它不會釋放工作執行緒,還會佔用一定的系統資源。

示例:

public class FixedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            });
        }
        executorService.shutdown();
    }

}

newCachedThreadPool

建立一個可快取的執行緒池。

  • 如果執行緒池長度超過處理任務所需要的執行緒數,就會回收部分空閒的執行緒;
  • 如果長時間沒有往執行緒池中提交任務,即如果工作執行緒空閒了指定的時間(預設為 1 分鐘),則該工作執行緒將自動終止。終止後,如果你又提交了新的任務,則執行緒池重新建立一個工作執行緒。
  • 此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說 JVM)能夠建立的最大執行緒大小。 因此,使用 CachedThreadPool 時,一定要注意控制任務的數量,否則,由於大量執行緒同時執行,很有會造成系統癱瘓。

示例:

public class CachedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            });
        }
        executorService.shutdown();
    }

}

newScheduleThreadPool

建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        schedule();
        scheduleAtFixedRate();
    }

    private static void schedule() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            }, 1, TimeUnit.SECONDS);
        }
        executorService.shutdown();
    }

    private static void scheduleAtFixedRate() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 執行");
                }
            }, 1, 1, TimeUnit.SECONDS);
        }
        executorService.shutdown();
    }

}

參考資料

  • 《Java 併發程式設計實戰》
  • 《Java 併發程式設計的藝術》
  • 深入理解 Java 執行緒池:ThreadPoolExecutor
  • java 併發程式設計--Executor 框架