1. 程式人生 > >執行緒池執行原理分析

執行緒池執行原理分析

要想分析透徹整個執行緒池執行的邏輯,是個龐雜的工程,牽扯到執行緒池生命週期管理,佇列管理,拒絕策略,調配邏輯等等.這裡只是從一個Runnable任務釋出到執行緒池中以後,執行緒池內部的執行邏輯角度去嘗試分析.

先貼出整理的執行緒池操作流程圖 , 然後開始追原始碼:

執行緒池執行流程圖.png

執行緒數量控制策略

ThreadPoolExecutor是執行緒池的實現類,無論是自定義執行緒池,還是使用系統提供的執行緒池,都會使用到這個類.通過類的execute(Runnable command)方法來執行Runnable任務.
那麼一旦將一個Runnable任務execute()以後,到底發生了什麼? 直接看程式碼

/**
 * 將該Runnable任務加入執行緒池並在未來某個時刻執行
 * 該任務可能執行在一個新的執行緒 或 一個已存在的執行緒池中的執行緒
 * 如果該任務提交失敗,可能是因為執行緒池已關閉,或者已達到執行緒池佇列和執行緒數已滿.
 * 該Runnable將交給RejectedExecutionHandler處理,丟擲RejectedExecutionException
 */
public void execute(Runnable command) {
    if (command == null){
        //如果沒傳入Runnable任務,則丟擲空指標異常
        throw
new NullPointerException(); } int c = ctl.get(); //當前執行緒數 小於 核心執行緒數 if (workerCountOf(c) < corePoolSize) { //直接開啟新的執行緒,並將Runnable傳入作為第一個要執行的任務,成功返回true,否則返回false if (addWorker(command, true)){ return; } c = ctl.get(); } //c < SHUTDOWN代表執行緒池處於RUNNING狀態 + 將Runnable新增到任務佇列,如果新增成功返回true失敗返回false
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //成功加入佇列後,再次檢查是否需要新增新執行緒(因為已存在的執行緒可能在上次檢查後銷燬了,或者執行緒池在進入本方法後關閉了) if (! isRunning(recheck) && remove(command)){ //如果執行緒池處於非RUNNING狀態 並且 將該Runnable從任務佇列中移除成功,則拒絕執行此任務 //交給RejectedExecutionHandler呼叫rejectedExecution方法,拒絕執行此任務 reject(command); }else if (workerCountOf(recheck) == 0){ //如果執行緒池執行緒數量為0,則建立一條新執行緒,去執行 addWorker(null, false); } }else if (!addWorker(command, false)) //如果執行緒池處於非RUNNING狀態 或 將Runnable新增到佇列失敗(佇列已滿導致),則執行預設的拒絕策略 reject(command); }

整理流程如下:
1. 如果執行緒池中的執行緒數量少於corePoolSize(核心執行緒數量),那麼會直接開啟一個新的核心執行緒來執行任務,即使此時有空閒執行緒存在.
2. 如果執行緒池中執行緒數量大於等於corePoolSize(核心執行緒數量),那麼任務會被插入到任務佇列中排隊,等待被執行.此時並不新增新的執行緒.
3. 如果在步驟2中由於任務佇列已滿導致無法將新任務進行排隊,這個時候有兩種情況:

  • 執行緒數量 [未] 達到maximumPoolSize(執行緒池最大執行緒數) , 立刻啟動一個非核心執行緒來執行任務.
  • 執行緒數量 [已] 達到maximumPoolSize(執行緒池最大執行緒數) , 拒絕執行此任務.ThreadPoolExecutor會通過RejectedExecutionHandler,丟擲RejectExecutionException異常.

以上就是一旦將一個Runnable任務execute()以後,執行的一系列邏輯,理解起來並不難,下面再對其中呼叫的一些方法做一些追查,就更方便理解其中的執行邏輯.

執行緒數量及執行緒池狀態管理

我們發現在execute()方法中頻繁的執行這句c = ctl.get();程式碼,那麼這ctl是什麼,get()方法獲取到的是什麼,獲取到的c又用來做什麼?

上原始碼:

//建立AtomicInteger物件
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3 = 29
//最大執行緒容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //將1的二進位制向右位移29位,再減1

//執行狀態儲存在int值的高3位 (所有數值左移29位)
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

//執行狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//執行緒數量
private static int workerCountOf(int c)  { return c & CAPACITY; }
//是否正在執行
private static boolean isRunning(int c) { return c < SHUTDOWN;}

以上程式碼中的資訊整理如下:

  • clt是一個AtomicInteger物件,(提供原子操作進行Integer的使用,適用於高併發場景.該AtomicInteger的value可以自動重新整理,確保在高併發環境下的唯一性.),而ctl.get()獲取的就是該value值.
  • 執行緒池用一個AtomicInteger來儲存 [執行緒數量] 和 [執行緒池狀態] ,一個int數值一共有32位,高3位用於儲存執行狀態,低29位用於儲存執行緒數量
  • 系統預設的執行緒容量就是(2^29)-1 , 大約5億條執行緒-_-!

所以由此得知 :
頻繁的呼叫c = ctl.get();是為了獲取該AtomicInteger的最新值,進而通過位運算獲取執行緒池的最新執行狀態,執行緒數量.

[執行緒池狀態]:

  • RUNNING: 接收新任務,並執行佇列中的任務
  • SHUTDOWN: 不接收新任務,但是執行佇列中的任務
  • STOP: 不接收新任務,不執行佇列中的任務,中斷正在執行中的任務
  • TIDYING: 所有的任務都已結束,執行緒數量為0,處於該狀態的執行緒池即將呼叫terminated()方法
  • TERMINATED: terminated()方法執行完成

新執行緒的建立

在execute()方法中獲知通過addWorker()方法來新增新執行緒,那麼到底是如何新增和管理的?
開始追原始碼,一看究竟.

/**
 * 往執行緒池中新增Worker物件
 * @param  firstTask 執行緒中第一個要執行的任務 
 * @param  core      是否為核心執行緒
 * @return           新增是否成功
 */
 private boolean addWorker(Runnable firstTask, boolean core) {
    //這裡有兩層[死迴圈],外迴圈:不停的判斷執行緒池的狀態
    retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //一系列判斷條件:執行緒池關閉,Runnable為空,佇列為空,則直接return false,代表Runnable新增失敗
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
                return false;
            }

            //內迴圈:不停的檢查執行緒容量        
            for (;;) {
                int wc = workerCountOf(c);
                //超過執行緒數限制,則return false
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                    return false;
                }
                //★ 新增執行緒成功,則直接跳出兩層迴圈,繼續往下執行.
                //注意:這裡只是把執行緒數成功新增到了AtomicInteger記錄的執行緒池數量中,真正的Runnable新增,在下面的程式碼中進行
                if (compareAndIncrementWorkerCount(c)){
                    break retry;
                }
                //再次判斷執行緒池最新狀態,如果狀態改變了(內迴圈和外迴圈記錄的狀態不符),則重新開始外層死迴圈
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs){
                    continue retry;
                }
            }
        }

    //結束迴圈之後,開始真正的建立執行緒.
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //建立一個Worker物件,並將Runnable當做引數傳入
        w = new Worker(firstTask);
        //從worker物件中取出執行緒
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //拿到鎖
            mainLock.lock();
            try {
                //再次檢查執行緒池最新狀態
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    //檢查準備執行Runnable的Thread的狀態,如果該Thread已處於啟動狀態,則丟擲狀態異常(因為目前還沒啟動呢)
                    if (t.isAlive()){
                        throw new IllegalThreadStateException();
                    } 
                    //將新建立的worker,新增到worker集合
                    workers.add(w);
                    ...
                    workerAdded = true;
                }
            } finally {
                //釋放鎖
                mainLock.unlock();
            }
            if (workerAdded) {
                //★Thread開始啟動
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //新增worker失敗
        if (! workerStarted){
            addWorkerFailed(w);
        }
    }
    return workerStarted;
}

