1. 程式人生 > >製造一個輪子執行緒池

製造一個輪子執行緒池

很早之前就看過執行緒池原始碼(知道大概的執行原理),但是隻是知道怎麼用,並沒有深究。這次為了幫助自己深入理解執行緒池,決定手動寫一個極簡(陋)的執行緒池,順便記錄思考和造輪過程。

雖然不太可能和jdk自帶的那麼完美,但是該有的功能還是要有:

  • 新建執行緒池,有核心執行緒數和最大執行緒數,執行緒存活時間,佇列
  • 線上程池加入執行緒,當前執行緒數不超過核心執行緒數就新建執行緒,超過核心放佇列,佇列滿了再新建執行緒,達到最大執行緒
  • 全部執行緒執行完成後會保留核心執行緒數,支援執行緒存活時間
  • 立即關閉執行緒池
  • 優雅的關閉執行緒池
1.新建一個輪子執行緒池類,就一個建構函式,把需要的引數都傳進來

2.用ThreadPoolExecutor的時候,新建了執行緒池就會往裡面提交執行緒了,我們寫的也一樣,而且往執行緒池裡面加執行緒的時候就會判斷:當前執行執行緒數是否大於核心執行緒數,是否大於最大執行緒數等,這裡需要一個當前執行執行緒數的變數。

所以這裡增加一個成員變數activeCount,初始值為0,執行一個執行緒就加1,執行緒執行結束就減1,這裡面減的時候是在不同執行緒裡面,所以為了執行緒安全用AtomicInteger型別

    /** 當前活動執行緒數 */
    private AtomicInteger activeCount = new AtomicInteger(0);

提交執行緒的方法,看過ThreadPoolExecutor原始碼的應該知道,裡面每個執行緒都是包裝了一個Worker類,新增執行緒的時候就會新建一個worker,為什麼要這麼做呢?
我第一次看的時候也不理解,想著如果直接把傳進來的執行緒start一下怎麼樣,如果直接新建執行緒會立馬就發現問題,怎麼知道執行緒什麼執行完,怎麼把activeCount減1?
所以這裡不能直接start,必須新建一個執行緒(非同步執行,廢話),這個執行緒必須要執行引數的執行緒run方法(廢話,不然引數還有啥用,我們的業務邏輯咋執行),執行緒執行完成後activeCount減1。

所以這裡有了worker類,worker本身也是一個執行緒。順便把執行緒名字也解決了,新建一個threadNum從0開始,新建一個worker就+1

提交執行緒

3.現在已經有點樣子了,但是問題還很多,主要的一個問題:比如我建一個core=2,max=5,queue=5的執行緒池,假設往裡面放了8個執行緒,會出現只運行了3個執行緒,跑完也就結束了,剩下的5個會在佇列裡面沒處理,而且也不會保留2個核心執行緒

WheelThreadPool2的執行結果

現在想想怎麼把佇列的執行緒拿出來執行,沒看ThreadPoolExecutor原始碼前我第一次想著是不是在建立執行緒池的時候預設啟動一個執行緒去佇列裡面獲取並執行,然而一想立馬否定了,因為執行緒池是多執行緒執行的,佇列裡面的執行緒需要max(引數最大執行緒數)個執行緒同時執行。
所以在我們新建的worker裡面要能不斷的迴圈獲取佇列的執行緒去執行,如果佇列為空了,則退出迴圈,讓執行緒結束

改造一下worker的run方法,在execute方法建立的worker執行緒執行完通過引數傳進來的runnable之後,迴圈獲取佇列並執行佇列執行緒的run方法

這樣還有點問題,如果try裡面出現異常,比如runnable.run異常或者r.run異常,這個執行緒就退出了,不能保持max個執行緒並行執行

所以如果異常了需要重新建立一個執行緒繼續跑迴圈,改造後

這樣改造後如果佇列空了會把所有執行緒都結束掉,所以現在要解決執行完佇列後保留core個執行緒的問題,怎麼保留執行緒其實是通過阻塞佇列實現的,

