1. 程式人生 > >戲(細)說Executor框架線程池任務執行全過程(上)

戲(細)說Executor框架線程池任務執行全過程(上)

文章 空間 重點 urn 枯燥 ash 成功 創建 使用

原文鏈接:http://ifeve.com/executor-framework-thread-pool-task-execution-part-01/

內容綜述

基於Executor接口中將任務提交和任務執行解耦的設計,ExecutorService和其各種功能強大的實現類提供了非常簡便方式來提交任務並獲取任務執行結果,封裝了任務執行的全部過程。本文嘗試通過對j.u.c.下該部分源碼的解析以ThreadPoolExecutor為例來追蹤任務提交、執行、獲取執行結果的整個過程。為了避免陷入枯燥的源碼解釋,將該過程和過程中涉及的角色與我們工作中的場景和場景中涉及的角色進行映射,力圖生動和深入淺出。

一、前言

1.5後引入的Executor框架的最大優點是把任務的提交和執行解耦。要執行任務的人只需把Task描述清楚,然後提交即可。這個Task是怎麽被執行的,被誰執行的,什麽時候執行的,提交的人就不用關心了。具體點講,提交一個Callable對象給ExecutorService(如最常用的線程池ThreadPoolExecutor),將得到一個Future對象,調用Future對象的get方法等待執行結果就好了。

經過這樣的封裝,對於使用者來說,提交任務獲取結果的過程大大簡化,調用者直接從提交的地方就可以等待獲取執行結果。而封裝最大的效果是使得真正執行任務的線程們變得不為人知。有沒有覺得這個場景似曾相識?我們工作中當老大的老大(且稱作LD^2)把一個任務交給我們老大(LD)的時候,到底是LD自己幹,還是轉過身來拉來一幫苦逼的兄弟加班加點幹,那LD^2是不管的。LD^2只用把人描述清楚提及給LD,然後喝著咖啡等著收LD的report即可。等LD一封郵件非常優雅地報告LD^2report結果時,實際操作中是碼農A和碼農B幹了一個月,還是碼農ABCDE加班幹了一個禮拜,大多是不用體現的。這套機制的優點就是LD^2找個合適的LD出來提交任務即可,接口友好有效,不用為具體怎麽幹費神費力。

二、 一個最簡單的例子

看上去這個執行過程是這個樣子。調用這段代碼的是老大的老大了,他所需要幹的所有事情就是找到一個合適的老大(如下面例子中laodaA就榮幸地被選中了),提交任務就好了。

01 // 一個有7個作業線程的線程池,老大的老大找到一個管7個人的小團隊的老大
02 ExecutorService laodaA = Executors.newFixedThreadPool(7);
03 //提交作業給老大,作業內容封裝在Callable中,約定好了輸出的類型是String。
04 String outputs = laoda.submit(
05 new Callable<String>() {
06 public String call() throws Exception
07 {
08 return "I am a task, which submited by the so called laoda, and run by those anonymous workers";
09 }
10 //提交後就等著結果吧,到底是手下7個作業中誰領到任務了,老大是不關心的。
11 }).get();
12
13 System.out.println(outputs);

使用上非常簡單,其實只有兩行語句來完成所有功能:創建一個線程池,提交任務並等待獲取執行結果。

例子中生成線程池采用了工具類Executors的靜態方法。除了newFixedThreadPool可以生成固定大小的線程池,newCachedThreadPool可以生成一個無界、可以自動回收的線程池,newSingleThreadScheduledExecutor可以生成一個單個線程的線程池。newScheduledThreadPool還可以生成支持周期任務的線程池。一般用戶場景下各種不同設置要求的線程池都可以這樣生成,不用自己new一個線程池出來。

三、代碼剖析

這套機制怎麽用,上面兩句語句就做到了,非常方便和友好。但是submit的task是怎麽被執行的?是誰執行的?如何做到在調用的時候只有等待執行結束才能get到結果。這些都是1.5之後Executor接口下的線程池、Future接口下的可獲得執行結果的的任務,配合AQS和原有的Runnable來做到的。在下文中我們嘗試通過剖析每部分的代碼來了解Task提交,Task執行,獲取Task執行結果等幾個主要步驟。為了控制篇幅,突出主要邏輯,文章中引用的代碼片段去掉了異常捕獲、非主要條件判斷、非主要操作。文中只是以最常用的ThreadPoolExecutor線程池舉例,其實ExecutorService接口下定義了很多功能豐富的其他類型,有各自的特點,但風格類似。本文重點是介紹任務提交的過程,過程中涉及的ExecutorService、ThreadPoolExecutor、AQS、Future、FutureTask等只會介紹該過程中用到的內容,不會對每個類都詳細展開。

1、 任務提交

