1. 程式人生 > >你真的懂ThreadPoolExecutor線程池技術嗎?看了源碼你會有全新的認識

你真的懂ThreadPoolExecutor線程池技術嗎?看了源碼你會有全新的認識

線程的狀態 sso ges 並且 基本上 微信公眾 if判斷 基本使用 ble

Java是一門多線程的語言,基本上生產環境的Java項目都離不開多線程。而線程則是其中最重要的系統資源之一,如果這個資源利用得不好,很容易導致程序低效率,甚至是出問題。

有以下場景,有個電話撥打系統,有一堆需要撥打的任務要執行,首先肯定是考慮多線程異步去執行。假如我每執行一個撥打任務都new一個Thread去執行,當同時有1萬個任務需要執行的時候,那麽就會新建1萬個線程,加上線程各種初始銷毀等操作,這個消耗是巨大的。而其實往往實現這些功能的時候,並不是完全需要實時馬上完成,只是希望在可控範圍內盡量提高執行的並發性能。

因此線程池技術應用而生,Java中最常用的線程池技術就是ThreadPoolExecutor。接下來就整體看看ThreadPoolExecutor的實現。<!-- more -->

這個類的註解非常多,很多也是重點,所以就不從註解開始看起。先從使用說起,有個概念先。

基本使用

        // 核心線程
        int corePoolSize = 5;
        // 最大線程
        int maximumPoolSize = 10;
        // 線程空閑回收時間
        int keepAliveTime = 30;
        // 線程空閑回調時間單位
        TimeUnit unit = TimeUnit.SECONDS;
        // 隊列大小
        int queueSize = 20;
        // 隊列
        BlockingQueue workQueue = new ArrayBlockingQueue<Runnable>(queueSize);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        executor.execute(() -> {
            // do something 1
        });
        executor.execute(() -> {
            // do something 2
        });

定義好一些必要的參數,構建一個ThreadPoolExecutor對象。然後調用對象的execute()方法即可。
參數說明:

  • corePoolSize,線程池保留的最小線程數。如果線程池中的線程少於此數目,則在執行execut()時創建。
  • maximumPoolSize,線程池中允許擁有的最大線程數。
  • keepAliveTime、unit,當線程閑置時,保持線程存活的時間。
  • workQueue,工作隊列,存放提交的等待任務,其中有隊列大小的限制。

線程管理機制

非常多人誤解了corePoolSize、maximumPoolSize、workQueue的相互關系。不少人認為無論隊列選擇什麽,corePoolSize和maximumPoolSize一定是有用,定義一定是生效的,其實並不然啊!


看下線程基本規則註解說明
技術分享圖片

  1. 默認情況下,線程池在初始的時候,線程數為0。當接收到一個任務時,如果線程池中存活的線程數小於corePoolSize核心線程,則新建一個線程。
  2. 如果所有運行的核心線程都都在忙,超出核心線程處理的任務,執行器更多地選擇把任務放進隊列,而不是新建一個線程。
  3. 如果一個任務提交不了到隊列,在不超出最大線程數量情況下,會新建線程。超出了就會報錯。

另外,如果想在線程初始化時候就有核心線程,可以調用prestartCoreThread()或prestartAllCoreThread(),前者是初始一個,後者是初始全部。

再看看排隊策略
技術分享圖片

  • 直接提交,用SynchronousQueue。特點是不保存,直接提交給線程,如果沒沒線程,則新建一個。
  • 無限提交,用類似LinkedBlockingQueue×××隊列。特點是保存所以核心線程處理不了的任務,隊列無上限,最大線程也沒用。
  • 有限提交,用類似ArrayBlockingQueue有界隊列。特點是可以保存超過核心線程的任務,並且隊列也是有上限的。超過上限,新建線程(滿了拋錯)。更好地保護資源,防止崩潰,也是最常用的排隊策略。

從以上規則可以看出來,核心線程數和最大線程數,還有隊列結構是相互影響的,如何排隊,隊列多大,最大線程是多少都是不一定的。

再看看保持存活機制
技術分享圖片
當超過核心線程數的線程,線程池會讓該線程保持存活keepAliveTime時間,超過該時間則會銷毀該線程。
另外默認對非核心線程有效,若想核心線程也適用於這個機制,可以調用allowCoreThreadTimeOut()方法。這樣的話就沒有核心線程這一說了。

綜合以上,線程池在多次執行任務後,會一直維持部分線程存活,即使它是閑置的。這樣的目的是為了減少線程銷毀創建的開銷,下次有個任務需要執行,直接從池子裏拿線程就能用了。但核心線程不能維護太多,因為也需要一定開銷。最大的線程數保護了整個系統的穩定性,避免並發量大的時候,把線程擠滿。工作隊列則是保證了任務順序和暫存,系統的可靠性。線程存活規則的目的和維護核心線程的目的類似,但降低了它的存活的時間。

另外還有拒絕機制,它提供了一些異常情況下的解決方案。
技術分享圖片

