1. 程式人生 > >談談執行緒池使用原則 (執行緒池如何監控)

談談執行緒池使用原則 (執行緒池如何監控)

基礎知識

作為Java開發工程師,工作中基本上都會用到執行緒池。Java中執行緒池最基本的定義如下:

其中corePoolSize是執行緒池核心數目;maximumPoolSize執行緒池中執行緒的最大數目;keepAliveTime表示當執行緒數目大於core並且超過一定時間,會關閉多餘的執行緒池;workQueue存放任務的執行緒;threadFactory建立執行緒的類;handler拒絕處理任務時的策略。

當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:

  1. 如果執行緒池裡執行緒數量小於corePoolSize,不管執行緒池裡面的執行緒是否處於執行狀態,那麼馬上建立執行緒執行這個任務;

  2. 如果執行緒池裡執行緒數量大於或等於corePoolSize,那麼將這個任務放入佇列。

  3. 如果這時候佇列滿了,而且執行緒池裡的執行緒數目小於maximumPoolSize,那麼還是要建立執行緒執行這個任務;

  4. 如果佇列滿了,並且執行緒池裡的執行緒數目達到maximumPoolSize,那麼執行緒池就會執行handler策略。

使用原則

一定要傳遞threadFactory這個引數,定義有意義的執行緒名

因為執行緒名有時候在排查問題的時候特別有用,比如:使用jstack,當整個執行緒棧看不出有用的資訊,此時執行緒名就尤為關鍵了。每次看到poo-num-thread-num就想罵人。

另外有時候在配置日誌的時候會輸入執行緒名,此時有意義的執行緒名比毫無意義的預設執行緒名要好很多。

儘量避免區域性變數建立執行緒池

引入執行緒池的目的提高資源複用,如果在區域性變數建立執行緒池,基本上達不到提高資源複用,而且很有可能因為忘記呼叫shutdown出現資源洩漏。下面是一個這樣的case:

上面的程式碼在多次執行之後將出現下面的OOM。

執行緒池大小和佇列設定原則

在談論這個問題之前,我們先來看一個case。

有一次我們提供的服務介面時間慢慢上升,上升到一定時間之後不再上升,但是上游服務從我們這裡獲取不到資料了。而我們服務依賴的下游服務響應時間和資料確實正常的。最後排查下來發現在呼叫下游的時候使用了執行緒池。其中core=10,佇列的size又特別大,下游服務介面的平均響應時間為100ms。那我們我們服務單機能夠提供的最高QPS也就是100,當超過100的時候,任務進來之後會先在佇列裡等待。持續的處理能力跟不上,就會導致任務還沒有執行,上游介面就超時了,拿不到資料。

所以對於核心介面以及沒有突發流量情況下,我通過給出的建議是使用SynchronousQueue 這個佇列,並且maxPoolSize儘量大一些。

當使用有界佇列的時候,corePoolSize設定的應該儘可能和maximumPoolSize相等,並且針對佇列應該設定監控。

還有可以根據任務特點來設定執行緒數。比如任務要是IO密集型執行緒池大小可以設定的大一些;要是CPU密集型設定小一點,可以簡單設定為cpu ~ cpu *2。

最好能設計一個可監控的執行緒池

因為使用執行緒池有太多坑,特別是剛入門的新人,我司每年都會因為執行緒池問題發生的case。我認為要杜絕事故發生就是應該完善監控,線上程池使用不當時能夠自動發現及時告警避免事故發生。

執行緒池監控的關鍵點,我認為以下幾點:

  1. handler的監控。一旦任務進入handler說明此時執行緒池數目在max的時候都處理不過來了,服務肯定會收到影響。這種情況要及時處理。

  2. workQueue的大小。如果workQueue裡面有擠壓,說明執行緒數在core任務處理不過來,要注意這種情況對服務帶來的影響。

  3. 監控activeCount的數目。這樣可以瞭解設定的引數是否合理,比如core設定的太大,浪費資源。

  4. 監控通過執行緒池建立的執行緒總數。在建立執行緒時候+1,銷燬的時候-1,這樣可以監控是否有資源洩漏。

在完善監控之後,要是能做到動態調整執行緒池引數就更好了,比如發現任務進入了handler,可以動態調整max去處理擠壓,處理完擠壓之後再把max設定會原來的值。

總結

在使用執行緒的時候必須要仔細考量每個引數,以及可能帶來的影響。並且還得考慮執行緒資源洩漏的問題。最好的情況下,公司能定義一個可監控的執行緒池元件,類似於hystrix。

什麼是執行緒池?

