1. 程式人生 > >JAVA執行緒池(ThreadPoolExecutor)原始碼分析

JAVA執行緒池(ThreadPoolExecutor)原始碼分析

中使用ThreadPoolExecutor的常用方式:
    例項程式碼1
Java程式碼  收藏程式碼
  1. Runnable runnable = new CountService(intArr);  
  2.        ThreadPoolExecutor execute = (ThreadPoolExecutor)Executors.newFixedThreadPool(10);  
  3.        //或者使用:ThreadPoolExecutor execute = (ThreadPoolExecutor)Executors.newCachedThreadPool();
  4.        execute.submit(runnable);  


    在分析ThreadPoolExecutor原始碼前,先了解下面兩個概念:
     1.核心執行緒(任務):
我們定義的執行緒,即實現了Runnable介面的類,是我們將要放到執行緒池中執行的類,如例項程式碼中的CountService類
     2.工作執行緒:由執行緒池中建立的執行緒,是用來獲得核心執行緒並執行核心執行緒的執行緒(比較拗口哦,具體看程式碼就知道是什麼東東了)。

    Executors是一個執行緒池工廠,各種型別的執行緒池都是通過它來建立的,注意把它和Executor分開,感覺這個執行緒池工廠命名有點問題。
    我們主要分析下我們提交任務的處理邏輯,即’execute.submit(runnable)’的實現。
Submit()方法是在ThreadPoolExecutor繼承的抽象類AbstractExecutorService中實現的,具體程式碼如下:

   Java程式碼  收藏程式碼
  1. public Future<?> submit(Runnable task) {  
  2.         if (task == nullthrownew NullPointerException();  
  3.        //對核心執行緒的一個包裝,RunnableFuture還是一個Runnable
  4.         RunnableFuture<Object> ftask = newTaskFor(task, null);  
  5.        //核心執行緒執行邏輯
  6.         execute(ftask);  
  7.         return ftask;  
  8.     }  

    從程式碼中可以看出,執行緒的執行邏輯通過execute()完成,而execute是在AbstractExecutorService的子類ThreadPoolExecutor中實現的。看,一個典型的模板模式!廢話少說,下面看ThreadPoolExecutor中execute()方法中程式碼:
  
    Java程式碼  收藏程式碼
  1. publicvoid execute(Runnable command) {  
  2.         if (command == null)  
  3.             thrownew NullPointerException();  
  4.         /* 
  5.          * command執行緒執行的整個邏輯在 addIfUnderCorePoolSize(command)方法中實現 
  6.          * 一般適用於FixedThreadPool 
  7.          */
  8.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
  9.             /* 
  10.              * poolSize >= corePoolSize條件成立情景:當建立的為CacheThreadPool時,條件 
  11.              * 就能成立 
  12.              */
  13.             if (runState == RUNNING && workQueue.offer(command)) {  
  14.                 if (runState != RUNNING || poolSize == 0)  
  15.                     //兩種情況下執行該方法:1.執行緒池shutdown  2.CacheThreadPool中第一個核心執行緒的執行
  16.                     ensureQueuedTaskHandled(command);  
  17.             }  
  18.             //CacheThreadPool中執行緒的執行邏輯
  19.             elseif (!addIfUnderMaximumPoolSize(command))  
  20.                 reject(command); // is shutdown or saturated
  21.         }  
  22.     }  


    注意:CachedThreadPool和FixedThreadPool的邏輯實現都是在ThreadPoolExecutor中實現的。它兩的主要區別就是屬性corePoolSize以及workQueue的初始值的不同。具體可自己檢視工程類Executors的newFixedThreadPool()和newCachedThreadPool方法。由於這些初始值的不同,所以實現的邏輯也不同,具體的我在程式碼中已經註釋了。
    command執行緒執行的整個邏輯在 addIfUnderCorePoolSize(command)方法中實現的,
