1. 程式人生 > >執行緒池之Executor框架

執行緒池之Executor框架

# 執行緒池之Executor框架 Java的執行緒既是工作單元,也是執行機制。從JDK5開始,把工作機單元和執行機制分離開來。**工作單元包括Runnable和Callable,而執行機制由Executor框架提供。** ### 1. Executor框架簡介 #### 1.1 Executor框架的兩級排程模型 在上層,Java多執行緒程式通常把應用分解為若干個任務,然後使用使用者級的排程器(Executor框架)將這些任務對映為固定數量的執行緒。 在底層,作業系統核心將這些執行緒對映到硬體處理器上。 ![image-20200820220148034](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200820220148034.png) #### 1.2 Executor框架的結構 Executor框架主要由3部分組成: - **任務**。包括被執行任務需要實現的介面:Runnable介面或者Callable介面。 - **任務的執行**。包括任務執行機制的核心介面Executor,以及繼承自Executor的ExecutorService介面。Executor框架有兩個關鍵類實現了ExecutorService介面(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。 - **非同步計算的結果**。包括Future和實現Future的FutureTask類。 Executor框架的成員及其關係可以用一下的關係圖表示: ![image-20200820221319222](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200820221319222.png) Executor框架的使用示意圖: ![image-20200820222719384](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200820222719384.png) 使用步驟: - 主執行緒首先建立實現Runnable或Callable介面的任務物件。工具類Executors可以把一個Runnable物件封裝為一個Callable物件(`Executors.callable(Runnable task)`或`Executors.callable(Runnable task, Object result)`)。 - 建立Executor介面的實現類ThreadPoolExecutor類或者ScheduledThreadPoolExecutor類的物件,然後呼叫其execute()方法或者submit()方法把工作任務新增到執行緒中,如果有返回值則返回Future物件。其中Callable物件有返回值,因此使用submit()方法;而Runnable可以使用execute()方法,此外還可以使用submit()方法,只要使用callable(Runnable task)或者callable(Runnable task, Object result)方法把Runnable物件包裝起來就可以,使用callable(Runnable task)方法返回的null,使用callable(Runnable task, Object result)方法返回result。 - 主執行緒可以執行Future物件的get()方法獲取返回值,也可以呼叫cancle()方法取消當前執行緒的執行。 #### 1.3 Executor框架的使用案例 ```java import java.util.concurrent.*; public class ExecutorDemo { // 建立ThreadPoolExecutor實現類 private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), ); public static void main(String[] args) { // 採用submit()方法提交Callable物件並返回Future物件 Future future = executor.submit(new callableDemo()); try { // get()方法獲取返回值 System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { // 處理異常 e.printStackTrace(); } finally { // 關閉執行緒池 executor.shutdown(); } } } /** * 建立Callable介面的實現類 */ class callableDemo implements Callable { @Override public String call() throws Exception { Thread.sleep(1000); String s = "return string"; return s; } } ``` ### 2. Executor框架成員 ![image-20200820225956625](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200820225956625.png) #### 2.1 ThreadPoolExecutor 直接建立ThreadPoolExecutor的例項物件,見https://www.cnblogs.com/chiaki/p/13536624.html ThreadPoolExecutor通常使用工廠類Executors建立,可以建立3種類型的ThreadPoolExecutor,即FixedThreadPool、SingleThreadExecutor以及CachedThreadPool。 - **FixedThreadPool**:**適用於為了滿足資源管理的需求,而需要限制當先執行緒數量的應用場景,適用於負載比較重的伺服器。** ```java public static ExecutorService es = Executors.newFixedThreadPool(int threadNums); public static ExecutorService es = Executors.newFixedThreadPool(int threadNums, ThreadFactory threadFactory); ``` ```java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } ``` 把執行緒池最大執行緒數量maxmumPoolSize和核心執行緒池的數量corePoolSize設定為`threadNums`,將引數keepAliveTime設定為`0L`。使用無界佇列`LinkedBlockingQueue`作為阻塞佇列,因此當任務不能立刻執行時,都會新增到阻塞佇列中,而且maximumPoolSize,keepAliveTime都是無效的。 - **SingleThreadExecutor:**適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個執行緒是活動地應用場景。** ```java public static ExecutorService es = Executors.newSingleThreadExecutor(); public static ExecutorService es = Executors.newSingleThreadExecutor(ThreadFactory threadFactory); ``` ```java public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } ``` 因為阻塞佇列使用的是`LinkedBlockingQueue`,因此和FixedThreadPool一樣,引數maximumPoolSize以及keepAliveTime都是無效的。corePoolSize為`1`,因此**最多隻能建立一個執行緒**。 - **CachedThreadPool**:**大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或者是負載較輕的伺服器。** ```java public static ExecutorService es = Executors.newCachedThreadPool(); public static ExecutorService es = Executors.newCachedThreadPool(ThreadFactory threadFactory); ``` ```java public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } ``` CachedThreadPool使用`SynchronizedQueue`作為阻塞佇列,SynchronizedQueue是不儲存元素的阻塞佇列,實現**“一對一的交付”**,也就是說,每次向佇列中put一個任務必須等有執行緒來take這個任務,否則就會一直阻塞該任務,如果一個執行緒要take一個任務就要一直阻塞知道有任務被put進阻塞佇列。 因為CachedThreadPool的maximumPoolSize為`Integer.MUX_VALUE`,因此**CachedThreadPool是無界的執行緒池,也就是說可以一直不斷的建立執行緒,這樣可能會使CPU和記憶體資源耗盡**。corePoolSize為`0 `,因此在**CachedThreadPool中直接通過阻塞佇列來進行任務的提交**。 #### 2.2 ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor類繼承了ThreadPoolExecutor並實現了ScheduledExecutorService介面。**主要用於在給定的延遲後執行任務或者定期執行任務。** ScheduledThreadPoolExecutor通常使用Executors工廠類來建立,可建立2種類型的ScheduledThreadPoolExecutor,即ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor。 - **ScheduledThreadPoolExecutor:適用於若干個(固定)執行緒延時或者定期執行任務,同時為了滿足資源管理的需求而需要限制後臺執行緒數量的場景。** ```java public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums); public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums, ThreadFactory threadFactory); ``` - **SingleThreadScheduledExecutor:適用於需要單個執行緒延時或者定期的執行任務,同時需要保證各個任務順序執行的應用場景。** ```java public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums); public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums, ThreadFactory threadFactory); ``` ScheduledThreadPoolExecutor的實現: ScheduledThreadPoolExecutor的實現主要是通過**把任務封裝為ScheduledFutureTask來實現**。通過呼叫scheduledAtFixedTime()方法或者scheduledWithFixedDelay()方法向阻塞佇列新增一個實現了RunnableScheduledFutureTask介面的ScheduledFutureTask類物件。ScheduledFutureTask主要包括3個成員變數: ```java // 序列號,用於儲存任務新增到阻塞佇列的順序 private final long sequenceNumber; // 用於儲存該任務將要被執行的具體時間 private long time; // 週期,用於儲存任務直線的間隔週期 private final long period; ``` ScheduledTreadPoolExecutor的阻塞佇列是用**無界佇列**`DelayQueue`實現的,可以實現元素延時delayTime後才能獲取元素,在ScheduledThreadPoolExecutor中,DelayQueue內部封裝了一個PriorityQueue,來對任務進行排序,首先對time排序,time小的在前,如果time一樣,則sequence小的在前,也就是說如果time一樣,那麼先被提交的任務先執行。 因為DelayQueue是一個無界的佇列,因此執行緒池的maximumPoolSize是無效的。ScheduledThreadPoolExecutor的工作流程大致如下: ![image-20200820233614169](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200820233614169.png) #### 2.3 Future介面/FutureTask實現類 Future介面和實現Future介面的FutureTask實現類,代表非同步計算的結果。 ##### 2.3.1 FutureTask的使用 FutureTask除了實現Future介面外還實現了Runnable介面。因此,FutureTask可以交給Executor執行,也可以條用執行緒直接執行(`FutureTask.run()`)。根據FutureTask.run()方法被執行的時機,FutureTask可處於以下3種狀態: - 未啟動:建立了一個FutureTask物件但沒有執行FutureTask.run(); - 已啟動:FutureTask.run()方法被執行的過程中; - 已完成:FutureTask.run()正常執行結束,或者FutureTask被取消(`FutureTask.cancel()`),或者執行FutureTask.run()時丟擲異常而異常結束; 狀態遷移示意圖: ![image-20200820234612077](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200820234612077.png) FutureTask的get和cancle執行示意圖: ![image-20200820234818187](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200820234818187.png) ##### 2.3.2 FutureTask的實現 **FutureTask是一個基於AQS同步佇列實現的一個自定義同步元件**,通過對同步狀態state的競爭實現acquire或者release操作。 FutureTask的內部類Sync實現了AQS介面,通過**對tryAcquire等抽象方法的重寫和模板方法的呼叫來實現內部類Sync的tryAcquireShared等方法**,然後聚合Sync的方法來實現FutureTask的get和cancel等方法。 FutureTask的設計示意圖: ![image-20200820235358434](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200820235358434.png) FutureTask的get方法最終會呼叫`AQS.acquireSharedInterruptibly(int arg)`方法: - 呼叫`AQS.acquireSharedInterruptibly(int arg)`方法會首先呼叫`tryAcquireShared()`方法判斷acquire操作是否可以成功,可以成功的條件是state為執行完成狀態RAN或者已取消狀態CANCELLED,且runner不為null; - 如果成功則`get()`方法立即返回,如果失敗則到執行緒等待佇列執行release操作; - 當其他執行緒執行release操作喚醒當前執行緒後(比如`FutureTask.run()`或`FutureTask.cancle(...)`),當前執行緒再次執行`tryAcquireShared()`將返回正值1,當前執行緒離開現場等待佇列並喚醒它的後繼執行緒(級聯喚醒); - 最後返回計算的結果或丟擲異常。 ![image-20200821000310543](https://chiaki.oss-cn-beijing.aliyuncs.com/image-20200821000310543.png) ##### 2.3.3 FutureTask的使用場景 - 當一個執行緒需要等待另一個執行緒把某個任務執行完以後它才能繼續執行時; - 有若干執行緒執行若干任務,每個任務最多隻能被執行一次; - 當多個執行緒師徒執行同一個任務,但只能允許一個執行緒執行此任務,其它執行緒需要等這個任務被執行完畢以後才能繼續執行時。 #### 2.4 Runnable和Callable介面 用於實現執行緒要執行的工作單元。 #### 2.5 Executors工廠類 提供了常見配置執行緒池的方法,因為ThreadPoolExecutor的引數眾多且意義重大,為了避免配置出錯,才有了Executors工廠類。 ### 3. 為什麼不建議使用Executors建立執行緒池? **FixedThreadPool**和**SingleThreadExecutor**:允許請求的佇列長度為Integer.MAX_VALUE(無界的阻塞佇列),可能堆積大量的請求,從而導致OOM。 **CachedThreadPool**和**ScheduledThreadPool**:允許建立的執行緒數量為Integer.MAX_VALUE(無界的阻塞佇列),可能會建立大量執行緒,從而導