很簡單,簡單看名字就知道是裝有執行緒的池子,我們可以把要執行的多執行緒交給執行緒池來處理,和連線池的概念一樣,通過維護一定數量的執行緒池來達到多個執行緒的複用。

執行緒池的好處

我們知道不用執行緒池的話,每個執行緒都要通過new Thread(xxRunnable).start()的方式來建立並執行一個執行緒,執行緒少的話這不會是問題,而真實環境可能會開啟多個執行緒讓系統和程式達到最佳效率,當執行緒數達到一定數量就會耗盡系統的CPU和記憶體資源,也會造成GC頻繁收集和停頓,因為每次建立和銷燬一個執行緒都是要消耗系統資源的,如果為每個任務都建立執行緒這無疑是一個很大的效能瓶頸。所以,執行緒池中的執行緒複用極大節省了系統資源,當執行緒一段時間不再有任務處理時它也會自動銷燬,而不會長駐記憶體。

執行緒池核心類

在java.util.concurrent包中我們能找到執行緒池的定義,其中ThreadPoolExecutor是我們執行緒池核心類,首先看看執行緒池類的主要引數有哪些。

  • corePoolSize:執行緒池的核心大小,也可以理解為最小的執行緒池大小。

  • maximumPoolSize:最大執行緒池大小。

  • keepAliveTime:空餘執行緒存活時間,指的是超過corePoolSize的空餘執行緒達到多長時間才進行銷燬。

  • unit:銷燬時間單位。

  • workQueue:儲存等待執行執行緒的工作佇列。

  • threadFactory:建立執行緒的工廠,一般用預設即可。

  • handler:拒絕策略,當工作佇列、執行緒池全已滿時如何拒絕新任務,預設丟擲異常。

執行緒池工作流程

1、如果執行緒池中的執行緒小於corePoolSize時就會建立新執行緒直接執行任務。

2、如果執行緒池中的執行緒大於corePoolSize時就會暫時把任務儲存到工作佇列workQueue中等待執行。

3、如果工作佇列workQueue也滿時:當執行緒數小於最大執行緒池數maximumPoolSize時就會建立新執行緒來處理,而執行緒數大於等於最大執行緒池數maximumPoolSize時就會執行拒絕策略。

執行緒池分類

Executors是jdk裡面提供的建立執行緒池的工廠類,它預設提供了4種常用的執行緒池應用,而不必我們去重複構造。

  • newFixedThreadPool

    固定執行緒池,核心執行緒數和最大執行緒數固定相等,而空閒存活時間為0毫秒,說明此引數也無意義,工作佇列為最大為Integer.MAX_VALUE大小的阻塞佇列。當執行任務時,如果執行緒都很忙,就會丟到工作佇列等有空閒執行緒時再執行,佇列滿就執行預設的拒絕策略。

  • newCachedThreadPool

       帶緩衝執行緒池,從構造看核心執行緒數為0,最大執行緒數為Integer最大值大小,超過0個的空閒執行緒在60秒後銷燬,SynchronousQueue這是一個直接提交的佇列,意味著每個新任務都會有執行緒來執行,如果執行緒池有可用執行緒則執行任務,沒有的話就建立一個來執行,執行緒池中的執行緒數不確定,一般建議執行速度較快較小的執行緒,不然這個最大執行緒池邊界過大容易造成記憶體溢位。

  • newSingleThreadExecutor

       單執行緒執行緒池,核心執行緒數和最大執行緒數均為1,空閒執行緒存活0毫秒同樣無意思,意味著每次只執行一個執行緒,多餘的先儲存到工作佇列,一個一個執行,保證了執行緒的順序執行。

  • newScheduledThreadPool

    排程執行緒池,即按一定的週期執行任務,即定時任務,對ThreadPoolExecutor進行了包裝而已。

拒絕策略

  • AbortPolicy

      簡單粗暴,直接丟擲拒絕異常,這也是預設的拒絕策略。

  • CallerRunsPolicy

       如果執行緒池未關閉,則會在呼叫者執行緒中直接執行新任務,這會導致主執行緒提交執行緒效能變慢。

  • DiscardPolicy

       從方法看沒做任務操作,即表示不處理新任務,即丟棄。

  • DiscardOldestPolicy

       拋棄最老的任務,就是從佇列取出最老的任務然後放入新的任務進行執行。        

如何提交執行緒

如可以先隨便定義一個固定大小的執行緒池

ExecutorService es = Executors.newFixedThreadPool(3);

提交一個執行緒

es.submit(xxRunnble);

es.execute(xxRunnble);

submit和execute分別有什麼區別呢?

