1. 程式人生 > >你真的懂ThreadPoolExecutor執行緒池技術嗎?看了原始碼你會有全新的認識

你真的懂ThreadPoolExecutor執行緒池技術嗎?看了原始碼你會有全新的認識

Java是一門多執行緒的語言,基本上生產環境的Java專案都離不開多執行緒。而執行緒則是其中最重要的系統資源之一,如果這個資源利用得不好,很容易導致程式低效率,甚至是出問題。

有以下場景,有個電話撥打系統,有一堆需要撥打的任務要執行,首先肯定是考慮多執行緒非同步去執行。假如我每執行一個撥打任務都new一個Thread去執行,當同時有1萬個任務需要執行的時候,那麼就會新建1萬個執行緒,加上執行緒各種初始銷燬等操作,這個消耗是巨大的。而其實往往實現這些功能的時候,並不是完全需要實時馬上完成,只是希望在可控範圍內儘量提高執行的併發效能。

因此執行緒池技術應用而生,Java中最常用的執行緒池技術就是ThreadPoolExecutor。接下來就整體看看ThreadPoolExecutor的實現。
這個類的註解非常多,很多也是重點,所以就不從註解開始看起。先從使用說起,有個概念先。

基本使用

        // 核心執行緒
        int corePoolSize = 5;
        // 最大執行緒
        int maximumPoolSize = 10;
        // 執行緒空閒回收時間
        int keepAliveTime = 30;
        // 執行緒空閒回撥時間單位
        TimeUnit unit = TimeUnit.SECONDS;
        // 佇列大小
        int queueSize = 20;
        // 佇列
        BlockingQueue workQueue = new ArrayBlockingQueue<Runnable>(queueSize);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        executor.execute(() -> {
            // do something 1
        });
        executor.execute(() -> {
            // do something 2
        });

定義好一些必要的引數,構建一個ThreadPoolExecutor物件。然後呼叫物件的execute()方法即可。
引數說明:

  • corePoolSize,執行緒池保留的最小執行緒數。如果執行緒池中的執行緒少於此數目,則在執行execut()時建立。
  • maximumPoolSize,執行緒池中允許擁有的最大執行緒數。
  • keepAliveTime、unit,當執行緒閒置時,保持執行緒存活的時間。
  • workQueue,工作佇列,存放提交的等待任務,其中有佇列大小的限制。

執行緒管理機制

非常多人誤解了corePoolSize、maximumPoolSize、workQueue的相互關係。不少人認為無論佇列選擇什麼,corePoolSize和maximumPoolSize一定是有用,定義一定是生效的,其實並不然啊!


看下執行緒基本規則註解說明

  1. 預設情況下,執行緒池在初始的時候,執行緒數為0。當接收到一個任務時,如果執行緒池中存活的執行緒數小於corePoolSize核心執行緒,則新建一個執行緒。
  2. 如果所有執行的核心執行緒都都在忙,超出核心執行緒處理的任務,執行器更多地選擇把任務放進佇列,而不是新建一個執行緒。
  3. 如果一個任務提交不了到佇列,在不超出最大執行緒數量情況下,會新建執行緒。超出了就會報錯。

另外,如果想線上程初始化時候就有核心執行緒,可以呼叫prestartCoreThread()或prestartAllCoreThread(),前者是初始一個,後者是初始全部。

再看看排隊策略

  • 直接提交,用SynchronousQueue。特點是不儲存,直接提交給執行緒,如果沒沒執行緒,則新建一個。
  • 無限提交,用類似LinkedBlockingQueue無界佇列。特點是儲存所以核心執行緒處理不了的任務,佇列無上限,最大執行緒也沒用。
  • 有限提交,用類似ArrayBlockingQueue有界佇列。特點是可以儲存超過核心執行緒的任務,並且佇列也是有上限的。超過上限,新建執行緒(滿了拋錯)。更好地保護資源,防止崩潰,也是最常用的排隊策略。

從以上規則可以看出來,核心執行緒數和最大執行緒數,還有佇列結構是相互影響的,如何排隊,佇列多大,最大執行緒是多少都是不一定的。

再看看保持存活機制

當超過核心執行緒數的執行緒,執行緒池會讓該執行緒保持存活keepAliveTime時間,超過該時間則會銷燬該執行緒。
另外預設對非核心執行緒有效,若想核心執行緒也適用於這個機制,可以呼叫allowCoreThreadTimeOut()方法。這樣的話就沒有核心執行緒這一說了。

綜合以上,執行緒池在多次執行任務後,會一直維持部分執行緒存活,即使它是閒置的。這樣的目的是為了減少執行緒銷燬建立的開銷,下次有個任務需要執行,直接從池子裡拿執行緒就能用了。但核心執行緒不能維護太多,因為也需要一定開銷。最大的執行緒數保護了整個系統的穩定性,避免併發量大的時候,把執行緒擠滿。工作佇列則是保證了任務順序和暫存,系統的可靠性。執行緒存活規則的目的和維護核心執行緒的目的類似,但降低了它的存活的時間。

