執行緒池沒你想的那麼簡單(續)
前言
前段時間寫過一篇《執行緒池沒你想的那麼簡單》,和大家一起擼了一個基本的執行緒池,具備:
- 執行緒池基本排程功能。
- 執行緒池自動擴容縮容。
- 佇列快取執行緒。
- 關閉執行緒池。
這些功能,最後也留下了三個待實現的 features
。
- 執行帶有返回值的執行緒。
- 異常處理怎麼辦?
- 所有任務執行完怎麼通知我?
這次就實現這三個特性來看看 j.u.c
中的執行緒池是如何實現這些需求的。
再看本文之前,強烈建議先檢視上文《執行緒池沒你想的那麼簡單》
任務完成後的通知
大家在用執行緒池的時候或多或少都會有這樣的需求:
執行緒池中的任務執行完畢後再通知主執行緒做其他事情,比如一批任務都執行完畢後再執行下一波任務等等。
以我們之前的程式碼為例:
總共往執行緒池中提交了 13 個任務,直到他們都執行完畢後再列印 “任務執行完畢” 這個日誌。
執行結果如下:
為了簡單的達到這個效果,我們可以在初始化執行緒池的時候傳入一個介面的實現,這個介面就是用於任務完成之後的回撥。
public interface Notify {
/**
* 回撥
*/
void notifyListen() ;
}
以上就是執行緒池的建構函式以及介面的定義。
所以想要實現這個功能的關鍵是在何時回撥這個介面?
仔細想想其實也簡單:只要我們記錄提交到執行緒池中的任務及完成的數量,他們兩者的差為 0 時就認為執行緒池中的任務已執行完畢;這時便可回撥這個介面。
所以在往執行緒池中寫入任務時我們需要記錄任務數量:
為了併發安全的考慮,這裡的計數器採用了原子的 AtomicInteger
。
而在任務執行完畢後就將計數器 -1 ,一旦為 0 時則任務任務全部執行完畢;這時便可回撥我們自定義的介面完成通知。
JDK 的實現
這樣的需求在 jdk 中的 ThreadPoolExecutor
中也有相關的 API
,只是用法不太一樣,但本質原理都大同小異。
我們使用 ThreadPoolExecutor
的常規關閉流程如下:
executorService.shutdown(); while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { logger.info("thread running"); }
執行緒提交完畢後執行 shutdown()
關閉執行緒池,接著迴圈呼叫 awaitTermination()
方法,一旦任務全部執行完畢後則會返回 true
從而退出迴圈。
這兩個方法的目的和原理如下:
- 執行
shutdown()
後會將執行緒池的狀態置為關閉狀態,這時將會停止接收新的任務同時會等待佇列中的任務全部執行完畢後才真正關閉執行緒池。 awaitTermination
會阻塞直到執行緒池所有任務執行完畢或者超時時間已到。
為什麼要兩個 api
結合一起使用呢?
主要還在最終的目的是:所有執行緒執行完畢後再做某件事情,也就是線上程執行完畢之前其實主執行緒是需要被阻塞的。
shutdown()
執行後並不會阻塞,會立即返回,所有才需要後續用迴圈不停的呼叫 awaitTermination()
,因為這個 api 才會阻塞執行緒。
其實我們檢視原始碼會發現,ThreadPoolExecutor
中的阻塞依然也是等待通知機制的運用,只不過用的是 LockSupport
的 API
而已。
帶有返回值的執行緒
接下來是帶有返回值的執行緒,這個需求也非常常見;比如需要執行緒非同步計算某些資料然後得到結果最終彙總使用。
先來看看如何使用(和 jdk 的類似):
首先任務是不能實現 Runnable
介面了,畢竟他的 run()
函式是沒有返回值的;所以我們改實現一個 Callable
的介面:
這個介面有一個返回值。
同時在提交任務時也稍作改動:
首先是執行任務的函式由 execute()
換為了 submit()
,同時他會返回一個返回值 Future
,通過它便可拿到執行緒執行的結果。
最後通過第二步將所有執行結果打印出來:
實現原理
再看具體實現之前先來思考下這樣的功能如何實現?
- 首先受限於
jdk
的執行緒api
的規範,要執行一個執行緒不管是實現介面還是繼承類,最終都是執行的run()
函式。 - 所以我們想要一個執行緒有返回值無非只能是在執行
run()
函式時去呼叫一個有返回值的方法,再將這個返回值存放起來用於後續使用。
比如我們這裡新建了一個 Callable<T>
的介面:
public interface Callable<T> {
/**
* 執行任務
* @return 執行結果
*/
T call() ;
}
它的 call
函式就是剛才提到的有返回值的方法,所以我們應當在執行緒的 run() 函式中去呼叫它。
接著還會有一個 Future
的介面,他的主要作用是獲取執行緒的返回值,也就是 再將這個返回值存放起來用於後續使用
這裡提到的後續使用。
既然有了介面那自然就得有它的實現 FutureTask
,它實現了 Future
介面用於後續獲取返回值。
同時實現了 Runnable
介面會把自己變為一個執行緒。
所以在它的 run()
函式中會呼叫剛才提到的具有返回值的 call()
函式。
再次結合 submit()
提交任務和 get()
獲取返回值的原始碼來看會更加理解這其中的門道。
/**
* 有返回值
*
* @param callable
* @param <T>
* @return
*/
public <T> Future<T> submit(Callable<T> callable) {
FutureTask<T> future = new FutureTask(callable);
execute(future);
return future;
}
submit()
非常簡單,將我們丟進來的 Callable
物件轉換為一個 FutureTask
物件,然後再呼叫之前的 execute()
來丟進執行緒池(後續的流程就和一個普通的執行緒進入執行緒池的流程一樣)。
FutureTask 本身也是執行緒,所以可以直接使用
execute()
函式。
而 future.get()
函式中 future
物件由於在 submit()
中返回的真正物件是 FutureTask
,所以我們直接看其中的原始碼就好。
由於 get()
線上程沒有返回之前是一個阻塞函式,最終也是通過 notify.wait()
使執行緒進入阻塞狀態來實現的。
而使其從 wait()
中返回的條件必然是線上程執行完畢拿到返回值的時候才進行喚醒。
也就是圖中的第二部分;一旦執行緒執行完畢(callable.call()
)就會喚醒 notify
物件,這樣 get
方法也就能返回了。
同樣的道理,ThreadPoolExecutor
中的原理也是類似,只不過它考慮的細節更多所以看起來很複雜,但精簡程式碼後核心也就是這些。
甚至最終使用的 api 看起來都是類似的:
異常處理
最後一個是一些新手使用執行緒池很容易踩坑的一個地方:那就是異常處理。
比如類似於這樣的場景:
建立了只有一個執行緒的執行緒池,這個執行緒只做一件事,就是一直不停的 while 迴圈。
但是迴圈的過程中不小心丟擲了一個異常,巧的是這個異常又沒有被捕獲。你覺得後續會發生什麼事情呢?
是執行緒繼續執行?還是執行緒池會退出?
通過現象來看其實哪種都不是,執行緒既沒有繼續運行同時執行緒池也沒有退出,會一直卡在這裡。
當我們 dump
執行緒快照會發現:
這時執行緒池中還有一個執行緒在執行,通過執行緒名稱會發現這是新建立的一個執行緒(之前是Thread-0
,現在是 Thread-1
)。
它的執行緒狀態為 WAITING
,通過堆疊發現是卡在了 CustomThreadPool.java:272
處。
就是卡在了從佇列裡獲取任務的地方,由於此時的任務佇列是空的,所以他會一直阻塞在這裡。
看到這裡,之前關注的朋友有沒有似曾相識的感覺。
沒錯,我之前寫過兩篇:
- 一個執行緒罷工的詭異事件
- 執行緒池中你不容錯過的一些細節
執行緒池相關的問題,當時的討論也非常“激烈”
,其實最終的原因和這裡是一模一樣的。
所以就這次簡版的程式碼來看看其中的問題:
現在又簡化了一版程式碼我覺得之前還有疑問的朋友這次應該會更加明白。
其實線上程池內部會對執行緒的執行捕獲異常,但它並不會處理,只是用於標記是否執行成功;
一旦執行失敗則會回收掉當前異常的執行緒,然後重新建立一個新的 Worker
執行緒繼續從佇列裡取任務然後執行。
所以最終才會卡在從佇列中取任務
處。
其實 ThreadPoolExecutor
的異常處理也是類似的,具體的原始碼就不多分析了,在上面兩篇文章中已經說過幾次。
所以我們在使用執行緒池時,其中的任務一定要做好異常處理。
總結
這一波下來我覺得執行緒池搞清楚沒啥問題了,總的來看它內部運用了非常多的多執行緒解決方案,比如:
ReentrantLock
重入鎖來保證執行緒寫入的併發安全。- 利用等待通知機制來實現執行緒間通訊(執行緒執行結果、等待執行緒池執行完畢等)。
最後也學會了:
- 標準的執行緒池關閉流程。
- 如何使用有返回值的執行緒。
- 執行緒異常捕獲的重要性。
最後本文所有原始碼(結合其中的測試程式碼使用):
https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/CustomThreadPool.java
你的點贊與分享是對我最大的支援
相關推薦
執行緒池沒你想的那麼簡單
前言 原以為執行緒池還挺簡單的(平時常用,也分析過原理),這次是想自己動手寫一個執行緒池來更加深入的瞭解它;但在動手寫的過程中落地到細節時發現並沒想的那麼容易。結合原始碼對比後確實不得不佩服 Doug Lea 。 我覺得大部分人直接去看 java.util.concurrent.ThreadPoolExe
執行緒池沒你想的那麼簡單(續)
前言 前段時間寫過一篇《執行緒池沒你想的那麼簡單》,和大家一起擼了一個基本的執行緒池,具備: 執行緒池基本排程功能。 執行緒池自動擴容縮容。 佇列快取執行緒。 關閉執行緒池。 這些功能,最後也留下了三個待實現的 features 。 執行帶有返回值的執行緒。 異常處理怎麼辦? 所有任務執行完怎麼通知
【Java多執行緒】執行緒池的工作原理詳解(下)
接著上篇文章,我接下來繼續介紹執行緒池的工作原理,如果你還沒有看上篇,我建議最好瀏覽一下:執行緒池的工作原理詳解(上) Executors 工具類 1.定義 Executors是java執行緒池的工廠類,通過它可以快速初始化一個符合業務需求的執行緒池。
【小家java】Java中的執行緒池,你真的用對了嗎?(教你用正確的姿勢使用執行緒池)
相關閱讀 【小家java】java5新特性(簡述十大新特性) 重要一躍 【小家java】java6新特性(簡述十大新特性) 雞肋升級 【小家java】java7新特性(簡述八大新特性) 不溫不火 【小家java】java8新特性(簡述十大新特性) 飽受讚譽 【小家java】java9
Java中執行緒池,你真的會用嗎
轉載自 Java中執行緒池,你真的會用嗎 在《深入原始碼分析Java執行緒池的實現原理》這篇文章中,我們介紹過了Java中執行緒池的常見用法以及基本原理。 在文中有這樣一段描述: 可以通過Executors靜態工廠構建執行緒池,但一般不建議這樣使用。 關於這個
Java中執行緒池,你真的瞭解會用嗎
在《 深入原始碼分析Java執行緒池的實現原理 》這篇文章中,我們介紹過了Java中執行緒池的常見用法以及基本原理。 在文中有這樣一段描述: 可以通過Executors靜態工廠構建執行緒池,但一般不建議這樣使用。 關於這個問題,在那篇文章中並沒有深入的展開。作者之所以這
Java中執行緒池,你真的會用嗎?
我騎著小毛驢,喝著大紅牛哇,哩個啷格里格朗,別問我為什麼這木開心,如果活著不是為了浪蕩那將毫無意義 今天來捋一捋我們平日經常用的instanceof和typeof的一些小問題 typeof: typeof裡面是由一個小坑的 我們今天著重來研
執行緒池submit和execute方法區別(類似callable和runnable)
執行緒池中的execute方法大家都不陌生,即開啟執行緒執行池中的任務。還有一個方法submit也可以做到,它的功能是提交指定的任務去執行並且返回Future物件,即執行的結果。下面簡要介紹一下兩者的三個區別: 1、接收的引數不一樣 2、submit有返回值,而execute沒有 用到返
從零開始學產品第八篇:登入註冊沒那麼簡單(上)
登入註冊應該是最常見的功能了。 上節課我們講了為什麼要從常見的功能模組來入手,這一節課,直接選最最最最常見的模組,登入和註冊。 1註冊 2登入 3修改密碼 4忘記密碼 這應該算是四個小的功能點,先從註冊開始說起。 一 註冊
執行緒池ExecutorService 中併發數的(引入訊號量Semaphore)控制執行
檢視本機處理器的核心數程式碼:Runtime.getRuntime().availableProcessors() 所以,應用程式的最小執行緒數應該等於可用的處理器核數。如果所有的任務都是計算密集型的,則建立處理器可用的核心數那麼多執行緒就可以了。在這種情況下
Java-執行緒池專題(什麼是執行緒池,如何使用,為什麼要用)
1、什麼是執行緒池: java.util.concurrent.Executors提供了一個 java.util.concurrent.Executor介面的實現用於建立執行緒池 多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增
網絡爬蟲技術Jsoup——爬到一切你想要的(轉)
append nload ntp 信任 can 網絡爬蟲 ets bst contain 轉自:http://blog.csdn.net/ccg_201216323/article/details/53576654 本文由我的微信公眾號(bruce常)原創首發,
Java多執行緒學習---Condition和wait、notify(十三)
1.問題:實現兩個執行緒交叉執行(Condition和wait、notify都可以實現) public class ConditionStudy { public static void main(String[] args) { //執行緒程式碼 BussinessTes
多執行緒呼叫系統COM元件的體會(CoInitialize)
多執行緒呼叫COM元件的體會(CoInitialize) 呼叫任何COM元件之前,你必須首先初始化COM套件環境,即呼叫CoInitialize或CoInitializeEx。COM套件環境線上程的生存週期內有效,執行緒退出前需要呼叫CoUninitializ
PyQt訊號與槽之多執行緒中訊號與槽的使用(六)
簡單多執行緒訊號與槽的使用 最簡單的多執行緒使用方法是利用QThread函式,展示QThread函式和訊號簡單結合的方法 import sys from PyQt5.QtCore import
多執行緒與高併發程式設計進階(一)
前言: 使用多執行緒的目的: 充分利用CPU資源,提高程式執行速度 使用多執行緒面臨的挑戰: 上下文切換、死鎖、計算機軟硬體資源的限制等問題 結論: 不是一味地開啟執行緒就能夠讓程式最大限度地併發執行,以及提升執行速度,想利用多執行緒提升程式執行速度需要結合實際
iOS中執行緒Call Stack的捕獲和解析(二)
1. 部分參考資料 做這一塊時也是查閱了很多連結和書籍,包括但不限於: 以及很多Google Search。 2. 相關API和資料結構 由於我們在上面回溯執行緒呼叫棧拿到的是一組地址,所以這裡進行符號化的輸入輸出應該分別是地址和符號,介面設
執行緒進階:多工處理(17)——Java中的鎖(Unsafe基礎)
1. 概述 本專題在之前的文章中詳細介紹了Java中最常使用的一種鎖機制——同步鎖。但是同步鎖肯定是不適合在所有應用場景中使用的。所以從本文開始,筆者將試圖通過兩到三篇文章的篇幅向讀者介紹Java中鎖的分類、原理和底層實現。以便大家在實際工作中根據應用場景進行
Java多執行緒-生產者消費者例子-使用阻塞佇列(BlockingQueue)實現
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * Created by wisgood . */ public
iOS 多執行緒在專案中的應用場景(一)
獲取線上版本號是一件很耗時的操作,所以開闢一個子執行緒,程式碼如下 //檢測新版本 //說明:開闢子執行緒執行耗時程式碼塊,然後在主執行緒中重新整理和顯示 dispatch_async(dispatch_get_global_queue(0, 0