execute沒有返回值,如果不需要知道執行緒的結果就使用execute方法,效能會好很多。

submit返回一個Future物件,如果想知道執行緒結果就使用submit提交,而且它能在主執行緒中通過Future的get方法捕獲執行緒中的異常。

如何關閉執行緒池

e

什麼是執行緒池?

很簡單,簡單看名字就知道是裝有執行緒的池子,我們可以把要執行的多執行緒交給執行緒池來處理,和連線池的概念一樣,通過維護一定數量的執行緒池來達到多個執行緒的複用。

執行緒池的好處

我們知道不用執行緒池的話,每個執行緒都要通過new Thread(xxRunnable).start()的方式來建立並執行一個執行緒,執行緒少的話這不會是問題,而真實環境可能會開啟多個執行緒讓系統和程式達到最佳效率,當執行緒數達到一定數量就會耗盡系統的CPU和記憶體資源,也會造成GC頻繁收集和停頓,因為每次建立和銷燬一個執行緒都是要消耗系統資源的,如果為每個任務都建立執行緒這無疑是一個很大的效能瓶頸。所以,執行緒池中的執行緒複用極大節省了系統資源,當執行緒一段時間不再有任務處理時它也會自動銷燬,而不會長駐記憶體。

執行緒池核心類

在java.util.concurrent包中我們能找到執行緒池的定義,其中ThreadPoolExecutor是我們執行緒池核心類,首先看看執行緒池類的主要引數有哪些。

  • corePoolSize:執行緒池的核心大小,也可以理解為最小的執行緒池大小。

  • maximumPoolSize:最大執行緒池大小。

  • keepAliveTime:空餘執行緒存活時間,指的是超過corePoolSize的空餘執行緒達到多長時間才進行銷燬。

  • unit:銷燬時間單位。

  • workQueue:儲存等待執行執行緒的工作佇列。

  • threadFactory:建立執行緒的工廠,一般用預設即可。

  • handler:拒絕策略,當工作佇列、執行緒池全已滿時如何拒絕新任務,預設丟擲異常。

執行緒池工作流程

1、如果執行緒池中的執行緒小於corePoolSize時就會建立新執行緒直接執行任務。

2、如果執行緒池中的執行緒大於corePoolSize時就會暫時把任務儲存到工作佇列workQueue中等待執行。

3、如果工作佇列workQueue也滿時:當執行緒數小於最大執行緒池數maximumPoolSize時就會建立新執行緒來處理,而執行緒數大於等於最大執行緒池數maximumPoolSize時就會執行拒絕策略。

執行緒池分類

Executors是jdk裡面提供的建立執行緒池的工廠類,它預設提供了4種常用的執行緒池應用,而不必我們去重複構造。

  • newFixedThreadPool

    固定執行緒池,核心執行緒數和最大執行緒數固定相等,而空閒存活時間為0毫秒,說明此引數也無意義,工作佇列為最大為Integer.MAX_VALUE大小的阻塞佇列。當執行任務時,如果執行緒都很忙,就會丟到工作佇列等有空閒執行緒時再執行,佇列滿就執行預設的拒絕策略。

  • newCachedThreadPool

       帶緩衝執行緒池,從構造看核心執行緒數為0,最大執行緒數為Integer最大值大小,超過0個的空閒執行緒在60秒後銷燬,SynchronousQueue這是一個直接提交的佇列,意味著每個新任務都會有執行緒來執行,如果執行緒池有可用執行緒則執行任務,沒有的話就建立一個來執行,執行緒池中的執行緒數不確定,一般建議執行速度較快較小的執行緒,不然這個最大執行緒池邊界過大容易造成記憶體溢位。

  • newSingleThreadExecutor

       單執行緒執行緒池,核心執行緒數和最大執行緒數均為1,空閒執行緒存活0毫秒同樣無意思,意味著每次只執行一個執行緒,多餘的先儲存到工作佇列,一個一個執行,保證了執行緒的順序執行。

  • newScheduledThreadPool

    排程執行緒池,即按一定的週期執行任務,即定時任務,對ThreadPoolExecutor進行了包裝而已。

拒絕策略

  • AbortPolicy

      簡單粗暴,直接丟擲拒絕異常,這也是預設的拒絕策略。

  • CallerRunsPolicy

       如果執行緒池未關閉,則會在呼叫者執行緒中直接執行新任務,這會導致主執行緒提交執行緒效能變慢。

  • DiscardPolicy

       從方法看沒做任務操作,即表示不處理新任務,即丟棄。

  • DiscardOldestPolicy

       拋棄最老的任務,就是從佇列取出最老的任務然後放入新的任務進行執行。        