詳細請看addIfUnderCorePoolSize(command)原始碼:


  Java程式碼  收藏程式碼
  1. privateboolean addIfUnderCorePoolSize(Runnable firstTask) {  
  2.        Thread t = null;  
  3.        final ReentrantLock mainLock = this.mainLock;  
  4.        mainLock.lock();  
  5.        try {  
  6.         //poolSize < corePoolSize 即當前工作執行緒的數量一定要小於你設定的執行緒最大數量
  7.         //CachedThreadPool永遠也不會進入該方法,因為它的corePoolSize初始為0
  8.            if (poolSize < corePoolSize && runState == RUNNING)  
  9.                t = addThread(firstTask);  
  10.        } finally {  
  11.            mainLock.unlock();  
  12.        }  
  13.        if (t == null)  
  14.            returnfalse;  
  15.        t.start();   //執行緒執行了
  16.        returntrue;  
  17.    }  


    看’t.start()’,這表示工作執行緒啟動了,工作執行緒t啟動的前提條件是’t = addThread(firstTask); ‘返回值t必須不為null。好了,現在想看看java執行緒池中工作執行緒是怎麼樣的嗎?請看addThread方法:
    Java程式碼  收藏程式碼
  1. private Thread addThread(Runnable firstTask) {  
  2.     //Worker就是典型的工作執行緒,所以的核心執行緒都在工作執行緒中執行
  3.        Worker w = new Worker(firstTask);  
  4.        //採用預設的執行緒工廠生產出一執行緒。注意就是設定一些執行緒的預設屬性,如優先順序、是否為後臺執行緒等
  5.        Thread t = threadFactory.newThread(w);   
  6.        if (t != null) {  
  7.            w.thread = t;  
  8.            workers.add(w);  
  9.          //沒生成一個工作執行緒 poolSize加1,但poolSize等於最大執行緒數corePoolSize時,則不能再生成工作執行緒
  10.            int nt = ++poolSize;    
  11.            if (nt > largestPoolSize)  
  12.                largestPoolSize = nt;  
  13.        }  
  14.        return t;  
  15.    }  


   看見沒,Worker就是工作執行緒類,它是ThreadPoolExecutor中的一個內部類。下面,我們主要分析Worker類,如瞭解了Worker類,那基本就瞭解了java執行緒池的整個原理了。不用怕,Worker類的邏輯很簡單,它其實就是一個執行緒,實現了Runnable介面的,所以,我們先從run方法入手,run方法原始碼如下:

  Java程式碼  收藏程式碼
  1. publicvoid run() {  
  2.             try {  
  3.                 Runnable task = firstTask;  
  4.                 firstTask = null;  
  5.                 /** 
  6.                  * 注意這段while迴圈的執行邏輯,沒執行完一個核心執行緒後,就會去執行緒池 
  7.                  * 佇列中取下一個核心執行緒,如取出的核心執行緒為null,則當前工作執行緒終止 
  8.                  */
  9.                 while (task != null || (task = getTask()) != null) {  
  10.                     runTask(task);  //你所提交的核心執行緒(任務)的執行邏輯
  11.                     task = null;  
  12.                 }  
  13.             } finally {  
  14.                 workerDone(this); // 當前工作執行緒退出
  15.             }  
  16.         }  
  17.     }  


    從原始碼中可看出,我們所提交的核心執行緒(任務)的邏輯是在Worker中的runTask()方法中實現的。這個方法很簡單,自己可以開啟看看。這裡要注意一點,在runTask()方法中執行核心執行緒時是呼叫核心執行緒的run()方法,這是一個尋常方法的呼叫,千萬別與執行緒的啟動(start())混合了。這裡還有一個比較重要的方法,那就是上述程式碼中while迴圈中的getTask()方法,它是一個從池佇列中取的核心執行緒(任務)的方法。具體程式碼如下:

    Java程式碼  收藏程式碼
  1. Runnable getTask() {  
  2.         for (;;) {  
  3.             try {  
  4.                 int state = runState;  
  5.                 if (state > SHUTDOWN)    
  6.                     returnnull;  
  7.                 Runnable r;  
  8.                 if (state == SHUTDOWN)  //幫助清空佇列
  9.                     r = workQueue.poll();  
  10.                /* 
  11.                 * 對於條件1,如果可以超時,則在等待keepAliveTime時間後,則返回一null物件,這時就 
  12.                 *  銷燬該工作執行緒,這就是CachedThreadPool為什麼能回收空閒執行緒的原因了。 
  13.                 * 注意以下幾點:1.這種功能情況一般不可能在fixedThreadPool中出現 
  14.                 *            2.在使用CachedThreadPool時,條件1一般總是成立,因為CachedThreadPool的corePoolSize 
  15.                 *              初始為0 
  16.                 */
  17.                 elseif (poolSize > corePoolSize || allowCoreThreadTimeOut)  //------------------條件1
  18.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);    
  19.                 else
  20.                     r = workQueue.take();       //如果佇列不存在任何元素 則一直等待。 FiexedThreadPool典型模式----------條件2
  21.                 if (r != null)  
  22.                     return r;  
  23.                 if (workerCanExit()) {       //--------------------------條件3
  24.                     if (runState >= SHUTDOWN) // Wake up others
  25.                         interruptIdleWorkers();  
  26.                     returnnull;  
  27.                 }  
  28.                 // Else retry
  29.             } catch (InterruptedException ie) {  
  30.                 // On interruption, re-check runState
  31.             }  
  32.         }  
  33.     }  


    從這個方法中,我們需要了解一下幾點:
    1.CachedThreadPool獲得任務邏輯是條件1,條件1的處理邏輯請看註釋,CachedThreadPool執行條件1的原因是:CachedThreadPool的corePoolSize時刻為0。

    2.FixedThreadPool執行的邏輯為條件2,從’workQueue.take()’中我們就明白了為什麼FixedThreadPool不會釋放工作執行緒的原因了(除非你關閉執行緒池)。

    最後,我們瞭解下Worker(工作執行緒)終止時的處理吧,這個對理解CachedThreadPool有幫助,具體程式碼如下:

    Java程式碼  收藏程式碼
  1. /** 
  2.     * 工作執行緒退出要處理的邏輯 
  3.     * @param w 
  4.     */
  5.    void workerDone(Worker w) {  
  6.        final ReentrantLock mainLock = this.mainLock;  
  7.        mainLock.lock();  
  8.        try {  
  9.            completedTaskCount += w.completedTasks;   
  10.            workers.remove(w);  //從工作執行緒快取中刪除
  11.            if (--poolSize == 0//poolSize減一,這時其實又可以建立工作執行緒了
  12.                tryTerminate(); //嘗試終止
  13.        } finally {  
  14.            mainLock.unlock();  
  15.        }  
  16.    }  


    注意workDone()方法中的tyrTerminate()方法,它是你以後理解執行緒池中shuDown()以及CachedThreadPool原理的關鍵,具體程式碼如下:  

    Java程式碼  收藏程式碼
  1. privatevoid tryTerminate() {  
  2.     //終止的前提條件就是執行緒池裡已經沒有工作執行緒(Worker)了
  3.        if (poolSize == 0) {  
  4.            int state = runState;  
  5.            /** 
  6.             * 如果當前已經沒有了工作執行緒(Worker),但是執行緒佇列裡還有等待的執行緒任務,則建立一個 
  7.             * 工作執行緒來執行執行緒佇列中等待的任務 
  8.             */
  9.            if (state < STOP && !workQueue.isEmpty()) {      
  10.                state = RUNNING; // disable termination check below
  11.                Thread t = addThread(null);  
  12.                if (t != null)  
  13.                    t.start();  
  14.            }  
  15.            //設定池狀態為終止狀態
  16.            if (state == STOP || state == SHUTDOWN) {  
  17.                runState = TERMINATED;  
  18.                termination.signalAll();   
  19.                terminated();   
  20.            }  
  21.        }  
  22.    }  


    第一次寫這麼長的博文,還是躲著專案經理寫的,真不容易,希望能對想了解java執行緒池原理的朋友們有一點幫助。