從類圖上可以看到,接口ExecutorService繼承自Executor。不像Executor中只定義了一個方法來執行任務,在ExecutorService中,正如其名字暗示的一樣,定義了一個服務,定義了完整的線程池的行為,可以接受提交任務、執行任務、關閉服務。抽象類AbstractExecutorService類實現了ExecutorService接口,也實現了接口定義的默認行為。

技術分享

AbstractExecutorService任務提交的submit方法有三個實現。第一個接收一個Runnable的Task,沒有執行結果;第二個是兩個參數:一個任務,一個執行結果;第三個一個Callable,本身就包含執任務內容和執行結果。 submit方法的返回結果是Future類型,調用該接口定義的get方法即可獲得執行結果。 V get() 方法的返回值類型V是在提交任務時就約定好了的。

除了submit任務的方法外,作為對服務的管理,在ExecutorService接口中還定義了服務的關閉方法shutdown和shutdownNow方法,可以平緩或者立即關閉執行服務,實現該方法的子類根據自身特征支持該定義。在ThreadPoolExecutor中,維護了RUNNING、SHUTDOWN、STOP、TERMINATED四種狀態來實現對線程池的管理。線程池的完整運行機制不是本文的重點,重點還是關註submit過程中的邏輯。

1) 看AbstractExecutorService中代碼提交部分,構造好一個FutureTask對象後,調用execute()方法執行任務。我們知道這個方法是頂級接口Executor中定義的最重要的方法。。FutureTask類型實現了Runnable接口,因此滿足Executor中execute()方法的約定。同時比較有意思的是,該對象在execute執行後,就又作為submit方法的返回值返回,因為FutureTask同時又實現了Future接口,滿足Future接口的約定。

1 public <T> Future<T> submit(Callable<T> task) {
2 if (task == null) throw new NullPointerException();
3 RunnableFuture<T> ftask = newTaskFor(task);
4 execute(ftask);
5 return ftask;
6 }

2) Submit傳入的參數都被封裝成了FutureTask類型來execute的,對應前面三個不同的參數類型都會封裝成FutureTask。

1 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
2 return new FutureTask<T>(callable);
3 }

3) Executor接口中定義的execute方法的作用就是執行提交的任務,該方法在抽象類AbstractExecutorService中沒有實現,留到子類中實現。我們觀察下子類ThreadPoolExecutor,使用最廣泛的線程池如何來execute那些submit的任務的。這個方法看著比較簡單,但是線程池什麽時候創建新的作業線程來處理任務,什麽時候只接收任務不創建作業線程,另外什麽時候拒絕任務。線程池的接收任務、維護工作線程的策略都要在其中體現。

作為必要的預備知識,先補充下ThreadPoolExecutor有兩個最重要的集合屬性,分別是存儲接收任務的任務隊列和用來幹活的作業集合。

1 //任務隊列
2 private final BlockingQueue<Runnable> workQueue;
3 //作業線程集合
4 private final HashSet<Worker> workers = new HashSet<Worker>();

其中阻塞隊列workQueue是來存儲待執行的任務的,在構造線程池時可以選擇滿足該BlockingQueue 接口定義的SynchronousQueue、LinkedBlockingQueue或者DelayedWorkQueue等不同阻塞隊列來實現不同特征的線程池。

關註下execute(Runnable command)方法中調用到的addIfUnderCorePoolSize,workQueue.offer(command) , ensureQueuedTaskHandled(command),addIfUnderMaximumPoolSize(command)這幾個操作。尤其幾個名字較長的private方法,把方法名的駝峰式的單詞分開,加上對方法上下文的了解就能理解其功能。

因為前面說到的幾個方法在裏面即是操作,又返回一個布爾值,影響後面的邏輯,所以不大方便在方法體中為每條語句加註釋來說明,需要大致關聯起來看。所以首先需要把execute方法的主要邏輯說明下,再看其中各自方法的作用。

  • 如果線程池的狀態是RUNNING,線程池的大小小於配置的核心線程數,說明還可以創建新線程,則啟動新的線程執行這個任務。
  • 如果線程池的狀態是RUNNING ,線程池的大小小於配置的最大線程數,並且任務隊列已經滿了,說明現有線程已經不能支持當前的任務了,並且線程池還有繼續擴充的空間,就可以創建一個新的線程來處理提交的任務。
  • 如果線程池的狀態是RUNNING,當前線程池的大小大於等於配置的核心線程數,說明根據配置當前的線程數已經夠用,不用創建新線程,只需把任務加入任務隊列即可。如果任務隊列不滿,則提交的任務在任務隊列中等待處理;如果任務隊列滿了則需要考慮是否要擴展線程池的容量。
  • 當線程池已經關閉或者上面的條件都不能滿足時,則進行拒絕策略,拒絕策略在RejectedExecutionHandler接口中定義,可以有多種不同的實現。