如何提交執行緒

如可以先隨便定義一個固定大小的執行緒池

ExecutorService es = Executors.newFixedThreadPool(3);

提交一個執行緒

es.submit(xxRunnble);

es.execute(xxRunnble);

submit和execute分別有什麼區別呢?

execute沒有返回值,如果不需要知道執行緒的結果就使用execute方法,效能會好很多。

submit返回一個Future物件,如果想知道執行緒結果就使用submit提交,而且它能在主執行緒中通過Future的get方法捕獲執行緒中的異常。

如何關閉執行緒池

es.shutdown(); 

不再接受新的任務,之前提交的任務等執行結束再關閉執行緒池。

es.shutdownNow();

不再接受新的任務,試圖停止池中的任務再關閉執行緒池,返回所有未處理的執行緒list列表。

s.shutdown(); 

不再接受新的任務,之前提交的任務等執行結束再關閉執行緒池。

es.shutdownNow();

不再接受新的任務,試圖停止池中的任務再關閉執行緒池,返回所有未處理的執行緒list列表。

Java執行緒池該如何監控?

日常開發中,當我們開發一些複合型任務時,嚐嚐會使用執行緒池通過非同步的方式解決一些對時效性要求不高的任務。下面小編列舉幾種執行緒池的使用方式,以便參考!

Java JDK中預設封裝好的Executors類:

下面簡單的列舉三種我們常用的執行緒池操作類:

  1. public static void main(String[] args) {

  2. //建立大小為4個執行緒的執行緒池

  3. ExecutorService executorService1 = Executors.newFixedThreadPool(4);

  4. //建立一個單獨執行緒的執行緒池

  5. ExecutorService executorService2 = Executors.newSingleThreadExecutor();

  6. //建立快取行執行緒池

  7. ExecutorService executorService3 = Executors.newCachedThreadPool();

  8. }

這段程式碼的優點也是顯而易見的,就是操作執行緒池的便利性,我們可以非常方便的使用執行緒池來結合到我們的業務開發中。 

但往往事物都兩面性,這段程式碼的缺點就是可能導致OOM,因為其內部是一個無解佇列,當你的任務數遠遠大於你的執行緒池數量時,快取佇列則會一直被追加,直到把你當前機器的記憶體塞滿,最終導致OOM事件。

ThreadPoolExecutor類

根據Executors類的原始碼得知,內部其實是通過new ThreadPoolExecutor類進行實現的,下面我們來看下Executors.newFixedThreadPool的原始碼實現:

  1. /**

  2. * Creates a thread pool that reuses a fixed number of threads

  3. * operating off a shared unbounded queue. At any point, at most

  4. * {@code nThreads} threads will be active processing tasks.

  5. * If additional tasks are submitted when all threads are active,

  6. * they will wait in the queue until a thread is available.

  7. * If any thread terminates due to a failure during execution

  8. * prior to shutdown, a new one will take its place if needed to

  9. * execute subsequent tasks. The threads in the pool will exist

  10. * until it is explicitly {@link ExecutorService#shutdown shutdown}.

  11. *

  12. * @param nThreads the number of threads in the pool

  13. * @return the newly created thread pool

  14. * @throws IllegalArgumentException if {@code nThreads <= 0}

  15. */

  16. public static ExecutorService newFixedThreadPool(int nThreads) {

  17. return new ThreadPoolExecutor(nThreads, nThreads,

  18. 0L, TimeUnit.MILLISECONDS,

  19. new LinkedBlockingQueue<Runnable>());

  20. }

其他幾種的Executors方法實現方式都大同小異,不一一列舉。

回到主題,在日常中開發如何監控執行緒池呢,這時就需要我們剛才所說的ThreadPoolExecutor類。

通過ThreadPoolExecutor實現監控

其實監控執行緒池很簡單,我們只要繼承ThreadPoolExecutor就可以得到所有我們想要的,下面程式碼是繼承後,所必須要重寫幾個建構函式過載。

  1. public class MyThreadPool extends ThreadPoolExecutor {

  2. public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {

  3. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

  4. }

  5. public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {

  6. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

  7. }

  8. public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {

  9. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);

  10. }

  11. public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

  12. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

  13. }

  14. }

