1. 程式人生 > >Java網際網路程式設計——深入分析java執行緒池的實現原理

Java網際網路程式設計——深入分析java執行緒池的實現原理

月亮姨的嘮叨:

執行緒是稀缺資源,如果被無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,合理的使用執行緒池對執行緒進行統一分配、調優和監控,有以下好處:

1、降低資源消耗;

2、提高響應速度;

3、提高執行緒的可管理性。

Java1.5中引入的Executor框架把任務的提交和執行進行解耦,只需要定義好任務,然後提交給執行緒池,而不用關心該任務是如何執行、被哪個執行緒執行,以及什麼時候執行。

demo

1、Executors.newFixedThreadPool(10)初始化一個包含10個執行緒的執行緒池executor;

2、通過executor.execute方法提交20個任務,每個任務列印當前的執行緒名;

3、負責執行任務的執行緒的生命週期都由Executor框架進行管理;

ThreadPoolExecutor

Executors是java執行緒池的工廠類,通過它可以快速初始化一個符合業務需求的執行緒池,如Executors.newFixedThreadPool方法可以生成一個擁有固定執行緒數的執行緒池。

其本質是通過不同的引數初始化一個ThreadPoolExecutor物件,具體引數描述如下:

corePoolSize

執行緒池中的核心執行緒數,當提交一個任務時,執行緒池建立一個新執行緒執行任務,直到當前執行緒數等於corePoolSize;如果當前執行緒數為corePoolSize,繼續提交的任務被儲存到阻塞佇列中,等待被執行;如果執行了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有核心執行緒。

maximumPoolSize

執行緒池中允許的最大執行緒數。如果當前阻塞佇列滿了,且繼續提交任務,則建立新的執行緒執行任務,前提是當前執行緒數小於maximumPoolSize;

keepAliveTime

執行緒空閒時的存活時間,即當執行緒沒有任務執行時,繼續存活的時間;預設情況下,該引數只在執行緒數大於corePoolSize時才有用;

unit

keepAliveTime的單位;

workQueue

用來儲存等待被執行的任務的阻塞佇列,且任務必須實現Runable介面,在JDK中提供瞭如下阻塞佇列:

1、ArrayBlockingQueue:基於陣列結構的有界阻塞佇列,按FIFO排序任務;

2、LinkedBlockingQuene:基於連結串列結構的阻塞佇列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQuene;

3、SynchronousQuene:一個不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;

4、priorityBlockingQuene:具有優先順序的無界阻塞佇列;

threadFactory

建立執行緒的工廠,通過自定義的執行緒工廠可以給每個新建的執行緒設定一個具有識別度的執行緒名。

handler

執行緒池的飽和策略,當阻塞佇列滿了,且沒有空閒的工作執行緒,如果繼續提交任務,必須採取一種策略處理該任務,執行緒池提供了4種策略:

1、AbortPolicy:直接丟擲異常,預設策略;

2、CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;

3、DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務;

4、DiscardPolicy:直接丟棄任務;

當然也可以根據應用場景實現RejectedExecutionHandler介面,自定義飽和策略,如記錄日誌或持久化儲存不能處理的任務。

Exectors

Exectors工廠類提供了執行緒池的初始化介面,主要有如下幾種:

newFixedThreadPool

初始化一個指定執行緒數的執行緒池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞佇列,不過當執行緒池沒有可執行任務時,也不會釋放執行緒。

newCachedThreadPool

1、初始化一個可以快取執行緒的執行緒池,預設快取60s,執行緒池的執行緒數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞佇列;

2、和newFixedThreadPool建立的執行緒池不同,newCachedThreadPool在沒有任務執行時,當執行緒的空閒時間超過keepAliveTime,會自動釋放執行緒資源,當提交新任務時,如果沒有空閒執行緒,則建立新執行緒執行任務,會導致一定的系統開銷;

所以,使用該執行緒池時,一定要注意控制併發的任務數,否則建立大量的執行緒可能導致嚴重的效能問題。

newSingleThreadExecutor

初始化的執行緒池中只有一個執行緒,如果該執行緒異常結束,會重新建立一個新的執行緒繼續執行任務,唯一的執行緒可以保證所提交任務的順序執行,內部使用LinkedBlockingQueue作為阻塞佇列。

newScheduledThreadPool

初始化的執行緒池可以在指定的時間內週期性的執行所提交的任務,在實際的業務場景中可以使用該執行緒池定期的同步資料。

實現原理

除了newScheduledThreadPool的內部實現特殊一點之外,其它幾個執行緒池都是基於ThreadPoolExecutor類實現的。

執行緒池內部狀態

其中AtomicInteger變數ctl的功能非常強大:利用低29位表示執行緒池中執行緒數,通過高3位表示執行緒池的執行狀態:

1、RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態的執行緒池會接收新任務,並處理阻塞佇列中的任務;

2、SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的執行緒池不會接收新任務,但會處理阻塞佇列中的任務;

3、STOP : 1 << COUNT_BITS,即高3位為001,該狀態的執行緒不會接收新任務,也不會處理阻塞佇列中的任務,而且會中斷正在執行的任務;

4、TIDYING : 2 << COUNT_BITS,即高3位為010;

5、TERMINATED: 3 << COUNT_BITS,即高3位為011;

任務提交

執行緒池框架提供了兩種方式提交任務,根據不同的業務需求選擇不同的方式。

Executor.execute()