當佇列為空時,通過queue.take()方法阻塞住當前執行緒,直到又有執行緒提交。如果當前活動執行緒超出core,結束當前執行緒

這樣改造後大概輪廓出來了,因為queue是阻塞佇列,而且各個方法都加了執行緒鎖,所以本身也是執行緒安全的,這部分程式碼貌似不需要加鎖,跑個測試用例試下,貌似很合理

WheelThreadPoolTest3結果

先放著,再看看下個功能,要支援執行緒存活時間,這個存活時間的意思是:比如上面WheelThreadPoolTest3裡面的執行緒池運行了10個執行緒,跑完之後剩餘2個執行緒,3個消亡了(完成任務了)。
後面再提交10個執行緒,又新建3個執行緒(從控制檯的執行緒名字可以看出),如果我們設定一個存活時間,讓第一批的10個跑完後的那3個執行緒不消亡,比如存活5秒,第二批的10個跑的時候就可以複用,不需要重新建立執行緒。
因為執行緒是稀缺資源,能複用就複用,新建執行緒也影響效率

目前的程式碼執行緒消亡的標記是因為queue.poll獲取到了null,導致迴圈退出,執行緒完成。而阻塞佇列的poll方法還有一個多型方法E poll(long timeout, TimeUnit unit),可以在一定時間內poll,在時間內獲取到了就會返回,這個時間剛好用於是執行緒的存活時間(死亡倒計時??)。

構造方法已經傳了存活時間和單位,直接加上這兩個引數

再來測試下,存活時間設定為5秒,那樣第二批只能提交5個執行緒,否則會導致執行緒池慢

結果

發生一個大問題,最後執行緒都沒了,而且主執行緒也退出了

原因:第一批的10個執行緒執行完後因為執行緒存活5秒,所以都保留了

堆疊打印出來也證實了,都在poll裡面阻塞,然後第二批5個執行緒已提交,這存活的5個執行緒就會立馬開始執行,執行完後再次阻塞再poll,等過了存活時間,執行緒全部結束!

在第二批執行結束後再次列印堆疊,結果果然是這樣

問題知道了,解決方法


這裡的lock是一個執行緒鎖,防止多個執行緒同時判斷(同時判斷了寫這個還有意義麼。。)
/** 執行緒鎖 */ private Lock lock = new ReentrantLock();

至此,寫好了主要部分,測試下

不過有個地方我自己程式碼走讀,感覺是有問題

感覺這裡會存線上程安全問題,假設執行緒池佇列為空,當前activeCount大於core,併發情況下,多個執行緒同時滿足activeCount.get()>coreCount,之後所有執行緒都會走queue.poll分支,因為佇列為空,所有執行緒queue.poll返回為null,所有執行緒全部結束掉,這樣和保留core個執行緒衝突了。
糾結許久後來發現這個假設不成立,要在沒有進入while之前就出現佇列為空,且activeCount>core,這種情況不會出現,因為在提交執行緒的方法(execute)已經限制了這種情況,但是這個程式碼看起來會有歧義,還是決定改造改造

這回走讀一次,感覺好多了,終於把主要功能寫完了,也能正常跑了

4.立即關閉執行緒池

執行緒池裡面的執行緒跑完了,但是還有core個執行緒阻塞著,這麼一直阻塞著也不是辦法,所以要有個關閉的方法,先寫暴力關閉,當前執行的執行緒中斷,佇列拋棄

思考時間:中斷執行緒肯定是呼叫Thread.interrupt方法,這樣我得拿到正在執行得執行緒才行,所以在新增執行緒的時候得儲存在一個集合裡,而且執行緒執行得時候異常了,也會新增執行緒,所以這個儲存集合要執行緒安全,而且存取速度要快
這裡需要一個執行緒安全得set集合,ConcurrentHashMap裡面有個newKeySet方法,看了下原始碼是通過ConcurrentHashMap的key來的,是執行緒安全的,直接用

   /** 儲存正在執行的執行緒 */
   private Set<Worker> workers = ConcurrentHashMap.newKeySet();

