1. 程式人生 > >Java執行緒池的使用總結

Java執行緒池的使用總結

  Java中的執行緒池是運用場景最多的併發框架, 幾乎所有需要非同步或併發執行任務的程式都可以使用執行緒池。 在開發過程中, 合理地使用執行緒池能夠帶來3個好處。

  第一: 降低資源消耗。 通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗
  第二: 提高響應速度。 當任務到達時, 任務可以不需要等到執行緒建立就能立即執行
  第三: 提高執行緒的可管理性。 執行緒是稀缺資源, 如果無限制地建立, 不僅會消耗系統資源,還會降低系統的穩定性, 使用執行緒池可以進行統一分配、 調優和監控。 但是, 要做到合理利用執行緒池, 必須對其實現原理了如指掌。

1、執行緒池工作流程

  當向執行緒池提交一個任務之後, 執行緒池是如何處理這個任務的呢?下圖展示了執行緒池的主要處理流程

/

  從圖中可以看出, 當提交一個新任務到執行緒池時, 執行緒池的處理流程如下:   1) 執行緒池判斷核心執行緒池裡的執行緒是否都在執行任務。 如果不是, 則建立一個新的工作執行緒來執行任務。 如果核心執行緒池裡的執行緒都在執行任務, 則進入下個流程。
  2) 執行緒池判斷工作佇列是否已經滿。 如果工作佇列沒有滿, 則將新提交的任務儲存在這個工作佇列裡。 如果工作佇列滿了, 則進入下個流程。
  3) 執行緒池判斷執行緒池的執行緒是否都處於工作狀態。 如果沒有,則建立一個新的工作執行緒來執行任務。 如果已經滿了,則交給飽和策略來處理這個任務

2、執行緒池的使用

  Java提供了自己的執行緒池。每次只執行指定數量的執行緒,java.util.concurrent.ThreadPoolExecutor 就是這樣的執行緒池,我們可以通過ThreadPoolExecutor來建立一個執行緒池。建立程式碼如下:

new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,
milliseconds,runnableTaskQueue,handler);

  建立一個執行緒池時需要輸入幾個引數,對引數的解釋如下:
  1) corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時, 執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒, 等到需要執行的任務數大於執行緒池基本大小時就不再建立不超過maximumPoolSize值時,執行緒池中最多有corePoolSize 個執行緒工作

。 如果呼叫了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有基本執行緒。

  2)runnableTaskQueue(任務佇列): 用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列:
  ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列, 此佇列按FIFO(先進先出) 原則對元素進行排序
  LinkedBlockingQueue: 一個基於連結串列結構的阻塞佇列, 此佇列按FIFO排序元素, 吞吐量通常要高於ArrayBlockingQueue。 靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列;
  SynchronousQueue: 一個不儲存元素的阻塞佇列。 每個插入操作必須等到另一個執行緒呼叫移除操作, 否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列;
  PriorityBlockingQueue: 一個具有優先順序的無界阻塞佇列。

  3)maximumPoolSize(執行緒池最大數量):執行緒池允許建立的最大執行緒數。 如果佇列滿了, 並且已建立的執行緒數小於最大執行緒數, 則執行緒池會再建立新的執行緒執行任務。 值得注意的是, 如果使用了無界的任務佇列這個引數就沒什麼效果

  4)ThreadFactory: 用於設定建立執行緒的工廠, 可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。 使用開源框架guava提供的ThreadFactoryBuilder可以快速給執行緒池裡的執行緒設定有意義的名字。

  5)RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。 這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。在JDK 1.5中Java執行緒池框架提供了以下4種策略:
  AbortPolicy:直接丟擲異常;
  CallerRunsPolicy:只用呼叫者所線上程來執行任務;
  DiscardOldestPolicy:丟棄佇列裡最近的一個任務, 並執行當前任務;
  DiscardPolicy:不處理,丟棄掉。

  6)keepAliveTime(執行緒活動保持時間): 執行緒池的工作執行緒空閒後, 保持存活的時間。 所以,如果任務很多, 並且每個任務執行的時間比較短,可以調大時間,提高執行緒的利用率。

  7)TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、 微秒(MICROSECONDS, 千分之一毫秒) 和納秒(NANOSECONDS, 千分之一微秒)。

  執行緒池執行execute() 方法的示意圖如下圖所示:

 

  當呼叫執行緒池的 execute() 方法新增一個任務時,執行緒池會做如下判斷:
  a. 如果正在執行的執行緒數量小於 corePoolSize, 則建立新執行緒來執行任務(注意, 執行這一步驟需要獲取全域性鎖);
  b. 如果正在執行如果執行的執行緒等於或多於corePoolSize, 則將任務加入BlockingQueue;
  c. 如果無法將任務加入BlockingQueue(佇列已滿),而且正在執行的執行緒數量小於 maximumPoolSize,則建立新的執行緒來處理任務(注意, 執行這一步驟需要獲取全域性鎖);
  d. 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,任務將被拒絕,執行緒池呼叫RejectedExecutionHandler.rejectedExecution()方法,執行飽和策略 。

  這個過程說明,並不是先加入的任務就一定會先執行。假設佇列大小為 4,corePoolSize為2,maximumPoolSize為6,那麼當加入15個任務時,執行的順序類似這樣:首先執行任務 1、2,然後任務3~6被放入佇列。這時候佇列滿了,任務7、8、9、10 會被馬上執行,而任務 11~15 則會丟擲異常。最終順序是:1、2、7、8、9、10、3、4、5、6。當然這個過程是針對指定大小的ArrayBlockingQueue<Runnable>任務佇列來說,如果是LinkedBlockingQueue<Runnable>任務佇列,初始化時又不指定容量,因為該佇列無大小限制(預設限制是Integer.MAX_VALUE),所以不存在上述問題。

  執行緒池的任務佇列選用LinkedBlockingQueue<Runnable>時,測試程式碼如下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest implements Runnable {

    @Override
    public void run() {
        synchronized (this) {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1,
                TimeUnit.DAYS, queue);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Thread(new ThreadPoolTest(), "TestThread"
                    .concat("" + i)));
            int threadSize = queue.size();
            System.out.println("執行緒佇列大小為-->" + threadSize);
        }
        executor.shutdown();
    }
}

  執行效果如下圖:

這裡寫圖片描述

  可見,共執行了10個任務,有3個任務立即被建立執行緒,獲得執行。7個任務儲存在任務佇列中(程式碼中使用了無界的任務佇列,引數maximumPoolSize失效,即程式碼中設定的最大執行緒數量6無效)。因為是從執行緒池裡執行的執行緒,所以雖然將執行緒的名稱設為”TestThread”.concat(“”+i),但輸出後還是變成了pool-1-thread-x。

  執行緒池的任務佇列選用ArrayBlockingQueue<Runnable>時,測試程式碼如下:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest implements Runnable {

    @Override
    public void run() {
        synchronized (this) {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(5);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1,
                TimeUnit.DAYS, queue);
        for (int i = 0; i < 12; i++) {
            executor.execute(new Thread(new ThreadPoolTest(), "TestThread"
                    .concat("" + i)));
            int threadSize = queue.size();
            System.out.println("執行緒佇列大小為-->" + threadSize);
        }
        executor.shutdown();
    }
}

  執行效果如下圖:

這裡寫圖片描述

  因為執行緒池的最大執行緒數量為6,基本大小為3,有界阻塞佇列長度為5,所以執行緒池能夠容納的任務最多是11個。編號為1,2,3的任務被立刻建立執行緒執行,然後編號為4,5,6,7,8的任務進阻塞佇列。編號9,10,11的任務被立刻建立執行緒執行(此時阻塞佇列已滿,而執行緒數量沒有達到執行緒池允許的最大執行緒數量)。此時,執行緒池處於飽和狀態。編號為12的任務提交後就丟擲異常。

  可以通過呼叫執行緒池的shutdown()或shutdownNow()方法來關閉執行緒池。 它們的原理是遍歷執行緒池中的工作執行緒, 然後逐個呼叫執行緒的interrupt()方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。 但是它們存在一定的區別, shutdownNow()首先將執行緒池的狀態設定成STOP, 然後嘗試停止所有的正在執行或暫停任務的執行緒, 並返回等待執行任務的列表, 而shutdown()只是將執行緒池的狀態設定成SHUTDOWN狀態, 然後中斷所有沒有正在執行任務的執行緒

  只要呼叫了這兩個關閉方法中的任意一個, isShutdown()方法就會返回true。 當所有的任務都已關閉後, 才表示執行緒池關閉成功, 這時呼叫isTerminaed()方法會返回true。 至於應該呼叫哪一種方法來關閉執行緒池, 應該由提交到執行緒池的任務特性決定, 通常呼叫shutdown()方法來關閉執行緒池, 如果任務不一定要執行完, 則可以呼叫shutdownNow()方法

  ArrayBlockingQueue是一個由陣列支援的有界阻塞佇列。在讀寫操作上都需要鎖住整個容器,因此吞吐量與一般的實現是相似的,適合於實現“生產者消費者”模式。

  LinkedBlockingQueue是基於連結串列的阻塞佇列,預設最大是Integer.MAX_VALUE。同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。其中主要用到put和take方法,put方法在佇列滿的時候會阻塞直到有佇列成員被消費,take方法在佇列空的時候會阻塞,直到有佇列成員被放進來。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。

3、合理使用執行緒池

  要想合理地配置執行緒池, 就必須首先分析任務特性, 可以從以下幾個角度來分析:
  任務的性質: CPU密集型任務、 IO密集型任務和混合型任務。
  任務的優先順序: 高、 中和低。
  任務的執行時間: 長、 中和短。
  任務的依賴性: 是否依賴其他系統資源, 如資料庫連線。
  性質不同的任務可以用不同規模的執行緒池分開處理。假設N為裝置的CPU個數, CPU密集型任務應配置儘可能小的執行緒, 如配置N+1個執行緒的執行緒池。 由於IO密集型任務執行緒並不是一直在執行任務, 則應配置儘可能多的執行緒, 如2*N。 混合型的任務, 如果可以拆分, 將其拆分成一個CPU密集型任務和一個IO密集型任務, 只要這兩個任務執行的時間相差不是太大, 那麼分解後執行的吞吐量將高於序列執行的吞吐量。 如果這兩個任務執行時間相差太大, 則沒必要進行分解。 可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

  建議使用有界佇列。 有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點兒, 比如幾千。 有一次, 我們系統裡後臺任務執行緒池的佇列和執行緒池全滿了,不斷丟擲拋棄任務的異常, 通過排查發現是資料庫出現了問題, 導致執行SQL變得非常緩慢, 因為後臺任務執行緒池裡的任務全是需要向資料庫查詢和插入資料的, 所以導致執行緒池裡的工作執行緒全部阻塞, 任務積壓線上程池裡。 如果當時我們設定成無界佇列,那麼執行緒池的佇列就會越來越多,有可能會撐滿記憶體, 導致整個系統不可用,而不只是後臺任務出現問題。 當然, 我們的系統所有的任務是用單獨的伺服器部署的, 我們使用不同規模的執行緒池完成不同型別的任務, 但是出現這樣問題時也會影響到其他任務。

  本文內容大部分出自《Java併發程式設計的藝術》第9章。