1. 程式人生 > >聊聊併發(三)Java執行緒池的分析和使用

聊聊併發(三)Java執行緒池的分析和使用

1.    引言

合理利用執行緒池能夠帶來三個好處。第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。第二:提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。但是要做到合理的利用執行緒池,必須對其原理了如指掌。

2.執行緒池的使用

執行緒池的建立

我們可以通過ThreadPoolExecutor來建立一個執行緒池。

new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);

建立一個執行緒池需要輸入幾個引數:

  • corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。
  • runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。
  1. ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
  2. LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
  3. SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
  4. PriorityBlockingQueue:一個具有優先順序得無限阻塞佇列。
  • maximumPoolSize(執行緒池最大大小):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。
  • ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字,Debug和定位問題時非常又幫助。

RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。以下是JDK1.5提供的四種策略。n  AbortPolicy:直接丟擲異常。

  1. CallerRunsPolicy:只用呼叫者所線上程來執行任務。
  2. DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
  3. DiscardPolicy:不處理,丟棄掉。
  4. 當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。
  • keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。
  • TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

向執行緒池提交任務

我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務知否被執行緒池執行成功。通過以下程式碼可知execute方法輸入的任務是一個Runnable類的例項。

threadsPool.execute(new Runnable() {
@Override

public void run() {

// TODO Auto-generated method stub

}

});

我們也可以使用submit 方法來提交任務,它會返回一個future,那麼我們可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。

try {

Object s = future.get();

} catch (InterruptedException e) {

// 處理中斷異常

} catch (ExecutionException e) {

// 處理無法執行任務異常

} finally {

// 關閉執行緒池

executor.shutdown();

}

執行緒池的關閉

我們可以通過呼叫執行緒池的shutdown或shutdownNow方法來關閉執行緒池,但是它們的實現原理不同,shutdown的原理是隻是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。shutdownNow的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表。

只要呼叫了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。至於我們應該呼叫哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常呼叫shutdown來關閉執行緒池,如果任務不一定要執行完,則可以呼叫shutdownNow。

3.    執行緒池的分析

流程分析:執行緒池的主要工作流程如下圖:

Java執行緒池主要工作流程

從上圖我們可以看出,當提交一個新任務到執行緒池時,執行緒池的處理流程如下:

  1. 首先執行緒池判斷基本執行緒池是否已滿?沒滿,建立一個工作執行緒來執行任務。滿了,則進入下個流程。
  2. 其次執行緒池判斷工作佇列是否已滿?沒滿,則將新提交的任務儲存在工作佇列裡。滿了,則進入下個流程。
  3. 最後執行緒池判斷整個執行緒池是否已滿?沒滿,則建立一個新的工作執行緒來執行任務,滿了,則交給飽和策略來處理這個任務。

原始碼分析。上面的流程分析讓我們很直觀的瞭解的執行緒池的工作原理,讓我們再通過原始碼來看看是如何實現的。執行緒池執行任務的方法如下:

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

//如果執行緒數小於基本執行緒數,則建立執行緒並執行當前任務

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

//如執行緒數大於等於基本執行緒數或執行緒建立失敗,則將當前任務放到工作佇列中。

if (runState == RUNNING && workQueue.offer(command)) {

if (runState != RUNNING || poolSize == 0)

ensureQueuedTaskHandled(command);

}

//如果執行緒池不處於執行中或任務無法放入佇列,並且當前執行緒數量小於最大允許的執行緒數量,則建立一個執行緒執行任務。

else if (!addIfUnderMaximumPoolSize(command))

//丟擲RejectedExecutionException異常

reject(command); // is shutdown or saturated

}

}

工作執行緒。執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務後,還會無限迴圈獲取工作佇列裡的任務來執行。我們可以從Worker的run方法裡看到這點:

public void run() {

     try {

           Runnable task = firstTask;

           firstTask = null;

            while (task != null || (task = getTask()) != null) {

                    runTask(task);

                    task = null;

            }

      } finally {

             workerDone(this);

      }

}

4.    合理的配置執行緒池

要想合理的配置執行緒池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  2. 任務的優先順序:高,中和低。
  3. 任務的執行時間:長,中和短。
  4. 任務的依賴性:是否依賴其他系統資源,如資料庫連線。

任務性質不同的任務可以用不同規模的執行緒池分開處理。CPU密集型任務配置儘可能少的執行緒數量,如配置Ncpu+1個執行緒的執行緒池。IO密集型任務則由於需要等待IO操作,執行緒並不是一直在執行任務,則配置儘可能多的執行緒,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於序列執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先得到執行,需要注意的是如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的執行緒池來處理,或者也可以使用優先順序佇列,讓執行時間短的任務先執行。

依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,如果等待的時間越長CPU空閒時間就越長,那麼執行緒數應該設定越大,這樣才能更好的利用CPU。

建議使用有界佇列,有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次我們組使用的後臺任務執行緒池的佇列和執行緒池全滿了,不斷的丟擲拋棄任務的異常,通過排查發現是資料庫出現了問題,導致執行SQL變得非常緩慢,因為後臺任務執行緒池裡的任務全是需要向資料庫查詢和插入資料的,所以導致執行緒池裡的工作執行緒全部阻塞住,任務積壓線上程池裡。如果當時我們設定成無界佇列,執行緒池的佇列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題。當然我們的系統所有的任務是用的單獨的伺服器部署的,而我們使用不同規模的執行緒池跑不同型別的任務,但是出現這樣問題時也會影響到其他任務。

5.    執行緒池的監控

通過執行緒池提供的引數進行監控。執行緒池裡有一些屬性在監控執行緒池的時候可以使用

  • taskCount:執行緒池需要執行的任務數量。
  • completedTaskCount:執行緒池在執行過程中已完成的任務數量。小於或等於taskCount。
  • largestPoolSize:執行緒池曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否滿過。如等於執行緒池的最大大小,則表示執行緒池曾經滿了。
  • getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,池裡的執行緒不會自動銷燬,所以這個大小隻增不減。
  • getActiveCount:獲取活動的執行緒數。

通過擴充套件執行緒池進行監控。通過繼承執行緒池並重寫執行緒池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行後和執行緒池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法線上程池裡是空方法。如:

<b>protected</b> <b>void</b> beforeExecute(Thread t, Runnable r) { }

6.    參考資料

  • Java併發程式設計實戰。
  • JDK1.6原始碼。


方 騰飛

花名清英,併發網(ifeve.com)創始人,暢銷書《Java併發程式設計的藝術》作者,螞蟻金服技術專家。目前工作於支付寶微貸事業部,關注網際網路金融,併發程式設計和敏捷實踐。微信公眾號aliqinying。