1. 程式人生 > >java併發程式設計之執行緒池

java併發程式設計之執行緒池

前言

本文介紹幾種java常用的執行緒池如:FixedThreadPool,ScheduledThreadPool,CachedThreadPool等執行緒池,並分析介紹Executor框架,做到“知其然”:會用執行緒池,正確使用執行緒池。並且“知其所以然”:瞭解背後的基本原理。

1.Executor

Executor是java SE5的java.util.concurrent包下的執行器,用於管理Thread物件,它幫程式設計師簡化了併發程式設計。與客戶端直接執行任務不同,Executor作為客戶端和任務執行之間的中介,將任務的提交和任務的執行策略解耦開來,Executor允許我們管理非同步任務的執行。

Executor的實現思想基於生產者-消費者模型,提交任務的執行緒相當於生產者,執行任務的工作執行緒相當於消費者,這裡所說的任務即我們實現Runnable介面的類。

Executor關鍵類圖如下:
這裡寫圖片描述
根據這個類圖,詳細分析一下其中的門路:

  • Executor:該介面定義了一個 void execute(Runnable command)方法,用來接受任務。
  • ExecutorService:繼承了Executor並對其進行了豐富的拓展,提供了任務生命週期管理相關的方法,如shutdown(),shutdownNow()等方法,並提供了追蹤一個或者多個非同步任務執行狀況並返回Future的方法,如submit(),invokeAll()方法
  • AbstractExecutorService:ExecutorService的預設實現類,該類是一個抽象類。
  • ThreadPoolExecutor:繼承了AbstractExecutorService並實現了其方法,可以通過Executors提供的靜態工廠方法建立執行緒池並返回ExecutorService例項
  • ScheduledExecutorService:提供定時排程任務的介面
  • ScheduledThreadPoolExecutor:ScheduledExecutorService的實現類,提供可定時排程任務的執行緒池。

2.ThreadPoolExecutor

根據上面的類圖,我們詳細看下ThreadPoolExecutor提供的構造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
      //略 ……
    }

引數介紹:

     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached

引數說明:

  • corePoolSize:核心執行緒數/基本執行緒數,沒有任務執行時執行緒池的大小(在建立ThreadPoolExecutor時,執行緒不會立即啟動,有任務提交時才會啟動)
  • maximumPoolSize:最大執行緒數,允許的建立的執行緒數
  • keepAliveTime:執行緒存活時間,當某個執行緒的空閒時間超過了存活時間,將被標記為可回收的,如果此時執行緒池的大小超過了corePoolSize,執行緒將被終止。
  • TimeUnit:keepAliveTime的單位,可選毫秒,秒等
  • workQueue:儲存任務的阻塞佇列,主要有三種:有界佇列,無界佇列和同步移交佇列(Synchronous Handoff)。下面將詳細說明
  • threadFactory(可選):在Executor建立新執行緒的時候使用,預設使用defaultThreadFactory建立執行緒
  • handler:定義“飽和策略”,這裡的飽和根據引數說明是指執行緒池容量(workQueue也滿了)滿了的時候,會使用飽和策略進行任務的拒絕。預設的策略是“Abort”即中止策略,該策略丟擲RejectExecutorException異常,其他的策略有“拋棄(Discard)、拋棄最舊(Discard-Oldest)”等。

很多人有疑問這些引數的含義到底是什麼,網上的解釋也五花八門,這裡我通俗的解釋一下,當提交一個任務的時候,會先檢查當前執行的執行緒數,如果當前執行的執行緒數小於基本執行緒數(corePoolSize),則直接建立執行緒執行任務,且這個執行緒屬於core執行緒。如果當前執行的執行緒數大於等於基本執行緒數,該任務會被放到阻塞佇列(workQueue)中,在阻塞佇列裡的任務會複用core執行緒,即阻塞佇列裡的任務會等待core執行緒提取執行。而當阻塞佇列是有界佇列時,在阻塞佇列滿了的時候,會建立新的執行緒來執行任務,這些新的執行緒是非core執行緒,滿足keepAliveTime的時候會被銷燬,而已經進入佇列裡的任務會繼續由已有的全部執行緒來執行。超過最大執行緒時(maximumPoolSize),會使用我們定義的“飽和策略”來處理。

