1. 引言

合理利用執行緒池能夠帶來三個好處。

第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。

第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。

但是要做到合理的利用執行緒池,必須對其原理了如指掌。

2. 執行緒池的使用

執行緒池的建立

我們可以通過java.util.concurrent.ThreadPoolExecutor來建立一個執行緒池。

new  ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

  

建立一個執行緒池需要輸入幾個引數:

  • corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。

  • runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。 可以選擇以下幾個阻塞佇列。

    • ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
    • LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
    • SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
    • PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
  • maximumPoolSize(執行緒池最大大小):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。

  • ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。

  • RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。以下是JDK1.5提供的四種策略。

    • AbortPolicy:直接丟擲異常。
    • CallerRunsPolicy:只用呼叫者所線上程來執行任務。
    • DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。
    • 當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。
  • keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。

  • TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

向執行緒池提交任務

我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務是否被執行緒池執行成功。通過以下程式碼可知execute方法輸入的任務是一個Runnable類的例項

threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});

  

我們也可以使用submit 方法來提交任務,它會返回一個future,那麼我們可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執行任務異常
} finally {
// 關閉執行緒池
executor.shutdown();
}

  

執行緒池的關閉

我們可以通過呼叫執行緒池的shutdown或shutdownNow方法來關閉執行緒池,它們的原理是

遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。

但是它們存在一定的區別,shutdownNow首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表,而shutdown只是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。

只要呼叫了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。至於我們應該呼叫哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,

通常調shutdown來關閉執行緒池,如果任務不一定要執行完,則可以呼叫shutdownNow。

3. 執行緒池的分析

流程分析:執行緒池的主要工作流程如下圖:

從上圖我們可以看出,當提交一個新任務到執行緒池時,執行緒池的處理流程如下:

  1. 首先執行緒池判斷基本執行緒池是否已滿?沒滿,建立一個工作執行緒來執行任務。滿了,則進入下個流程。
  2. 其次執行緒池判斷工作佇列是否已滿?沒滿,則將新提交的任務儲存在工作佇列裡。滿了,則進入下個流程。
  3. 最後執行緒池判斷整個執行緒池是否已滿?沒滿,則建立一個新的工作執行緒來執行任務,滿了,則交給飽和策略來處理這個任務。

原始碼分析

上面的流程分析讓我們很直觀的瞭解了執行緒池的工作原理,讓我們再通過原始碼來看看是如何實現的。執行緒池執行任務的方法如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果執行緒數小於基本執行緒數,則建立執行緒並執行當前任務
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如執行緒數大於等於基本執行緒數或執行緒建立失敗,則將當前任務放到工作佇列中。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果執行緒池不處於執行中或任務無法放入佇列,並且當前執行緒數量小於最大允許的執行緒數量,
則建立一個執行緒執行任務。
else if (!addIfUnderMaximumPoolSize(command))
//丟擲RejectedExecutionException異常
reject(command); // is shutdown or saturated
}
}

  

工作執行緒。執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務後,還會無限迴圈獲取工作佇列裡的任務來執行。我們可以從Worker的run方法裡看到這點:

public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

  

4. 合理的配置執行緒池

要想合理的配置執行緒池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  2. 任務的優先順序:高,中和低。
  3. 任務的執行時間:長,中和短。
  4. 任務的依賴性:是否依賴其他系統資源,如資料庫連線。

任務性質不同的任務可以用不同規模的執行緒池分開處理。

CPU密集型任務配置儘可能小的執行緒,如配置Ncpu+1個執行緒的執行緒池。

IO密集型任務則由於執行緒並不是一直在執行任務,則配置儘可能多的執行緒,如2*Ncpu。

混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於序列執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。

我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先得到執行,需要注意的是如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的執行緒池來處理,或者也可以使用優先順序佇列,讓執行時間短的任務先執行。

依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,如果等待的時間越長CPU空閒時間就越長,那麼執行緒數應該設定越大,這樣才能更好的利用CPU。

建議使用有界佇列,有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。

有一次我們組使用的後臺任務執行緒池的佇列和執行緒池全滿了,不斷的丟擲拋棄任務的異常,通過排查發現是資料庫出現了問題,導致執行SQL變得非常緩慢,因為後臺任務執行緒池裡的任務全是需要向資料庫查詢和插入資料的,所以導致執行緒池裡的工作執行緒全部阻塞住,

任務積壓線上程池裡。

如果當時我們設定成無界佇列,執行緒池的佇列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題。