上面其實也是對最主要思路的解析,詳細展開可能還會更復雜。簡單梳理下思路:構建線程池時定義了一個額定大小,當線程池內工作線程數小於額定大小,有新任務進來就創建新工作線程,如果超過該閾值,則一般就不創建了,只是把接收任務加到任務隊列裏面。但是如果任務隊列裏的任務實在太多了,那還是要申請額外的工作線程來幫忙。如果還是不夠用就拒絕服務。這個場景其實也是每天我們工作中會碰到的場景。我們管人的老大,手裏都有一定HC(Head Count),當上面老大有活分下來,手裏人不夠,但是不超過HC,我們就自己招人;如果超過了還是忙不過來,那就向上門老大申請借調人手來幫忙;如果還是幹不完,那就沒辦法了,新任務咱就不接了。

01 public void execute(Runnable command) {
02 if (command == null)
03 throw new NullPointerException();
04 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
05 if (runState == RUNNING && workQueue.offer(command)) {
06 if (runState != RUNNING || poolSize == 0)
07 ensureQueuedTaskHandled(command);
08 }
09 else if (!addIfUnderMaximumPoolSize(command))
10 reject(command); // is shutdown or saturated
11 }
12 }

4) addIfUnderCorePoolSize方法檢查如果當前線程池的大小小於配置的核心線程數,說明還可以創建新線程,則啟動新的線程執行這個任務。

1 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
2 Thread t = null;
3 //如果當前線程池的大小小於配置的核心線程數,說明還可以創建新線程
4 if (poolSize < corePoolSize && runState == RUNNING)
5 // 則啟動新的線程執行這個任務
6 t = addThread(firstTask);
7 return t != null;
8 }

5) 和上一個方法類似,addIfUnderMaximumPoolSize檢查如果線程池的大小小於配置的最大線程數,並且任務隊列已經滿了(就是execute方法試圖把當前線程加入任務隊列時不成功),說明現有線程已經不能支持當前的任務了,但線程池還有繼續擴充的空間,就可以創建一個新的線程來處理提交的任務。

1 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
2 // 如果線程池的大小小於配置的最大線程數,並且任務隊列已經滿了(就是execute方法中試圖把當前線程加入任務隊列workQueue.offer(command)時候不成功),說明現有線程已經不能支持當前的任務了,但線程池還有繼續擴充的空間
3 if (poolSize < maximumPoolSize && runState == RUNNING)
4 //就可以創建一個新的線程來處理提交的任務
5 t = addThread(firstTask);
6 return t != null;
7 }

6) 在ensureQueuedTaskHandled方法中,判斷如果當前狀態不是RUNING,則當前任務不加入到任務隊列中,判斷如果狀態是停止,線程數小於允許的最大數,且任務隊列還不空,則加入一個新的工作線程到線程池來幫助處理還未處理完的任務。

01 private void ensureQueuedTaskHandled(Runnable command) {
02 // 如果當前狀態不是RUNING,則當前任務不加入到任務隊列中,判斷如果狀態是停止,線程數小於允許的最大數,且任務隊列還不空
03 if (state < STOP &&
04 poolSize < Math.max(corePoolSize, 1) &&
05 !workQueue.isEmpty())
06 //則加入一個新的工作線程到線程池來幫助處理還未處理完的任務
07 t = addThread(null);
08 if (reject)
09 reject(command);
10 }

7) 在前面方法中都會調用adThread方法創建一個工作線程,差別是創建的有些工作線程上面關聯接收到的任務firstTask,有些沒有。該方法為當前接收到的任務firstTask創建Worker,並將Worker添加到作業集合HashSet<Worker> workers中,並啟動作業。

01 private Thread addThread(Runnable firstTask) {
02 //為當前接收到的任務firstTask創建Worker
03 Worker w = new Worker(firstTask);
04 Thread t = threadFactory.newThread(w);
05 w.thread = t;
06 //將Worker添加到作業集合HashSet&lt;Worker&gt; workers中,並啟動作業
07 workers.add(w);
08 t.start();
09 return t;
10 }

至此,任務提交過程簡單描述完畢,並介紹了任務提交後ExecutorService框架下線程池的主要應對邏輯,其實就是接收任務,根據需要創建或者維護管理線程。

維護這些工作線程幹什麽用?先不用看後面的代碼,想想我們老大每月辛苦地把老板豐厚的薪水遞到我們手裏,定期還要領著大家出去happy下,又是定期的關心下個人生活,所有做的這些都是為什麽呢?木訥的代碼工不往這邊使勁動腦子,但是猜還是能猜的到的,就讓幹活唄。本文想著重表達細節,諸如線程池裏的Worker是怎麽工作的,Task到底是不是在這些工作線程中執行的,如何保證執行完成後,外面等待任務的老大拿到想要結果,我們將在下篇文章中詳細介紹。

戲(細)說Executor框架線程池任務執行全過程(上)