這裡面其實牽扯了另一個問題,即如何實現執行緒複用,簡單來說就是執行緒在執行任務時,執行完後會去佇列裡面take新的任務,而take方法是阻塞的,因此執行緒並不會被銷燬,只會不停的執行任務,沒有任務時要麼根據我們的邏輯銷燬要麼阻塞等待任務。

3.Executors

有時候使用ThreadPoolExecutor自己建立執行緒池時由於不清楚如何設定執行緒池大小,存活時間等問題會導致資源浪費,而ThreadPoolExecutor是一個靈活的、穩定的執行緒池,允許各種定製。Executors提供了一系列的靜態工程方法幫我們建立各種執行緒池。

newFixedThreadPool

newFixedThreadPool建立可重用且執行緒數量固定的執行緒池,當執行緒池所有執行緒都在執行任務時,新的任務會在阻塞佇列中等待,當某個執行緒因為異常而結束,執行緒池會建立新的執行緒進行補充。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool使用基於FIFO(先進先出)策略的LinkedBlockingQueue作為阻塞佇列。

newSingleThreadPool

newSingleThreadPool使用單執行緒的Executor,其中corePoolSize和maximumPoolSize都為1,固定執行緒池大小為1。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newScheduledThreadPool

newScheduledThreadPool建立可以延遲執行或者定時執行任務的執行緒池,使用延時工作佇列DelayedWorkQueue。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }


public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

newCachedThreadPool

newCachedThreadPool是一個可快取的執行緒池,執行緒池中的執行緒如果60s內未被使用將被移除。使用同步佇列SynchronousQueue。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

關於同步佇列SynchronousQueue

回想一下,newSingleThreadPool,newFixedThreadPool預設使用無界佇列LinkedBlockingQueue,當任務執行速度遠遠低於新任務到來的速度時,該佇列將無限增加,如果我們使用有界佇列例如ArrayBlockingQueue,當佇列填滿後則需要完善的飽和策略,這裡需要根據需求選取折中之道。
這裡就引出了SynchronousQueue,對於非當大的或者無界的執行緒池,使用SynchronousQueue可以避免排隊,它可以講任務直接從生產中交給工作者執行緒,SynchronousQueue本質上不是一個佇列,而是一種同步移交機制,要將一個任務放入SynchronousQueue時必須要有一個執行緒在等待接受,如果沒有執行緒在等待且當前執行緒數大於最大值,將啟用飽和策略拒絕任務。

總結:newSingleThreadPool使用單執行緒的Executor,newFixedThreadPool設定執行緒池的基本大小和最大大小為指定的值,而且建立的執行緒池不會超時。newCachedThreadPool將執行緒池的最大大小設定為Integer.MAX_VALUE(即2的31次方減1),基本大小為0,超時時間為1分鐘,該執行緒池可以無限拓展,且需求降低時會自動收縮。newScheduledThreadPool用於執行定時任務的執行緒池。其他形式的執行緒池大家可參考其他資料。

關於執行緒池大小的問題

執行緒池的大小取決於提交的任務的型別和系統的效能。

  • 如果執行緒池過大,大量執行緒將在相對有限的cpu和記憶體資源上競爭,將導致記憶體使用量過高,耗盡資源
  • 如果執行緒池太小,空想的處理器資源無法有效利用,降低了吞吐率。

為了正確設定執行緒池大小需要考慮系統的CPU數,記憶體容量,任務型別(是計算密集型還是io密集型或是二者皆可),任務是否需要稀缺資源(如jdbc連線)

對於計算密集型任務,在擁有N個處理器的系統上,最佳執行緒池的大小為N+1。這個額外的執行緒保證了突發情況下CPU時鐘週期不會浪費。對於包含IO操作或者其他阻塞操作的任務,由於執行緒不會一直執行,執行緒池需要更大。
執行緒池的大小設定最優公式如下:
引數定義:
N = number of CPUs (cpu數量)
U = target CPU utilization,0<= U <= 1 (cpu利用率)
W/C = ratio of wait time to compute time (任務等待時間和計算時間的比值)
那麼最優執行緒池大小計算公式為:
S = N*U*(1+W/C)
其中cpu數可以通過Runtime.getRuntime().availableProcessors()獲得

CPU週期只是影響執行緒池大小的一個主要引數,其他的因素也很重要,需要綜合實踐。

參考《thinking in java》、《java併發程式設計實戰》