當然我們的系統所有的任務是用的單獨的伺服器部署的,而我們使用不同規模的執行緒池跑不同型別的任務,但是出現這樣問題時也會影響到其他任務。

5. 執行緒池的監控

通過執行緒池提供的引數進行監控。執行緒池裡有一些屬性在監控執行緒池的時候可以使用

  • taskCount:執行緒池需要執行的任務數量。
  • completedTaskCount:執行緒池在執行過程中已完成的任務數量。小於或等於taskCount。
  • largestPoolSize:執行緒池曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否滿過。如等於執行緒池的最大大小,則表示執行緒池曾經滿了。
  • getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,池裡的執行緒不會自動銷燬,所以這個大小隻增不+ getActiveCount:獲取活動的執行緒數。

通過擴充套件執行緒池進行監控。通過繼承執行緒池並重寫執行緒池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行後和執行緒池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法線上程池裡是空方法。如:

protected void beforeExecute(Thread t, Runnable r) { }

  

6. 執行緒池配置技巧

當執行緒數小於核心執行緒數時,建立執行緒。
當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列。
當執行緒數大於等於核心執行緒數,且任務佇列已滿:

  • 若執行緒數小於最大執行緒數,建立執行緒
  • 若執行緒數等於最大執行緒數,丟擲異常,拒絕任務

corePoolSize:核心執行緒數

核心執行緒會一直存活,及時沒有任務需要執行
當執行緒數小於核心執行緒數時,即使有執行緒空閒,執行緒池也會優先建立新執行緒處理
設定allowCoreThreadTimeout=true(預設false)時,核心執行緒會超時關閉。

queueCapacity:任務佇列容量(阻塞佇列)

當核心執行緒數達到最大時,新任務會放在佇列中排隊等待執行。

maxPoolSize:最大執行緒數

當執行緒數>=corePoolSize,且任務佇列已滿時。執行緒池會建立新執行緒來處理任務
當執行緒數=maxPoolSize,且任務佇列已滿時,執行緒池會拒絕處理任務而丟擲異常

keepAliveTime:執行緒空閒時間

當執行緒空閒時間達到keepAliveTime時,執行緒會退出,直到執行緒數量=corePoolSize
如果allowCoreThreadTimeout=true,則會直到執行緒數量=0
allowCoreThreadTimeout:允許核心執行緒超時

rejectedExecutionHandler:任務拒絕處理器

兩種情況會拒絕處理任務:
當執行緒數已經達到maxPoolSize,切佇列已滿,會拒絕新任務
當執行緒池被呼叫shutdown()後,會等待執行緒池裡的任務執行完畢,再shutdown。如果在呼叫shutdown()和執行緒池真正shutdown之間提交任務,會拒絕新任務
執行緒池會呼叫rejectedExecutionHandler來處理這個任務。如果沒有設定預設是AbortPolicy,會丟擲異常

ThreadPoolExecutor類有幾個內部實現類來處理這類情況:

  • AbortPolicy 丟棄任務,拋執行時異常
  • CallerRunsPolicy 執行任務
  • DiscardPolicy 忽視,什麼都不會發生
  • DiscardOldestPolicy 從佇列中踢出最先進入佇列(最後一個執行)的任務

實現RejectedExecutionHandler介面,可自定義處理器。

預設值

corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()

如何來設定

需要根據幾個值來決定
tasks :每秒的任務數,假設為500~1000
taskcost:每個任務花費時間,假設為0.1s
responsetime:系統允許容忍的最大響應時間,假設為1s

做幾個計算

corePoolSize = 每秒需要多少個執行緒處理? 
threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個執行緒。corePoolSize設定應該大於50
根據8020原則,如果80%的每秒任務數小於800,那麼corePoolSize設定為80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
計算可得 queueCapacity = 80/0.1*1 = 80。意思是佇列裡的執行緒可以等待1s,超過了的需要新開執行緒來執行
切記不能設定為Integer.MAX_VALUE,這樣佇列會很大,執行緒數只會保持在corePoolSize大小,當任務陡增時,不能新開執行緒來執行,響應時間會隨之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
計算可得 maxPoolSize = (1000-80)/10 = 92
(最大任務數-佇列容量)/每個執行緒每秒處理能力 = 最大執行緒數
rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理
keepAliveTime和allowCoreThreadTimeout採用預設通常能滿足。

轉載

聊聊併發(三)——JAVA執行緒池的分析和使用