還需要一個狀態標識當前執行緒池是否關閉,這個狀態要線上程併發(獲取佇列執行緒)情況下可以判斷,所以用volatile修飾,預設正在執行

   /** 執行緒池狀態,-1:正在執行,0:暴力關閉,1:優雅關閉 */ 
   private volatile int status = -1;

新建暴力關閉方法stopNow

在新建worker的時候都加到workers裡面去,這時候一想,activeCount和workers.size是不是重複了,順帶把activeCount刪掉,用workers.size代替,執行緒執行完成workers.remove掉

新增方法addWorker

提交執行緒方法改造,刪掉了activeCount,用workers.size代替

worker的run方法改造,刪掉了activeCount,用workers.size代替,activeCount-1用workers.remove(this)代替

改造完了,檢視一下,暴力關閉需要立即中斷執行緒,拋棄佇列,所以在while獲取佇列那裡要增加判斷

提交執行緒方法增加狀態判斷

改造完成,測試下暴力關閉

結果,整個程式也退出了,執行緒池結束了,佇列只運行了一個執行緒

5.優雅關閉執行緒池

優雅的關閉執行緒池,是要讓所有的執行緒和佇列都執行完畢再關閉所有執行緒,這樣就不能直接interrupt執行緒了,先設定status=1未優雅關閉

新增優雅關閉方法

提交執行緒方法增加狀態,限制提交

這裡呼叫stop方法時分情況:

  • 1.存線上程還在執行(佇列或者當前執行緒),執行完成後呼叫workers所有執行緒的interrupt,防止存線上程在queue.take處阻塞
  • 2.不存線上程還在執行(佇列或者當前執行緒),呼叫workers所有執行緒的interrupt,防止存線上程在queue.take處阻塞

所以這裡需要一個標記,是否還存線上程在執行,我們可以用一個數字標識當前還需要執行的執行緒數量,執行完一個執行緒就-1

增加成員變數remainingCount,標識剩餘執行緒數
/** 剩餘執行緒數 */ private AtomicInteger remainingCount = new AtomicInteger(0);
每次提交一個執行緒+1

每次執行完一個執行緒-1

第一種情況,存線上程還在執行,在執行完成後判斷remainingCount是否為0

第二種情況,不存線上程還在執行,在stop的時候增加判斷

抽象封裝下方法


順便把worker的run方法也優化下,一個螢幕都截不下了


跑個第一種情況的測試用例

結果出乎意料,居然沒有結束所有執行緒

加日誌除錯

出現了更誇張的錯誤

根據控制檯資訊可以想象,原本的5個執行緒全部被interrupt,又不斷地建立執行緒,又不斷的被interrupt。
這裡會建立執行緒的地方只有在worker的run方法異常,finally程式碼段裡面,而且沒加日誌的時候沒有出現這種情況,加了日誌就出現了。
多次程式碼走讀後,發現一種可能,在最初5個執行緒同時將佇列消耗完後,2個執行緒進入take阻塞,3個執行緒開始進入interruptWorkers方法,導致那2個執行緒出現異常,異常後會退出執行緒,再次建立新執行緒,並且interrupt新執行緒,由此陷入死迴圈

改造getQueueTask方法,不丟擲異常,出現異常返回null。順便走讀一下status=0的情況,發現不影響

再次執行測試用例,結果符合預期了,這裡的異常堆疊可以忽略

再測試優雅關閉的第二種情況

結果正常

去掉除錯日誌,至此,這個輪子執行緒池完成,具備執行緒池基礎功能

總結

寫這個執行緒池過程曲折,各種問題不斷出現,特別時兩種關閉方法,判斷比較煩,程式碼走讀和除錯良久,才堪堪解決,由此聯想ThreadPoolExecutor是多麼強大,多麼不簡單

圖片較多,程式碼可以在Github上找到

參考資料:ThreadPoolExecutor原始碼

感謝crossoverjie的文章:https://crossoverjie.top/2019/05/20/concurrent/threadpool-01/

本文來自chentiefeng的部落格