1. 程式人生 > >徹底弄懂 Java 執行緒池原理

徹底弄懂 Java 執行緒池原理

概述

這篇文章是我在閱讀原始碼時整理的一些筆記,對原始碼的關鍵點進行了比較詳細的註釋,然後加上一些自己對執行緒池機制的理解。最終目的是要弄清楚下面這些問題:

  • 執行緒池有 execute() 和 submit() 方法,執行機制分別是什麼?
  • 如何新建執行緒?
  • 任務如何執行?
  • 執行緒如何銷燬?超時機制如何實現?

首先需要介紹一下執行緒池的兩個重要成員:

ctl

AtomInteger 型別。高3位儲存執行緒池狀態,低29位儲存當前執行緒數量。workerCountOf(c) 返回當前執行緒數量。runStateOf(c) 返回當前執行緒池狀態。 執行緒池有如下狀態:

  • RUNNING:接收新任務,處理佇列任務。
  • SHUTDOWN:不接收新任務,但處理佇列任務。
  • STOP:不接收新任務,也不處理佇列任務,並且中斷所有處理中的任務。
  • TIDYING:所有任務都被終結,有效執行緒為0。會觸發terminated()方法。
  • TERMINATED:當terminated()方法執行結束。

Worker

這個執行緒線上程池中的包裝類。一個 Worker 代表一個執行緒。執行緒池用一個 HashSet 管理這些執行緒。

需要注意的是,Worker 本身並不區分核心執行緒和非核心執行緒,核心執行緒只是概念模型上的叫法,特性是依靠對執行緒數量的判斷來實現的

Worker 特性如下:

  • 繼承自 AQS,本身實現了一個最簡單的不公平的不可重入鎖。
  • 構造方法傳入 Runnable,代表第一個執行的任務,可以為空。構造方法中新建一個執行緒。
  • 實現了 Runnable 介面,在新建執行緒時傳入 this。因此執行緒啟動時,會執行 Worker 本身的 run 方法。
  • run 方法呼叫了 ThreadPoolExecutor 的 runWorker 方法,負責實際執行任務。

submit() 方法的執行機制

submit 返回一個 Future 物件,我們可以呼叫其 get 方法獲取任務執行的結果。程式碼很簡單,就是將 Runnable 包裝成 FutureTask 而已。可以看到,最終還是呼叫 Execute 方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
複製程式碼

FutureTask 的程式碼就不貼了,簡述一下原理:

  • FutureTask 實現了 RunnableFuture 介面,RunnableFuture 繼承自Runnable。執行任務時會呼叫 FutureTask 的 run 方法,run 方法中執行真正的任務程式碼,執行完後呼叫 set 方法設定結果。
  • 如果任務執行完畢,get 方法會直接返回結果,如果沒有,get 方法會阻塞並等待結果。
  • set 方法中設定結果後會取消阻塞,使 get 方法返回結果。

execute() 方法的執行機制

這個機制大家應該都很熟了,再簡述一遍:

  1. 工作執行緒數小於核心執行緒數時,直接新建核心執行緒執行任務;
  2. 大於核心執行緒數時,將任務新增進等待佇列;
  3. 佇列滿時,建立非核心執行緒執行任務;
  4. 工作執行緒數大於最大執行緒數時,拒絕任務

具體的程式碼分析如下:

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //小於核心執行緒數
    if (addWorker(command, true)) //啟動核心執行緒並執行任務
        return;
    c = ctl.get(); //執行失敗時重新獲取值
}
if (isRunning(c) && workQueue.offer(command)) { //檢查執行狀態並將任務新增到佇列
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command)) //重新檢查,防止狀態有變化。如果有,移出佇列並拒絕任務
        reject(command);
    else if (workerCountOf(recheck) == 0) //如果執行緒數為0,建立非核心執行緒,第一個引數為空時會從佇列中取任務執行
        addWorker(null, false);
}
else if (!addWorker(command, false)) //新增到佇列失敗,說明佇列已滿,建立非核心執行緒執行任務
    reject(command); //執行失敗說明達到最大執行緒數,拒絕任務
複製程式碼

新任務如何新增進佇列?

執行緒池使用 addWorker 方法新建執行緒,第一個引數代表要執行的任務,執行緒會將這個任務執行完畢後再從佇列取任務執行。第二引數是核心執行緒的標誌,它並不是 Worker 本身的屬性,在這裡只用來判斷工作執行緒數量是否超標。

