1. 程式人生 > >jdk1.7的Java執行緒池架構原理和原始碼解析(ThreadPoolExecutor)

jdk1.7的Java執行緒池架構原理和原始碼解析(ThreadPoolExecutor)

我們現在來看看ThreadPoolExecutor的原始碼是怎麼樣的,也許你剛開始看他的原始碼會很痛苦,因為你不知道作者為什麼是這樣設計的,所以本文就我看到的思想會給你做一個介紹,此時也許你通過知道了一些作者的思想,你也許就知道應該該如何去操作了。

這裡來看下構造方法中對那些屬性做了賦值:

原始碼段1:

public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {
     if (corePoolSize < 0 ||
         maximumPoolSize <= 0 ||
         maximumPoolSize < corePoolSize ||
         keepAliveTime < 0)
         throw new IllegalArgumentException();
     if (workQueue == null || threadFactory == null || handler == null)
         throw new NullPointerException();
     this.corePoolSize = corePoolSize;
     this.maximumPoolSize = maximumPoolSize;
     this.workQueue = workQueue;
     this.keepAliveTime = unit.toNanos(keepAliveTime);
     this.threadFactory = threadFactory;
     this.handler = handler;
 }


這裡你可以看到最終賦值的過程,可以先大概知道下引數的意思:

corePoolSize:核心執行的poolSize,也就是當超過這個範圍的時候,就需要將新的Thread放入到等待佇列中了;

maximumPoolSize:一般你用不到,當大於了這個值就會將Thread由一個丟棄處理機制來處理,但是當你發生:newFixedThreadPool的時候,corePoolSize和maximumPoolSize是一樣的,而corePoolSize是先執行的,所以他會先被放入等待佇列,而不會執行到下面的丟棄處理中,看了後面的程式碼你就知道了。

workQueue:等待佇列,當達到corePoolSize的時候,就向該等待佇列放入執行緒資訊(預設為一個LinkedBlockingQueue

),執行中的佇列屬性為:workers,為一個HashSet;內部被包裝了一層,後面會看到這部分程式碼。

keepAliveTime:預設都是0,當執行緒沒有任務處理後,保持多長時間,cachedPoolSize是預設60s,不推薦使用。

threadFactory:是構造Thread的方法,你可以自己去包裝和傳遞,主要實現newThread方法即可;

handler:也就是引數maximumPoolSize達到後丟棄處理的方法,java提供了5種丟棄處理的方法,當然你也可以自己弄,主要是要實現介面:RejectedExecutionHandler中的方法:

public void rejectedExecution

(Runnabler, ThreadPoolExecutor e)

java預設的是使用:AbortPolicy,他的作用是當出現這中情況的時候會丟擲一個異常;其餘的還包含:

1、CallerRunsPolicy:如果發現執行緒池還在執行,就直接執行這個執行緒

2、DiscardOldestPolicy:線上程池的等待佇列中,將頭取出一個拋棄,然後將當前執行緒放進去。

3、DiscardPolicy:什麼也不做

4、AbortPolicy:java預設,丟擲一個異常:RejectedExecutionException。

通常你得到執行緒池後,會呼叫其中的:submit方法或execute方法去操作;其實你會發現,submit方法最終會呼叫execute方法來進行操作,只是他提供了一個Future來託管返回值的處理而已,當你呼叫需要有返回值的資訊時,你用它來處理是比較好的;這個Future會包裝對Callable資訊,並定義一個Sync物件(),當你發生讀取返回值的操作的時候,會通過Sync物件進入鎖,直到有返回值的資料通知,具體細節先不要看太多,繼續向下:

來看看execute最為核心的方法吧:

原始碼段2:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}


這段程式碼看似簡單,其實有點難懂,很多人也是這裡沒看懂,沒事,我一個if一個if說:

首先第一個判定空操作就不用說了,下面判定的poolSize >= corePoolSize成立時候會進入if的區域,當然它不成立也有可能會進入,他會判定addIfUnderCorePoolSize是否返回false,如果返回false就會進去;

我們先來看下addIfUnderCorePoolSize方法的原始碼是什麼:

原始碼段3:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}


可以發現,這段原始碼是如果發現小雨corePoolSize就會建立一個新的執行緒,並且呼叫執行緒的start()方法將執行緒執行起來:這個addThread()方法,我們先不考慮細節,因為我們還要先看到前面是怎麼進去的,這裡可以發信啊,只有沒有建立成功Thread才會返回false,也就是噹噹前的poolSize > corePoolSize的時候,或執行緒池已經不是在running狀態的時候才會出現;

注意:這裡在外部判定一次poolSize和corePoolSize只是初步判定,內部是加鎖後判定的,以得到更為準確的結果,而外部初步判定如果是大於了,就沒有必要進入這段有鎖的程式碼了。

此時我們知道了,當前執行緒數量大於corePoolSize的時候,就會進入【程式碼段2】的第一個if語句中,回到【原始碼段2】,繼續看if語句中的內容:

這裡標記為

原始碼段4


if (runState == RUNNING && workQueue.offer(command)) {
   if (runState != RUNNING || poolSize == 0)
       ensureQueuedTaskHandled(command);
   }
   else if (!addIfUnderMaximumPoolSize(command))
       reject(command); // is shutdown or saturated