總結:
1. 先判斷執行緒池狀態和執行緒池中執行緒的容量,如果滿足執行緒新增的條件,則先把AtomicInteger中記錄的執行緒數量+1.然後再進行執行緒新增的工作.
2. 建立worker物件,並將Runnable作為引數傳遞進去,並從worker中取出Thread物件,進行一系列條件判斷後.開啟Thread的start()方法,執行緒開始執行.所以worker物件中必然包含了一個Thread和一個要被執行的Runnable.

那麼接下來繼續追原始碼,印證下第二點的推斷,看看Worker到底幹了什麼.

Worker類

//ThreadPoolExecutor的內部finial類
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    //當前worker要執行任務所用的執行緒(如果建立失敗,則可能是null)
    final Thread thread;
    //第一個要執行的任務(可能是null)
    Runnable firstTask;
    //當前執行緒執行完的任務總數
    volatile long completedTasks;

    //通過構造傳入Runnable任務
    Worker(Runnable firstTask) {
        ...
        this.firstTask = firstTask;
        //通過ThreadFactory()建立新執行緒
        this.thread = getThreadFactory().newThread(this);
    }

    //呼叫外部類runWorker()方法
    public void run() {
        runWorker(this);
    }
    ...
}

worker類中的內部實現也印證了我們的推斷:

  • 每個worker,都是一條執行緒,同時裡面包含了一個firstTask,即初始化時要被首先執行的任務.
  • 最終執行任務的,是runWorker()方法