這個方法可以分成兩部分,第一部分進行一些前置判斷,並使用迴圈 CAS 結構將執行緒數量加1。程式碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry: //這個語法不常用,用於給外層 for 迴圈命名。方便巢狀 for 迴圈中,breakcontinue 指定是外層還是內層迴圈
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // firstTask 不為空代表這個方法用於新增任務,為空代表新建執行緒。SHUTDOWN 狀態下不接受新任務,但處理佇列中的任務。這就是第二個判斷的邏輯。
        if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;
        
        // 使用迴圈 CAS 自旋,增加執行緒數量直到成功為止
        for (;;) {
        int wc = workerCountOf(c);
        //判斷是否超過執行緒容量
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        //使用 CAS 將執行緒數量加1
        if (compareAndIncrementWorkerCount(c))
            break retry;
        //修改不成功說明執行緒數量有變化
        //重新判斷執行緒池狀態,有變化時跳到外層迴圈重新獲取執行緒池狀態
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        //到這裡說明狀態沒有變化,重新嘗試增加執行緒數量
        }
    }
    ... ...
}
複製程式碼

第二部分負責新建並啟動執行緒,並將 Worker 新增至 Hashset 中。程式碼很簡單,沒什麼好註釋的,用了 ReentrantLock 確保執行緒安全。

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s; //這個引數是測試用的,不用管它
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w); //新增失敗時移除 Worker 並將執行緒數量減 1
}
return workerStarted;
}
複製程式碼

任務如何執行?

在 addWorker 方法中,執行緒會被啟動。新建執行緒時,Worker 將自身傳入,所以執行緒啟動後會執行 Worker 的 run 方法,這個方法呼叫了 ThreadPoolExecutor 的 runWorker 方法執行任務,runWorker 中會迴圈取任務執行,執行邏輯如下:

  • 如果 firstTask 不為空,先執行 firstTask,執行完畢後置空;
  • firstTask 為空後呼叫 getTask() 從佇列中取任務執行;
  • 一直執行到沒有任務後,退出 while 迴圈
  • 呼叫 processWorkerExit() 方法,將 Worker 移除出 HashSet,此時執行緒執行完畢,也不再被引用,會自動銷燬。

具體程式碼分析如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    //task 為我們傳給 execute 的任務。task 為空時從佇列中取任務執行
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //這段邏輯非常繞。實際上它實現了以下邏輯:
            //1.如果執行緒池已停止且執行緒未中斷,條件成立,中斷執行緒
            //2.如果執行緒池未停止,執行緒為中斷狀態,將執行緒狀態重置,並重新進行1的判斷
            //3.如果執行緒池未停止,執行緒不為中斷狀態,條件不成立
            //Thread.interrupted() 會重置中斷狀態,保證
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            //beforeExecute 和 afterExecute 為空方法,交給子類實現
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); //執行任務
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //執行到這裡時說明執行緒執行完畢,此方法將執行緒從 HashSet 中移出。執行緒終止且沒有引用,會被自動回收。
        processWorkerExit(w, completedAbruptly);
    }
}
複製程式碼

執行緒如何銷燬?超時機制如何實現?

在 runWorker 方法中 getTask 方法返回 null 之後會導致執行緒執行完畢,被移除出 HashSet,從而被系統銷燬。 執行緒的超時機制也是在這個方法實現的,藉助於 BlockingQueue 的 poll 和 take 方法。

  • poll 方法可以設定一個超時時間,當佇列為空時,在此時間內阻塞等待,超時後返回 null
  • take 方法在佇列為空時直接丟擲異常

超時機制實現原理如下:

  • 當 allowCoreThreadTimeOut 為 true,所有執行緒都會超時,全部呼叫 poll 方法,傳入 keepAliveTime 引數。
  • 當 allowCoreThreadTimeOut 為 false 時,如果工作執行緒數量大於核心執行緒數,將此執行緒當作非核心執行緒處理,呼叫 poll 方法
  • 當 allowCoreThreadTimeOut 為 false 且工作執行緒數量小於等於核心執行緒數時,將此執行緒當作核心執行緒處理,呼叫 take 方法,佇列為空時丟擲異常,進入下一次迴圈。如果佇列一直為空,核心執行緒會一直在此迴圈等待任務進行處理。

具體程式碼如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
    
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
    
        int wc = workerCountOf(c);
    
        // 允許核心執行緒超時或者執行緒數大於核心執行緒
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
        // timed && timedOut 這兩個引數結合起來控制超時機制
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
    
        try {
            // 佇列為空時,poll 方法會阻塞等待,超過 keepAliveTime 時返回空值。take 方法會直接返回異常。
            // 當 allowCoreThreadTimeOut 為 true 時,核心執行緒和非核心執行緒沒有區別,一律呼叫poll方法
            // 當 allowCoreThreadTimeOut 為 false 時,執行緒數量超過核心執行緒數才會進入超時機制,如果不超過,則將當前執行緒當作核心執行緒處理,呼叫 take,丟擲異常後進入下一次迴圈。如果佇列為空,此處會一直迴圈。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製程式碼