1. 程式人生 > >深入分析java線程池的實現原理

深入分析java線程池的實現原理

51cto 產生 read 記錄 epo 內部實現 9.png 方法 單位

前言

線程是稀缺資源,如果被無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,合理的使用線程池對線程進行統一分配、調優和監控,有以下好處:
1、降低資源消耗;
2、提高響應速度;
3、提高線程的可管理性。

Java1.5中引入的Executor框架把任務的提交和執行進行解耦,只需要定義好任務,然後提交給線程池,而不用關心該任務是如何執行、被哪個線程執行,以及什麽時候執行。

如果你也想在IT行業拿高薪,可以參加我們的訓練營課程,選擇最適合自己的課程學習,技術大牛親授,7個月後,進入名企拿高薪。我們的課程內容有:Java工程化、高性能及分布式、高性能、深入淺出。高架構。性能調優、Spring,MyBatis,Netty源碼分析和大數據等多個知識點。如果你想拿高薪的,想學習的,想就業前景好的,想跟別人競爭能取得優勢的,想進阿裏面試但擔心面試不過的,你都可以來,群號為:575745314

demo

技術分享圖片

1、Executors.newFixedThreadPool(10)初始化一個包含10個線程的線程池executor;
2、通過executor.execute方法提交20個任務,每個任務打印當前的線程名;
3、負責執行任務的線程的生命周期都由Executor框架進行管理;

ThreadPoolExecutor

Executors是java線程池的工廠類,通過它可以快速初始化一個符合業務需求的線程池,如Executors.newFixedThreadPool方法可以生成一個擁有固定線程數的線程池。
技術分享圖片

其本質是通過不同的參數初始化一個ThreadPoolExecutor對象,具體參數描述如下:

corePoolSize

線程池中的核心線程數,當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等於corePoolSize;如果當前線程數為corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有核心線程。

maximumPoolSize

線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小於maximumPoolSize;

keepAliveTime

線程空閑時的存活時間,即當線程沒有任務執行時,繼續存活的時間;默認情況下,該參數只在線程數大於corePoolSize時才有用;

unit

keepAliveTime的單位;

workQueue

用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable接口,在JDK中提供了如下阻塞隊列:
1、ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務;
2、LinkedBlockingQuene:基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQuene;
3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;
4、priorityBlockingQuene:具有優先級的無界阻塞隊列;

threadFactory

創建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設置一個具有識別度的線程名。

技術分享圖片

handler

線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:
1、AbortPolicy:直接拋出異常,默認策略;
2、CallerRunsPolicy:用調用者所在的線程來執行任務;
3、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
4、DiscardPolicy:直接丟棄任務;
當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。

Exectors

Exectors工廠類提供了線程池的初始化接口,主要有如下幾種:

newFixedThreadPool

技術分享圖片

初始化一個指定線程數的線程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊列,不過當線程池沒有可執行任務時,也不會釋放線程。

newCachedThreadPool

技術分享圖片

1、初始化一個可以緩存線程的線程池,默認緩存60s,線程池的線程數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞隊列;
2、和newFixedThreadPool創建的線程池不同,newCachedThreadPool在沒有任務執行時,當線程的空閑時間超過keepAliveTime,會自動釋放線程資源,當提交新任務時,如果沒有空閑線程,則創建新線程執行任務,會導致一定的系統開銷;

所以,使用該線程池時,一定要註意控制並發的任務數,否則創建大量的線程可能導致嚴重的性能問題。

newSingleThreadExecutor

技術分享圖片

初始化的線程池中只有一個線程,如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞隊列。

newScheduledThreadPool

技術分享圖片

初始化的線程池可以在指定的時間內周期性的執行所提交的任務,在實際的業務場景中可以使用該線程池定期的同步數據。

實現原理

除了newScheduledThreadPool的內部實現特殊一點之外,其它幾個線程池都是基於ThreadPoolExecutor類實現的。

線程池內部狀態

技術分享圖片

其中AtomicInteger變量ctl的功能非常強大:利用低29位表示線程池中線程數,通過高3位表示線程池的運行狀態:
1、RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態的線程池會接收新任務,並處理阻塞隊列中的任務;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;
3、STOP : 1 << COUNT_BITS,即高3位為001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;
4、TIDYING : 2 << COUNT_BITS,即高3位為010;
5、TERMINATED: 3 << COUNT_BITS,即高3位為011;

任務提交

線程池框架提供了兩種方式提交任務,根據不同的業務需求選擇不同的方式。

Executor.execute()

技術分享圖片

通過Executor.execute()方法提交的任務,必須實現Runnable接口,該方式提交的任務不能獲取返回值,因此無法判斷任務是否執行成功。

ExecutorService.submit()

技術分享圖片

通過ExecutorService.submit()方法提交的任務,可以獲取任務執行完的返回值。

任務執行

當向線程池中提交一個任務,線程池會如何處理該任務?

execute實現

技術分享圖片