第一個if,也就是噹噹前狀態為running的時候,就會去執行workQueue.offer(command),這個workQueue其實就是一個BlockingQueue,offer()操作就是在佇列的尾部寫入一個物件,此時寫入的物件為執行緒的物件而已;所以你可以認為只有執行緒池在RUNNING狀態,才會在佇列尾部插入資料,否則就執行else if,其實else if可以看出是要做一個是否大於MaximumPoolSize的判定,如果大於這個值,就會做reject的操作,關於reject的說明,我們在【原始碼段1】的解釋中已經非常明確的說明,這裡可以簡單看下原始碼,以應徵結果:

原始碼段5:

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                //在corePoolSize = maximumPoolSize下,該程式碼幾乎不可能執行
                t = addThread(firstTask); 
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
}
void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }


也就是如果執行緒池滿了,而且執行緒池呼叫了shutdown後,還在呼叫execute方法時,就會丟擲上面說明的異常:RejectedExecutionException

再回頭來看下【程式碼段4】中進入到等待佇列後的操作:

if (runState != RUNNING || poolSize == 0)

                   ensureQueuedTaskHandled(command);

這段程式碼是要線上程池執行狀態不是RUNNING或poolSize == 0才會呼叫,他是幹啥呢?

他為什麼會不等於RUNNING呢?外面那一層不是判定了他== RUNNING了麼,其實有時間差就是了,如果是poolSize == 0也會執行這段程式碼,但是裡面的判定條件是如果不是RUNNING,就做reject操作,在第一個執行緒進去的時候,會將第一個執行緒直接啟動起來;很多人也是看這段程式碼很繞,因為不斷的迴圈判定類似的判定條件,你主要記住他們之間有時間差,要取最新的就好了。

此時貌似程式碼看完了?咦,此時有問題了:

1、  等待中的執行緒在後來是如何跑起來的呢?執行緒池是不是有類似Timer一樣的守護程序不斷掃描執行緒佇列和等待佇列?還是利用某種鎖機制,實現類似wait和notify實現的?

2、  執行緒池的執行佇列和等待佇列是如何管理的呢?這裡還沒看出影子呢!

NO,NO,NO!

Java在實現這部分的時候,使用了怪異的手段,神馬手段呢,還要再看一部分程式碼才曉得。

在前面【原始碼段3】中,我們看到了一個方法叫:addThread(),也許很少有人會想到關鍵在這裡,其實關鍵就是在這裡:

我們看看addThread()方法到底做了什麼。

原始碼段6:

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);
    if (t != null) {
        w.thread = t;
        workers.add(w);
        int nt = ++poolSize;
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}


這裡建立了一個Work,其餘的操作,就是講poolSize疊加,然後將將其放入workers的執行佇列等操作;

我們主要關心Worker是幹什麼的,因為這個threadFactory對我們用途不大,只是做了Thread的命名處理;而Worker你會發現它的定義也是一個Runnable,外部開始在程式碼段中發現了呼叫哪個這個Worker的start()方法,也就是執行緒的啟動方法,其實也就是呼叫了Worker的run()方法,那麼我們重點要關心run方法是如何處理的

原始碼段7:

public void run() {
     try {
         Runnable task = firstTask;
         firstTask = null;
         while (task != null || (task = getTask()) != null) {
             runTask(task);
             task = null;
         }
     } finally {
         workerDone(this);
     }
 }

FirstTask其實就是開始在建立work的時候,由外部傳入的Runnable物件,也就是你自己的Thread,你會發現它如果發現task為空,就會呼叫getTask()方法再判定,直到兩者為空,並且是一個while迴圈體。

那麼看看getTask()方法的實現為:

原始碼段8:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}


你會發現它是從workQueue佇列中,也就是等待佇列中獲取一個元素出來並返回!

回過頭來根據程式碼段6理解下:

當前執行緒執行完後,在到workQueue中去獲取一個task出來,繼續執行,這樣就保證了執行緒池中有一定的執行緒一直在執行;此時若跳出了while迴圈,只有workQueue佇列為空才會出現或出現了類似於shutdown的操作,自然執行佇列會減少1,當再有新的執行緒進來的時候,就又開始向worker裡面放資料了,這樣以此類推,實現了執行緒池的功能。

這裡可以看下run方法的finally中呼叫的workerDone方法為:

原始碼段9:

void workerDone(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
        if (--poolSize == 0)
            tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

注意這裡將workers.remove(w)掉,並且呼叫了—poolSize來做操作。

至於tryTerminate是做了更多關於回收方面的操作。

最後我們還要看一段程式碼就是在【原始碼段6】中出現的程式碼呼叫為:runTask(task);這個方法也是執行的關鍵。

原始碼段10:

private void runTask(Runnable task) {
       final ReentrantLock runLock = this.runLock;
       runLock.lock();
       try {
           if (runState < STOP &&
               Thread.interrupted() &&
               runState >= STOP)
               thread.interrupt();
 
           boolean ran = false;
           beforeExecute(thread, task);
           try {
               task.run();
               ran = true;
               afterExecute(task, null);
               ++completedTasks;
           } catch (RuntimeException ex) {
               if (!ran)
                   afterExecute(task, ex);
               throw ex;
           }
       } finally {
           runLock.unlock();
       }
   }


你可以看到,這裡面的task為傳入的task資訊,呼叫的不是start方法,而是run方法,因為run方法直接呼叫不會啟動新的執行緒,也是因為這樣,導致了你無法獲取到你自己的執行緒的狀態,因為執行緒池是直接呼叫的run方法,而不是start方法來執行。

這裡有個beforeExecuteafterExecute方法,分別代表在執行前和執行後,你可以做一段操作,在這個類中,這兩個方法都是【空body】的,因為普通執行緒池無需做更多的操作。

如果你要實現類似暫停等待通知的或其他的操作,可以自己extends後進行重寫構造;