面試必會必知:ThreadPoolExecutor 執行緒池淺析
一定要點藍字關注我!!!
號外:給讀者送福利了。每天閱讀文章後,在文章底部掃碼簽到,即可獲得福利哦
作者@陳明
作為Executor框架中最核心的類,ThreadPoolExecutor代表著鼎鼎大名的執行緒池,它給了我們足夠的理由來弄清楚它。
下面我們就通過原始碼來一步一步弄清楚它。
內部狀態
執行緒有五種狀態:新建,就緒,執行,阻塞,死亡,執行緒池同樣有五種狀態:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。
變數 ctl 定義為AtomicInteger ,其功能非常強大,記錄了“執行緒池中的任務數量”和“執行緒池的狀態”兩個資訊。共32位,其中高3位表示"執行緒池狀態",低29位表示"執行緒池中的任務數量"。
RUNNING:處於RUNNING狀態的執行緒池能夠接受新任務,以及對新新增的任務進行處理。
SHUTDOWN:處於SHUTDOWN狀態的執行緒池不可以接受新任務,但是可以對已新增的任務進行處理。
STOP:處於STOP狀態的執行緒池不接收新任務,不處理已新增的任務,並且會中斷正在處理的任務。
TIDYING:當所有的任務已終止,ctl記錄的"任務數量"為0,執行緒池會變為TIDYING狀態。當執行緒池變為TIDYING狀態時,會執行鉤子函式terminated()。terminated()在ThreadPoolExecutor類中是空的,若使用者想線上程池變為TIDYING時,進行相應的處理;可以通過過載terminated()函式來實現。
TERMINATED:執行緒池徹底終止的狀態。
各個狀態的轉換如下:
建立執行緒池
我們可以通過ThreadPoolExecutor建構函式來建立一個執行緒池:
共有七個引數,每個引數含義如下:
corePoolSize
執行緒池中核心執行緒的數量。當提交一個任務時,執行緒池會新建一個執行緒來執行任務,直到當前執行緒數等於corePoolSize。如果呼叫了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有基本執行緒。
maximumPoolSize
執行緒池中允許的最大執行緒數。執行緒池的阻塞佇列滿了之後,如果還有任務提交,如果當前的執行緒數小於maximumPoolSize,則會新建執行緒來執行任務。注意,如果使用的是無界佇列,該引數也就沒有什麼效果了。
keepAliveTime
執行緒空閒的時間。執行緒的建立和銷燬是需要代價的。執行緒執行完任務後不會立即銷燬,而是繼續存活一段時間:keepAliveTime。預設情況下,該引數只有在執行緒數大於corePoolSize時才會生效。
unit
keepAliveTime的單位。TimeUnit
workQueue
用來儲存等待執行的任務的阻塞佇列,等待的任務必須實現Runnable介面。我們可以選擇如下幾種:
-
ArrayBlockingQueue:基於陣列結構的有界阻塞佇列,FIFO。【死磕Java併發】----J.U.C之阻塞佇列:ArrayBlockingQueue
-
LinkedBlockingQueue:基於連結串列結構的有界阻塞佇列,FIFO。
-
SynchronousQueue:不儲存元素的阻塞佇列,每個插入操作都必須等待一個移出操作,反之亦然。【死磕Java併發】----J.U.C之阻塞佇列:SynchronousQueue
-
PriorityBlockingQueue:具有優先界別的阻塞佇列。【死磕Java併發】----J.U.C之阻塞佇列:PriorityBlockingQueue
threadFactory
用於設定建立執行緒的工廠。該物件可以通過Executors.defaultThreadFactory(),如下:
返回的是DefaultThreadFactory物件,原始碼如下:
ThreadFactory的左右就是提供建立執行緒的功能的執行緒工廠。他是通過newThread()方法提供建立執行緒的功能,newThread()方法建立的執行緒都是“非守護執行緒”而且“執行緒優先順序都是Thread.NORM_PRIORITY”。
handler
RejectedExecutionHandler,執行緒池的拒絕策略。所謂拒絕策略,是指將任務新增到執行緒池中時,執行緒池拒絕該任務所採取的相應策略。當向執行緒池中提交任務時,如果此時執行緒池中的執行緒已經飽和了,而且阻塞佇列也已經滿了,則執行緒池會選擇一種拒絕策略來處理該任務。
執行緒池提供了四種拒絕策略:
-
AbortPolicy:直接丟擲異常,預設策略;
-
CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;
-
DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務;
-
DiscardPolicy:直接丟棄任務;
當然我們也可以實現自己的拒絕策略,例如記錄日誌等等,實現RejectedExecutionHandler介面即可。
執行緒池
Executor框架提供了三種執行緒池,他們都可以通過工具類Executors來建立。
FixedThreadPool
FixedThreadPool,可重用固定執行緒數的執行緒池,其定義如下:
corePoolSize 和 maximumPoolSize都設定為建立FixedThreadPool時指定的引數nThreads,意味著當執行緒池滿時且阻塞佇列也已經滿時,如果繼續提交任務,則會直接走拒絕策略,該執行緒池不會再新建執行緒來執行任務,而是直接走拒絕策略。FixedThreadPool使用的是預設的拒絕策略,即AbortPolicy,則直接丟擲異常。
keepAliveTime設定為0L,表示空閒的執行緒會立刻終止。
workQueue則是使用LinkedBlockingQueue,但是沒有設定範圍,那麼則是最大值(Integer.MAX_VALUE),這基本就相當於一個無界隊列了。使用該“無界佇列”則會帶來哪些影響呢?當執行緒池中的執行緒數量等於corePoolSize 時,如果繼續提交任務,該任務會被新增到阻塞佇列workQueue中,當阻塞佇列也滿了之後,則執行緒池會新建執行緒執行任務直到maximumPoolSize。由於FixedThreadPool使用的是“無界佇列”LinkedBlockingQueue,那麼maximumPoolSize引數無效,同時指定的拒絕策略AbortPolicy也將無效。而且該執行緒池也不會拒絕提交的任務,如果客戶端提交任務的速度快於任務的執行,那麼keepAliveTime也是一個無效引數。
其執行圖如下(參考《Java併發程式設計的藝術》):
SingleThreadExecutor
SingleThreadExecutor是使用單個worker執行緒的Executor,定義如下:
作為單一worker執行緒的執行緒池,SingleThreadExecutor把corePool和maximumPoolSize均被設定為1,和FixedThreadPool一樣使用的是無界佇列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣。
CachedThreadPool
CachedThreadPool是一個會根據需要建立新執行緒的執行緒池 ,他定義如下:
CachedThreadPool的corePool為0,maximumPoolSize為Integer.MAX VALUE,這就意味著所有的任務一提交就會加入到阻塞佇列中。keepAliveTime這是為60L,unit設定為TimeUnit.SECONDS,意味著空閒執行緒等待新任務的最長時間為60秒,空閒執行緒超過60秒後將會被終止。阻塞佇列採用的SynchronousQueue,而我們在【死磕Java併發】----J.U.C之阻塞佇列:SynchronousQueue中瞭解到SynchronousQueue是一個沒有元素的阻塞佇列,加上corePool = 0 ,maximumPoolSize = Integer.MAX VALUE,這樣就會存在一個問題,如果主執行緒提交任務的速度遠遠大於CachedThreadPool的處理速度,則CachedThreadPool會不斷地建立新執行緒來執行任務,這樣有可能會導致系統耗盡CPU和記憶體資源,所以在 使用該執行緒池是,一定要注意控制併發的任務數,否則建立大量的執行緒可能導致嚴重的效能問題 。
任務提交
執行緒池根據業務不同的需求提供了兩種方式提交任務:Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以獲取該任務執行的Future。 我們以Executor.execute()為例,來看看執行緒池的任務提交經歷了那些過程。
定義:
ThreadPoolExecutor提供實現:
執行流程如下:
-
如果執行緒池當前執行緒數小於corePoolSize,則呼叫addWorker建立新執行緒執行任務,成功返回true,失敗執行步驟2。
-
如果執行緒池處於RUNNING狀態,則嘗試加入阻塞佇列,如果加入阻塞佇列成功,則嘗試進行Double Check,如果加入失敗,則執行步驟3。
-
如果執行緒池不是RUNNING狀態或者加入阻塞佇列失敗,則嘗試建立新執行緒直到maxPoolSize,如果失敗,則呼叫reject()方法執行相應的拒絕策略。
在步驟2中如果加入阻塞佇列成功了,則會進行一個Double Check的過程。Double Check過程的主要目的是判斷加入到阻塞隊裡中的執行緒是否可以被執行。如果執行緒池不是RUNNING狀態,則呼叫remove()方法從阻塞佇列中刪除該任務,然後呼叫reject()方法處理任務。否則需要確保還有執行緒執行。
addWorker當執行緒中的當前執行緒數小於corePoolSize,則呼叫addWorker()建立新執行緒執行任務,當前執行緒數則是根據 ctl 變數來獲取的,呼叫workerCountOf(ctl)獲取低29位即可:
addWorker(Runnable firstTask, boolean core)方法用於建立執行緒執行任務,原始碼如下:
-
判斷當前執行緒是否可以新增任務,如果可以則進行下一步,否則return false;
-
rs >= SHUTDOWN ,表示當前執行緒處於SHUTDOWN ,STOP、TIDYING、TERMINATED狀態
-
rs == SHUTDOWN , firstTask != null時不允許新增執行緒,因為執行緒處於SHUTDOWN 狀態,不允許新增任務
-
rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允許新增執行緒,因為firstTask == null是為了新增一個沒有任務的執行緒然後再從workQueue中獲取任務的,如果workQueue == null,則說明新增的任務沒有任何意義。
-
內嵌迴圈,通過CAS worker + 1
-
獲取主鎖mailLock,如果執行緒池處於RUNNING狀態獲取處於SHUTDOWN狀態且 firstTask == null,則將任務新增到workers Queue中,然後釋放主鎖mainLock,然後啟動執行緒,然後return true,如果中途失敗導致workerStarted= false,則呼叫addWorkerFailed()方法進行處理。
在這裡需要好好理論addWorker中的引數,在execute()方法中,有三處呼叫了該方法:
-
第一次:
workerCountOf(c)<corePoolSize==>addWorker(command,true)
,這個很好理解,當然執行緒池的執行緒數量小於 corePoolSize ,則新建執行緒執行任務即可,在執行過程core == true,內部與corePoolSize比較即可。 -
第二次:加入阻塞佇列進行Double Check時,
elseif(workerCountOf(recheck)==0)==>addWorker(null,false)
。如果執行緒池中的執行緒==0,按照道理應該該任務應該新建執行緒執行任務,但是由於已經該任務已經新增到了阻塞佇列,那麼就線上程池中新建一個空執行緒,然後從阻塞佇列中取執行緒即可。 -
第三次:執行緒池不是RUNNING狀態或者加入阻塞佇列失敗:
elseif(!addWorker(command,false))
,這裡core == fase,則意味著是與maximumPoolSize比較。
在新建執行緒執行任務時,將講Runnable包裝成一個Worker,Woker為ThreadPoolExecutor的內部類
Woker內部類
Woker的原始碼如下:
從Worker的原始碼中我們可以看到Woker繼承AQS,實現Runnable介面,所以可以認為Worker既是一個可以執行的任務,也可以達到獲取鎖釋放鎖的效果。這裡繼承AQS主要是為了方便執行緒的中斷處理。這裡注意兩個地方:建構函式、run()。建構函式主要是做三件事:1.設定同步狀態state為-1,同步狀態大於0表示就已經獲取了鎖,2.設定將當前任務task設定為firstTask,3.利用Worker本身物件this和ThreadFactory建立執行緒物件。
當執行緒thread啟動(呼叫start()方法)時,其實就是執行Worker的run()方法,內部呼叫runWorker()。
runWorker
執行流程
-
根據worker獲取要執行的任務task,然後呼叫unlock()方法釋放鎖,這裡釋放鎖的主要目的在於中斷,因為在new Worker時,設定的state為-1,呼叫unlock()方法可以將state設定為0,這裡主要原因就在於interruptWorkers()方法只有在state >= 0時才會執行;
-
通過getTask()獲取執行的任務,呼叫task.run()執行,當然在執行之前會呼叫worker.lock()上鎖,執行之後呼叫worker.unlock()放鎖;
-
在任務執行前後,可以根據業務場景自定義beforeExecute() 和 afterExecute()方法,則兩個方法在ThreadPoolExecutor中是空實現;
-
如果執行緒執行完成,則會呼叫getTask()方法從阻塞佇列中獲取新任務,如果阻塞佇列為空,則根據是否超時來判斷是否需要阻塞;
-
task == null或者丟擲異常(beforeExecute()、task.run()、afterExecute()均有可能)導致worker執行緒終止,則呼叫processWorkerExit()方法處理worker退出流程。
getTask()
timed == true,呼叫poll()方法,如果在keepAliveTime時間內還沒有獲取task的話,則返回null,繼續迴圈。timed == false,則呼叫take()方法,該方法為一個阻塞方法,沒有任務時會一直阻塞掛起,直到有任務加入時對該執行緒喚醒,返回任務。
在runWorker()方法中,無論最終結果如何,都會執行processWorkerExit()方法對worker進行退出處理。
processWorkerExit()
首先completedAbruptly的值來判斷是否需要對執行緒數-1處理,如果completedAbruptly == true,說明在任務執行過程中出現了異常,那麼需要進行減1處理,否則不需要,因為減1處理在getTask()方法中處理了。然後從HashSet中移出該worker,過程需要獲取mainlock。然後呼叫tryTerminate()方法處理,該方法是對最後一個執行緒退出做終止執行緒池動作。如果執行緒池沒有終止,那麼執行緒池需要保持一定數量的執行緒,則通過addWorker(null,false)新增一個空的執行緒。
addWorkerFailed()
在addWorker()方法中,如果執行緒t==null,或者在add過程出現異常,會導致workerStarted == false,那麼在最後會呼叫addWorkerFailed()方法:
整個邏輯顯得比較簡單。
tryTerminate()
當執行緒池涉及到要移除worker時候都會呼叫tryTerminate(),該方法主要用於判斷執行緒池中的執行緒是否已經全部移除了,如果是的話則關閉執行緒池。
在關閉執行緒池的過程中,如果執行緒池處於STOP狀態或者處於SHUDOWN狀態且阻塞佇列為null,則執行緒池會呼叫interruptIdleWorkers()方法中斷所有執行緒,注意ONLY_ONE== true,表示僅中斷一個執行緒。
interruptIdleWorkers
onlyOne==true僅終止一個執行緒,否則終止所有執行緒。
執行緒終止
執行緒池ThreadPoolExecutor提供了shutdown()和shutDownNow()用於關閉執行緒池。
shutdown():按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務。
shutdownNow() :嘗試停止所有的活動執行任務、暫停等待任務的處理,並返回等待執行的任務列表。
shutdown
shutdownNow
與shutdown不同,shutdownNow會呼叫interruptWorkers()方法中斷所有執行緒。
同時會呼叫drainQueue()方法返回等待執行到任務列表。
( 全文完 )
熱門推薦文章:

長按二維碼,掃掃關注哦
✬關注即可得 Spring / SC / 原始碼等系列乾貨✬
今日簽到打卡:
(ps:今天簽到了,記得點選明天推文簽到哦)
萬水千山總是情,點個 “好看”行不行
↓↓↓↓