具體的執行流程如下:
1、workerCountOf方法根據ctl的低29位,得到線程池的當前線程數,如果線程數小於corePoolSize,則執行addWorker方法創建新的線程執行任務;否則執行步驟(2);
2、如果線程池處於RUNNING狀態,且把提交的任務成功放入阻塞隊列中,則執行步驟(3),否則執行步驟(4);
3、再次檢查線程池的狀態,如果線程池沒有RUNNING,且成功從阻塞隊列中刪除任務,則執行reject方法處理任務;
4、執行addWorker方法創建新的線程執行任務,如果addWoker執行失敗,則執行reject方法處理任務;
技術分享圖片

addWorker實現

從方法execute的實現可以看出:addWorker主要負責創建新的線程並執行任務,代碼實現如下:
技術分享圖片

這只是addWoker方法實現的前半部分:
1、判斷線程池的狀態,如果線程池的狀態值大於或等SHUTDOWN,則不處理提交的任務,直接返回;
2、通過參數core判斷當前需要創建的線程是否為核心線程,如果core為true,且當前線程數小於corePoolSize,則跳出循環,開始創建新的線程,具體實現如下:
技術分享圖片

線程池的工作線程通過Woker類實現,在ReentrantLock鎖的保證下,把Woker實例插入到HashSet後,並啟動Woker中的線程,其中Worker類設計如下:
1、繼承了AQS類,可以方便的實現工作線程的中止操作;
2、實現了Runnable接口,可以將自身作為一個任務在工作線程中執行;
3、當前提交的任務firstTask作為參數傳入Worker的構造方法;
技術分享圖片

從Woker類的構造方法實現可以發現:線程工廠在創建線程thread時,將Woker實例本身this作為參數傳入,當執行start方法啟動線程thread時,本質是執行了Worker的runWorker方法。

runWorker實現

技術分享圖片

runWorker方法是線程池的核心:
1、線程啟動之後,通過unlock方法釋放鎖,設置AQS的state為0,表示運行中斷;
2、獲取第一個任務firstTask,執行任務的run方法,不過在執行任務之前,會進行加鎖操作,任務執行完會釋放鎖;
3、在執行任務的前後,可以根據業務場景自定義beforeExecute和afterExecute方法;
4、firstTask執行完成之後,通過getTask方法從阻塞隊列中獲取等待的任務,如果隊列中沒有任務,getTask方法會被阻塞並掛起,不會占用cpu資源;

getTask實現

技術分享圖片

整個getTask操作在自旋下完成:
1、workQueue.take:如果阻塞隊列為空,當前線程會被掛起等待;當隊列中有任務加入時,線程被喚醒,take方法返回任務,並執行;
2、workQueue.poll:如果在keepAliveTime時間內,阻塞隊列還是沒有任務,則返回null;

所以,線程池中實現的線程可以一直執行由用戶提交的任務。

Future和Callable實現

通過ExecutorService.submit()方法提交的任務,可以獲取任務執行完的返回值。
技術分享圖片

在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果。
1、Callable接口類似於Runnable,只是Runnable沒有返回值。
2、Callable任務除了返回正常結果之外,如果發生異常,該異常也會被返回,即Future可以拿到異步執行任務各種結果;
3、Future.get方法會導致主線程阻塞,直到Callable任務執行完成;

submit實現

技術分享圖片

通過submit方法提交的Callable任務會被封裝成了一個FutureTask對象。

FutureTask

技術分享圖片

1、FutureTask在不同階段擁有不同的狀態state,初始化為NEW;
2、FutureTask類實現了Runnable接口,這樣就可以通過Executor.execute方法提交FutureTask到線程池中等待被執行,最終執行的是FutureTask的run方法;

FutureTask.get實現

技術分享圖片

內部通過awaitDone方法對主線程進行阻塞,具體實現如下:
技術分享圖片

1、如果主線程被中斷,則拋出中斷異常;
2、判斷FutureTask當前的state,如果大於COMPLETING,說明任務已經執行完成,則直接返回;
3、如果當前state等於COMPLETING,說明任務已經執行完,這時主線程只需通過yield方法讓出cpu資源,等待state變成NORMAL;
4、通過WaitNode類封裝當前線程,並通過UNSAFE添加到waiters鏈表;
5、最終通過LockSupport的park或parkNanos掛起線程;

FutureTask.run實現

技術分享圖片

FutureTask.run方法是在線程池中被執行的,而非主線程
1、通過執行Callable任務的call方法;
2、如果call執行成功,則通過set方法保存結果;
3、如果call執行有異常,則通過setException保存異常;

set

技術分享圖片

setException

技術分享圖片

set和setException方法中,都會通過UnSAFE修改FutureTask的狀態,並執行finishCompletion方法通知主線程任務已經執行完成;

finishCompletion

技術分享圖片

1、執行FutureTask類的get方法時,會把主線程封裝成WaitNode節點並保存在waiters鏈表中;
2、FutureTask任務執行完成後,通過UNSAFE設置waiters的值,並通過LockSupport類unpark方法喚醒主線程;

深入分析java線程池的實現原理