另外還有拒絕機制,它提供了一些異常情況下的解決方案。

ctl執行緒狀態控制

這個ctl變數是整個執行緒池的核心控制狀態。

這個ctl代表了兩個變數

  • workerCount,生效的執行緒數。基本可以理解為存活的執行緒,但某個時候有暫時性的差異。
  • runState,執行緒池的執行狀態。
    其中,ctl(int32位)的低29位代表workerCount,所以最大執行緒數為(2^29)-1。另外3位表示runState。

runState有以下幾種狀態:

  • RUNNING:接收新任務,處理佇列任務。
  • SHUTDOWN:不接收新任務,但處理佇列任務。
  • STOP:不接收新任務,也不處理佇列任務,並且中斷所有處理中的任務。
  • TIDYING:所有任務都被終結,有效執行緒為0。會觸發terminated()方法。
  • TERMINATED:當terminated()方法執行結束。

當呼叫了shutdown(),狀態會從RUNNING變成SHUTDOWN,不再接收新任務,此時會處理完佇列裡面的任務。
如果呼叫的是shutdownNow(),狀態會直接變成STOP。
當執行緒或者佇列都是空的時候,狀態就會變成TIDYING。
當terminated()執行完的時候,就會變成TERMINATED。

execute()

帶著對上面的規則與機制的認識,現在從就這這個入口開始看看原始碼,到底整個流程是怎麼實現的。

  1. 如果少於核心執行緒在跑,用這個任務嘗試建立一個新執行緒。
  2. 如果一個任務成功入隊,再次檢查下執行緒池狀態看是否需要入隊,因為可能在入隊過程中,狀態傳送了變化。如果確認入隊且沒有存活執行緒,則新建一個空執行緒。
  3. 如果進不了隊,則嘗試新建一個執行緒,如果都失敗了。拒絕這個task
    對於第二點最後為什麼新建一個執行緒?很容易猜想到,會有一個輪詢的機制讓下個task出隊,直接利用這個空閒執行緒。

註釋基本解釋了所有程式碼,程式碼也沒什麼特別的。其中最主要的還是addWoker()這個方法,下面來看看。

addWoker()

先了解下這個方法的整體思路

從描述可知,addwoker失敗,會線上程池狀態不對、執行緒滿了或者執行緒工廠建立執行緒池失敗時候發生。
這個方法比較長,分兩段看。先看第一段。

retry:這種寫法,如果比較少看原始碼的,應該是前所未見的了。這是個迴圈的位置標記,是java的語法之一。看回程式碼,這裡面for迴圈還巢狀裡一個for迴圈,而retry:是標記第一個for迴圈的,後面breakcontinue語句都指向到了retry。說明breakcontinue是都是操作外層的for迴圈。retry可以是任何變數命名合法的字元。

然後看看外出for迴圈的if語句

這個if判斷想要執行到return false;,佇列為空是一個必要條件。因為addWork()不單隻接收新任務會呼叫到,處理佇列中的任務也會呼叫到。而前面提到SHUTDOWN狀態下還會處理佇列中的任務的,所以佇列不為空是會讓它繼續執行下去的。

對於內層的for迴圈

會先判斷worker的資料是否符合corePoolSize和maximumPoolSize的定義,不滿足則返回失敗。
然後嘗試CAS讓workerCount自增,如果CAS失敗還是繼續自旋去自增,直到成功。除非執行緒池狀態發生了變化,發退回到外層for迴圈重新執行,判斷執行緒池的狀態。

第一段的程式碼,就是讓workerCount在符合條件下自增

第二段程式碼

這段比較好理解,先建立一個Worker物件,這個Worker裡面包含一個由執行緒工廠建立的執行緒,和一個需要執行的任務(可以為空)。如果執行緒建立成功了,那麼就加一個重入鎖去把這個新建的Worker物件放到workers成員變數中,在加入之前需要重新判斷下執行緒池的狀態和新建執行緒的狀態。如果worker新增到workers成員變數中,就啟動這個新建的執行緒。最後如果新增失敗,則執行addWorkFailed(w)

如果失敗了,加鎖操作回滾下wokers、workerCount,然後判斷下狀態看看是否需要終結執行緒池。

addWorker()大概的流程就這樣。

總結

對於其他方法,沒有什麼特別的,在此不再過多的敘述,有興趣的可以翻翻原始碼閱讀下。
回顧總結下上面的核心要點

  1. 當核心執行緒滿且忙碌時,執行緒池傾向於把提交的任務放進佇列,而不是新建執行緒。
  2. 根據選擇佇列的不同,maximumPoolSize不一定有用的。具體有三種不同的策略。
  3. ctl是執行緒池的核心控制狀態,包含的runState執行緒池執行狀態和workCount有效執行緒數。
  4. retry:是一種標記迴圈的語法,retry可以是任何變數命名合法字元。

更多技術文章、精彩乾貨,請關注
部落格:zackku.com
微信公眾號:Zack說碼