其實小編個人認為,最方便的還是通過重寫execute、shutdown最為實用,可以達到我們監控或統一處理某些業務場景的實現,下面列舉一些示例僅供參考:

  1. /**

  2. * 執行執行緒任務前執行

  3. *

  4. * @param t

  5. * @param r

  6. */

  7. @Override

  8. protected void beforeExecute(Thread t, Runnable r) {

  9. super.beforeExecute(t, r);

  10. }

  11. /**

  12. * 執行執行緒時呼叫

  13. *

  14. * @param command

  15. */

  16. @Override

  17. public void execute(Runnable command) {

  18. //當前核心執行緒大小

  19. this.getCorePoolSize();

  20. //最大執行緒數大小

  21. this.getMaximumPoolSize();

  22. //當前執行緒池任務數量

  23. this.getTaskCount();

  24. //當前佇列

  25. this.getQueue();

  26. //設定核心執行緒數量

  27. this.setCorePoolSize(4);

  28. //設定最大執行緒數

  29. this.setMaximumPoolSize(4);

  30. super.execute(command);

  31. }

  32. /**

  33. * 執行執行緒任務後執行

  34. *

  35. * @param r

  36. * @param t

  37. */

  38. @Override

  39. protected void afterExecute(Runnable r, Throwable t) {

  40. super.afterExecute(r, t);

  41. }

  42. /**

  43. * 結束執行緒池時執行

  44. */

  45. @Override

  46. public void shutdown() {

  47. super.shutdown();

  48. }

通過上述程式碼可以得知,當繼承ThreadPoolExecutor之後,我們可以方便的拿到當前執行緒池的coreSIze、maxiMumSize等等。這樣我們不僅能夠實時的去監控執行緒池的狀態,同樣可以通過setCorePoolSize等方法實現動態擴容,達到我們監控的目的。

其實我們也可以實時監控記憶體佇列的大小,當達到某個預警值的時候進行報警,都可以很方便的實現。

總結:

小編本次不做深入的講解,你希望幫助大家簡單的認識下ThreadPoolExecutor的擴充套件方式,有什麼問題也希望大家及時提問,歡迎可以共同探討的同學,謝謝!

之前寫過一篇 Java 執行緒池的使用介紹文章《執行緒池全面解析》,全面介紹了什麼是執行緒池、執行緒池核心類、執行緒池工作流程、執行緒池分類、拒絕策略、及如何提交與關閉執行緒池等。

但在實際開發過程中,線上程池使用過程中可能會遇到各方面的故障,如執行緒池阻塞,無法提交新任務等。

如果你想監控某一個執行緒池的執行狀態,執行緒池執行類ThreadPoolExecutor也給出了相關的 API, 能實時獲取執行緒池的當前活動執行緒數、正在排隊中的執行緒數、已經執行完成的執行緒數、匯流排程數等。

匯流排程數 = 排隊執行緒數 + 活動執行緒數 +  執行完成的執行緒數。

下面給出一個執行緒池使用示例,及教你獲取執行緒池狀態。

privatestaticExecutorService es =newThreadPoolExecutor(50,100,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue(100000));publicstaticvoidmain(String[] args)throwsException{for(inti =0; i <100000; i++) {        es.execute(() -> {            System.out.print(1);try{                Thread.sleep(1000);            }catch(InterruptedException e) {                e.printStackTrace();            }        });    }    ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es);while(true) {        System.out.println();intqueueSize = tpe.getQueue().size();        System.out.println("當前排隊執行緒數:"+ queueSize);intactiveCount = tpe.getActiveCount();        System.out.println("當前活動執行緒數:"+ activeCount);longcompletedTaskCount = tpe.getCompletedTaskCount();        System.out.println("執行完成執行緒數:"+ completedTaskCount);longtaskCount = tpe.getTaskCount();        System.out.println("匯流排程數:"+ taskCount);        Thread.sleep(3000);    }}

執行緒池提交了 100000 個任務,但同時只有 50 個執行緒在執行工作,我們每陋 3 秒來獲取當前執行緒池的執行狀態。

第一次程式輸出:

當前排隊執行緒數:99950

當前活動執行緒數:50

執行完成執行緒數:0

匯流排程數(排隊執行緒數 + 活動執行緒數 +  執行完成執行緒數):100000

第二次程式輸出:

當前排隊執行緒數:99800

當前活動執行緒數:50

執行完成執行緒數:150

匯流排程數(排隊執行緒數 + 活動執行緒數 +  執行完成執行緒數):100000

活動執行緒數和匯流排程數是不變的,排隊中的執行緒數和執行完成的執行緒數不斷在變化,直到所有任務執行完畢,最後輸出:

當前排隊執行緒數:0

當前活動執行緒數:0

執行完成執行緒數:100000

匯流排程數(排隊執行緒數 + 活動執行緒數 +  執行完成執行緒數):100000

這樣,你瞭解了這些 API 的使用方法,你想監控執行緒池的狀態就非常方便了。