執行緒的複用

繼續追runWorker()方法的原始碼

//ThreadPoolExecutor的final類,該方法由內部類Worker的run()方法呼叫
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //取出Worker物件中的Runnable任務
    Runnable task = w.firstTask;
    boolean completedAbruptly = true;
    ...
    try {
        //★注意這個while迴圈,在這裡實現了 [執行緒複用]
        while (task != null || (task = getTask()) != null) {
            //上鎖
            w.lock();
            //檢查Thread狀態的程式碼
            ...
            try {
                ...
                try {
                    //執行Worker中的Runnable任務
                    task.run();
                } catch (...) {
                   ...catch各種異常
                } 
            } finally {
                //置空任務(這樣下次迴圈開始時,task依然為null,需要再通過getTask()取) + 記錄該Worker完成任務數量 + 解鎖
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        //該執行緒已經從佇列中取不到任務了,改變標記
         completedAbruptly = false;
    } finally {
        //執行緒移除
        processWorkerExit(w, completedAbruptly);
    }
}

通過上面的原始碼,發現通過一個while迴圈,不斷的getTask()取任務出來執行,以這種方式實現了執行緒的複用.

執行緒複用邏輯整理如下:
1. 如果task不為空,則開始執行task
2. 如果task為空,則通過getTask()再去取任務,並賦值給task,如果取到的Runnable不為空,則執行該任務
3. 執行完畢後,通過while迴圈繼續getTask()取任務
4. 如果getTask()取到的任務依然是空,那麼整個runWorker()方法執行完畢

上面只是從getTask()方法名和其返回值來猜測此方法的作用,下面就繼續追原始碼,來證實和研究getTask()到底是怎麼取任務的,從哪取,怎麼取.

getTask()

private Runnable getTask() {
    ...
    for (;;) {
        ...
        // 如果執行緒池已關閉 或 任務佇列為空,則AtomicInteger中記錄的執行緒數量-1,並return null,結束本方法
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //獲取當前執行緒池中的匯流排程數
        int wc = workerCountOf(c);
        //allowCoreThreadTimeOut引數是使用者自行設定的(預設false),用來設定:是否允許核心執行緒有超時策略
        //條件1:核心執行緒超時 條件2:當前執行緒數 > 核心執行緒數,滿足任何一個條件則timed標記為true 
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //超過最大執行緒數 或 超時 或 任務佇列為空...  執行緒數量-1 + return null
        ...
        try {
            //根據timed標記,使用不同的方式(限時等待 or 阻塞)從BlockingQueue<Runnable> workQueue 佇列中取任務
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null){
                //如果取到了,就將Runnable返回
                return r;
            }
            //如果沒取到,則重新for迴圈
            ...
        }
    }
}

