1. 程式人生 > >【Java多執行緒】執行緒池的工作原理詳解(下)

【Java多執行緒】執行緒池的工作原理詳解(下)

接著上篇文章,我接下來繼續介紹執行緒池的工作原理,如果你還沒有看上篇,我建議最好瀏覽一下:執行緒池的工作原理詳解(上)

Executors 工具類

1.定義

Executors是java執行緒池的工廠類,通過它可以快速初始化一個符合業務需求的執行緒池。

2.常用方法

(1) newFixedThreadPool方法 :建立固定大小的執行緒池

  • 適用於為了滿足資源管理的需求,而需要限制當前執行緒數量的應用場景,即負載比較重的伺服器

FixedThreadPool方法原始碼如下:

    public static ExecutorService newFixedThreadPool
(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

分析:從原始碼可以看出,其實是初始化了一個指定執行緒數的執行緒池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞佇列,如果執行緒池沒有可執行任務時,也不會釋放執行緒。

執行FixedThreadPool圖解:

這裡寫圖片描述

分析:

1)如果當前執行的執行緒數少於corePoolSize,則建立新執行緒來執行任務。

2)線上程池完成預熱之後(當前執行的執行緒數等於corePoolSize),將任務加入LinkedBlockingQueue。

3)執行緒執行完1)中的任務後,會在迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為
Integer.MAX_VALUE)。使用無界佇列作為工作佇列會對執行緒池帶來如下影響。

①當執行緒池中的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池中
的執行緒數不會超過corePoolSize。

②由於①,使用無界佇列時maximumPoolSize將是一個無效引數。

③ 由於①和②,使用無界佇列時keepAliveTime將是一個無效引數。

④ 由於使用無界佇列,執行中的FixedThreadPool(未執行方法shutdown()或
shutdownNow())不會拒絕任務(不會呼叫RejectedExecutionHandler.rejectedExecution方法)。

(2)newCachedThreadPool() 方法 :快取執行緒池,執行緒池的數量不固定,可以根據需求自動的更改數量。

大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或是負載較輕的伺服器。

CachedThreadPool方法原始碼如下

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

分析:

  • 初始化一個可以快取執行緒的執行緒池,預設快取60s,執行緒池的執行緒數可達到Integer.MAX_VALUE,即2147483647,內部使用不儲存元素的SynchronousQueue作為阻塞佇列;

  • newCachedThreadPool在沒有任務執行時,當執行緒的空閒時間超過keepAliveTime,會自動釋放執行緒資源,當提交新任務時,如果沒有空閒執行緒,則建立新執行緒執行任務,會導致一定的系統開銷;

執行CachedThreadPool圖解:

這裡寫圖片描述

分析:

① :首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPool中有空閒執行緒正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那麼主執行緒執行offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,execute()方法執行完成;否則執行下面的 2)

② :當初始maximumPool為空,或者maximumPool中當前沒有空閒執行緒時,將沒有執行緒執行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,①將失敗。此時CachedThreadPool會建立一個新執行緒執行任務,execute()方法執行完成。

③ :在 ②中新建立的執行緒將任務執行完後,會執行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這個poll操作會讓空閒執行緒最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主執行緒提交了一個新任務(主執行緒執行①),那麼這個空閒執行緒將執行主執行緒提交的新任務;否則,這個空閒執行緒將終止。由於空閒60秒的空閒執行緒會被終止,因此長時間保持空閒的CachedThreadPool不會使用任何資源。

  • CachedThreadPool使用SynchronousQueue,把主執行緒提交的任務傳遞給空閒執行緒執行。

  • CachedThreadPool中任務傳遞示意圖如下:

這裡寫圖片描述

(3)newSingleThreadExecutor() 方法 : 建立單個執行緒池。執行緒池中只有一個執行緒

適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個執行緒是活動的應用場景。

SingleThreadExecutor方法原始碼如下

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

分析:

初始化的執行緒池中只有一個執行緒,如果該執行緒異常結束,會重新建立一個新的執行緒繼續執行任務,唯一的執行緒可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞佇列。

執行SingleThreadExecutor圖解:

這裡寫圖片描述

分析:

1)如果當前執行的執行緒數少於corePoolSize(即執行緒池中無執行的執行緒),則建立一個新執行緒來執行任務。

2)線上程池完成預熱之後(當前執行緒池中有一個執行的執行緒),將任務加入Linked-BlockingQueue。

3)執行緒執行完1中的任務後,會在一個無限迴圈中反覆從LinkedBlockingQueue獲取任務來執行。

(4)newScheduledThreadPool() 方法: 建立固定大小的執行緒,可以延遲或定時的執行任務。

適用於需要多個後臺執行緒執行週期任務,同時為了滿足資源 管理的需求而需要限制後臺執行緒的數量的應用場景

newScheduledThreadPool方法原始碼如下:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
  • ScheduledThreadPoolExecutor原始碼
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  • 繼承父類的構造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

分析:初始化的執行緒池可以在指定的時間內週期性的執行所提交的任務,在實際的業務場景中可以使用該執行緒池定期的同步資料。其實最底層還是通過ThreadPoolExecutor類實現的

執行ScheduledThreadPoolExecutor圖解

這裡寫圖片描述

分析:

  • 當呼叫ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith-
    FixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue新增一個實現了
    RunnableScheduledFutur介面的ScheduledFutureTask。

  • 執行緒池中的執行緒從DelayQueue中獲取ScheduledFutureTask,然後執行任務。

如何合理配置執行緒池

1.任務特性

  • 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。

  • 任務的優先順序:高、中和低。

  • 任務的執行時間:長、中和短。

  • 任務的依賴性:是否依賴其他系統資源,如資料庫連線。

2.處理策略

性質不同的任務可以用不同規模的執行緒池分開處理。

  • CPU密集型任務應配置儘可能小的 執行緒,如配置Ncpu+1個執行緒的執行緒池。由於IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒,如2*Ncpu。

  • 混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務 和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於序列執行的吞吐量。

  • 如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

  • 優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先執行。

3.注意

  • 如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。執行時間不同的任務可以交給不同規模的執行緒池來處理,或者可以使用優先順序佇列,讓執行時間短的任務先執行。

  • 依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,等待的時間越 長,則CPU空閒時間就越長,那麼執行緒數應該設定得儘量大,這樣才能更好地利用CPU。

  • 建議使用有界佇列。有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,如果當時我們設定成無界佇列,那麼執行緒池的佇列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題。

執行緒池的監控

1.定義

如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現問題時,可以根據執行緒池的使用狀況快速定位問題。可以通過執行緒池提供的引數進行監控,

2.監控執行緒池使用的屬性

  • taskCount:執行緒池需要執行的任務數量。

  • completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於taskCount。

  • largestPoolSize:執行緒池裡曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否曾經滿過。如該數值等於執行緒池的最大大小,則表示執行緒池曾經滿過。

  • getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,執行緒池裡的執行緒不會自動銷燬,所以這個大小隻增不減。

  • getActiveCount:獲取活動的執行緒數。通過擴充套件執行緒池進行監控。可以通過繼承執行緒池來自定義執行緒池,重寫執行緒池的

注:
beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和執行緒池關閉前執行一些程式碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。



本人才疏學淺,若有錯,請指出,謝謝!
如果你有更好的建議,可以留言我們一起討論,共同進步!
衷心的感謝您能耐心的讀完本篇博文。