ctl線程狀態控制

這個ctl變量是整個線程池的核心控制狀態。
技術分享圖片
這個ctl代表了兩個變量

  • workerCount,生效的線程數。基本可以理解為存活的線程,但某個時候有暫時性的差異。
  • runState,線程池的運行狀態。
    其中,ctl(int32位)的低29位代表workerCount,所以最大線程數為(2^29)-1。另外3位表示runState。

runState有以下幾種狀態:
技術分享圖片

  • RUNNING:接收新任務,處理隊列任務。
  • SHUTDOWN:不接收新任務,但處理隊列任務。
  • STOP:不接收新任務,也不處理隊列任務,並且中斷所有處理中的任務。
  • TIDYING:所有任務都被終結,有效線程為0。會觸發terminated()方法。
  • TERMINATED:當terminated()方法執行結束。

當調用了shutdown(),狀態會從RUNNING變成SHUTDOWN,不再接收新任務,此時會處理完隊列裏面的任務。
如果調用的是shutdownNow(),狀態會直接變成STOP。
當線程或者隊列都是空的時候,狀態就會變成TIDYING。
當terminated()執行完的時候,就會變成TERMINATED。

execute()

帶著對上面的規則與機制的認識,現在從就這這個入口開始看看源碼,到底整個流程是怎麽實現的。
技術分享圖片

  1. 如果少於核心線程在跑,用這個任務嘗試創建一個新線程。
  2. 如果一個任務成功入隊,再次檢查下線程池狀態看是否需要入隊,因為可能在入隊過程中,狀態發送了變化。如果確認入隊且沒有存活線程,則新建一個空線程。
  3. 如果進不了隊,則嘗試新建一個線程,如果都失敗了。拒絕這個task
    對於第二點最後為什麽新建一個線程?很容易猜想到,會有一個輪詢的機制讓下個task出隊,直接利用這個空閑線程。

註釋基本解釋了所有代碼,代碼也沒什麽特別的。其中最主要的還是addWoker()這個方法,下面來看看。

addWoker()

先了解下這個方法的整體思路
技術分享圖片
從描述可知,addwoker失敗,會在線程池狀態不對、線程滿了或者線程工廠創建線程池失敗時候發生。
這個方法比較長,分兩段看。先看第一段。
技術分享圖片
retry:這種寫法,如果比較少看源碼的,應該是前所未見的了。這是個循環的位置標記,是java的語法之一。看回代碼,這裏面for循環還嵌套裏一個for循環,而retry:是標記第一個for循環的,後面breakcontinue語句都指向到了retry。說明breakcontinue是都是操作外層的for循環。retry可以是任何變量命名合法的字符。

然後看看外出for循環的if語句
技術分享圖片
這個if判斷想要執行到return false;,隊列為空是一個必要條件。因為addWork()不單只接收新任務會調用到,處理隊列中的任務也會調用到。而前面提到SHUTDOWN狀態下還會處理隊列中的任務的,所以隊列不為空是會讓它繼續執行下去的。

對於內層的for循環
技術分享圖片
會先判斷worker的數據是否符合corePoolSize和maximumPoolSize的定義,不滿足則返回失敗。
然後嘗試CAS讓workerCount自增,如果CAS失敗還是繼續自旋去自增,直到成功。除非線程池狀態發生了變化,發退回到外層for循環重新執行,判斷線程池的狀態。

第一段的代碼,就是讓workerCount在符合條件下自增

第二段代碼
技術分享圖片
這段比較好理解,先創建一個Worker對象,這個Worker裏面包含一個由線程工廠創建的線程,和一個需要執行的任務(可以為空)。如果線程創建成功了,那麽就加一個重入鎖去把這個新建的Worker對象放到workers成員變量中,在加入之前需要重新判斷下線程池的狀態和新建線程的狀態。如果worker添加到workers成員變量中,就啟動這個新建的線程。最後如果添加失敗,則執行addWorkFailed(w)
技術分享圖片
如果失敗了,加鎖操作回滾下wokers、workerCount,然後判斷下狀態看看是否需要終結線程池。

addWorker()大概的流程就這樣。

總結

對於其他方法,沒有什麽特別的,在此不再過多的敘述,有興趣的可以翻翻源碼閱讀下。
回顧總結下上面的核心要點

  1. 當核心線程滿且忙碌時,線程池傾向於把提交的任務放進隊列,而不是新建線程。
  2. 根據選擇隊列的不同,maximumPoolSize不一定有用的。具體有三種不同的策略。
  3. ctl是線程池的核心控制狀態,包含的runState線程池運行狀態和workCount有效線程數。
  4. retry:是一種標記循環的語法,retry可以是任何變量命名合法字符。

更多技術文章、精彩幹貨,請關註
博客:zackku.com
微信公眾號:Zack說碼
技術分享圖片

你真的懂ThreadPoolExecutor線程池技術嗎?看了源碼你會有全新的認識