將以上原始碼中的資訊整理如下:

  • 執行緒池使用BlockingQueue來管理整個執行緒池中的Runnable任務,變數workQueue存放的都是待執行的任務
  • BlockingQueue是個阻塞佇列,BlockingQueue.take()方法如果得到的是空,則進入等待狀態,直到BlockingQueue有新的物件被加入時,才可以正常將Runnable取出並返回,執行緒開始正常運轉,正常執行Runnable任務。
/**
 * 先進先出的阻塞佇列
 */
public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 檢索並移除佇列的頂部元素,如果該元素不可用則等待,直至元素可用
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take()
    ...
}

讓我們整理一下上面幾段原始碼的邏輯順序:
1. execute()方法執行之後,進行一系列的邏輯判斷來控制執行緒池中的執行緒數量,並通過addWorker()方法建立新執行緒
2. 一旦Worker裡的Thread開始start()之後,執行的其實是Worker裡的run()方法,run()方法呼叫runWorker(Worker w)方法.
3. 在runWorker()方法裡面通過getTask()方法不停的取workQueue佇列中的任務來執行,如果取到了就執行,如果沒取到就等待.

結論:

  • 一旦一個執行緒開啟之後,會一直執行下去,直至任務佇列中的任務執行完畢,達成了執行緒的複用
  • 以Runnable佇列為目標的worker雖然是序列操作,但是由於可以通過addWorker()新增多個worker,並且多個worker取的是同一個BlockingQueue中的Runnable,所以就實現了並行處理.

執行緒的移除

在runWorker()方法中有如下程式碼:

final void runWorker(Worker w) {
    boolean completedAbruptly = true;
    ...
    try {
        while (getTask()...) {
            ...
            處理任務
        }
        //該執行緒已經從佇列中取不到任務了,改變標記,該標記表示:該執行緒是否因使用者因素導致的異常而終止
         completedAbruptly = false;
    } finally {
        //執行緒移除
        processWorkerExit(w, completedAbruptly);
    }
}

processWorkerExit這裡用來將worker從worker集合中移除,步驟如下:
1. 先移除傳入的Worker(執行緒)
2. 判斷執行緒池裡的最少執行緒數,如果最少執行緒數為0條,但是佇列裡依然有任務未執行完畢.那麼必須確保執行緒池中至少有1條執行緒.(將最小執行緒數置為1)
3. 如果當前執行緒數 > 最小執行緒數,本方法結束,不再往下執行
4. 否則新增一條新執行緒,來替代當前執行緒,繼續去執行佇列中的任務.

/**
 * @param w the worker 執行緒
 * @param completedAbruptly 該執行緒是否因使用者因素導致的異常而終止
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    ...
    try {
        //記錄該執行緒完成任務的總數
        completedTaskCount += w.completedTasks;
        //從worker集合中移除本worker(執行緒)
        workers.remove(w);
    }
    ...
    //如果在runWoker()中正常執行任務完畢,這裡completedAbruptly傳入的就是false
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        //如果執行緒池裡最少執行緒數為0,但是此時任務佇列裡依然還有任務
        if (min == 0 && ! workQueue.isEmpty()){
            //那麼必須保留一條執行緒,所以將最小值設定為1
            min = 1;
        }
        //如果當前執行緒數>= 最小執行緒數,則直接return
        if (workerCountOf(c) >= min){
            return; 
        }
    }
    //否則新增一條新執行緒,來替代當前執行緒,繼續去執行佇列中的任務.
    addWorker(null, false);
}

這次原始碼分析就先到這裡,一路從execute()開始,走到執行緒移除.其實執行緒池裡面涉及到的問題很多,以後有時間再慢慢研究.