通過Executor.execute()方法提交的任務,必須實現Runnable介面,該方式提交的任務不能獲取返回值,因此無法判斷任務是否執行成功。

ExecutorService.submit()

通過ExecutorService.submit()方法提交的任務,可以獲取任務執行完的返回值。

任務執行

當向執行緒池中提交一個任務,執行緒池會如何處理該任務?

execute實現

具體的執行流程如下:

1、workerCountOf方法根據ctl的低29位,得到執行緒池的當前執行緒數,如果執行緒數小於corePoolSize,則執行addWorker方法建立新的執行緒執行任務;否則執行步驟(2);

2、如果執行緒池處於RUNNING狀態,且把提交的任務成功放入阻塞佇列中,則執行步驟(3),否則執行步驟(4);

3、再次檢查執行緒池的狀態,如果執行緒池沒有RUNNING,且成功從阻塞佇列中刪除任務,則執行reject方法處理任務;

4、執行addWorker方法建立新的執行緒執行任務,如果addWoker執行失敗,則執行reject方法處理任務;

addWorker實現

從方法execute的實現可以看出:addWorker主要負責建立新的執行緒並執行任務,程式碼實現如下:

這只是addWoker方法實現的前半部分:

1、判斷執行緒池的狀態,如果執行緒池的狀態值大於或等SHUTDOWN,則不處理提交的任務,直接返回;

2、通過引數core判斷當前需要建立的執行緒是否為核心執行緒,如果core為true,且當前執行緒數小於corePoolSize,則跳出迴圈,開始建立新的執行緒,具體實現如下:

執行緒池的工作執行緒通過Woker類實現,在ReentrantLock鎖的保證下,把Woker例項插入到HashSet後,並啟動Woker中的執行緒,其中Worker類設計如下:

1、繼承了AQS類,可以方便的實現工作執行緒的中止操作;

2、實現了Runnable介面,可以將自身作為一個任務在工作執行緒中執行;

3、當前提交的任務firstTask作為引數傳入Worker的構造方法;

從Woker類的構造方法實現可以發現:執行緒工廠在建立執行緒thread時,將Woker例項本身this作為引數傳入,當執行start方法啟動執行緒thread時,本質是執行了Worker的runWorker方法。

runWorker實現

runWorker方法是執行緒池的核心:

1、執行緒啟動之後,通過unlock方法釋放鎖,設定AQS的state為0,表示執行中斷;

2、獲取第一個任務firstTask,執行任務的run方法,不過在執行任務之前,會進行加鎖操作,任務執行完會釋放鎖;

3、在執行任務的前後,可以根據業務場景自定義beforeExecute和afterExecute方法;

4、firstTask執行完成之後,通過getTask方法從阻塞佇列中獲取等待的任務,如果佇列中沒有任務,getTask方法會被阻塞並掛起,不會佔用cpu資源;

getTask實現

整個getTask操作在自旋下完成:

1、workQueue.take:如果阻塞佇列為空,當前執行緒會被掛起等待;當佇列中有任務加入時,執行緒被喚醒,take方法返回任務,並執行;

2、workQueue.poll:如果在keepAliveTime時間內,阻塞佇列還是沒有任務,則返回null;

所以,執行緒池中實現的執行緒可以一直執行由使用者提交的任務。

Future和Callable實現

通過ExecutorService.submit()方法提交的任務,可以獲取任務執行完的返回值。

在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果。

1、Callable介面類似於Runnable,只是Runnable沒有返回值。

2、Callable任務除了返回正常結果之外,如果發生異常,該異常也會被返回,即Future可以拿到非同步執行任務各種結果;

3、Future.get方法會導致主執行緒阻塞,直到Callable任務執行完成;

submit實現

通過submit方法提交的Callable任務會被封裝成了一個FutureTask物件。

FutureTask

1、FutureTask在不同階段擁有不同的狀態state,初始化為NEW;

2、FutureTask類實現了Runnable介面,這樣就可以通過Executor.execute方法提交FutureTask到執行緒池中等待被執行,最終執行的是FutureTask的run方法;

FutureTask.get實現

內部通過awaitDone方法對主執行緒進行阻塞,具體實現如下:

1、如果主執行緒被中斷,則丟擲中斷異常;

2、判斷FutureTask當前的state,如果大於COMPLETING,說明任務已經執行完成,則直接返回;

3、如果當前state等於COMPLETING,說明任務已經執行完,這時主執行緒只需通過yield方法讓出cpu資源,等待state變成NORMAL;

4、通過WaitNode類封裝當前執行緒,並通過UNSAFE新增到waiters連結串列;

5、最終通過LockSupport的park或parkNanos掛起執行緒;

FutureTask.run實現

FutureTask.run方法是線上程池中被執行的,而非主執行緒

1、通過執行Callable任務的call方法;

2、如果call執行成功,則通過set方法儲存結果;

3、如果call執行有異常,則通過setException儲存異常;

set

setException

set和setException方法中,都會通過UnSAFE修改FutureTask的狀態,並執行finishCompletion方法通知主執行緒任務已經執行完成;

finishCompletion

1、執行FutureTask類的get方法時,會把主執行緒封裝成WaitNode節點並儲存在waiters連結串列中;

2、FutureTask任務執行完成後,通過UNSAFE設定waiters的值,並通過LockSupport類unpark方法喚醒主執行緒;

需要了解更多可以加大牛交流群進行討論,本群提供免費架構視訊資料!!

QQ群:836442475(架構華山論劍)