1. 程式人生 > >JUC原始碼分析-執行緒池篇(一):ThreadPoolExecutor

JUC原始碼分析-執行緒池篇(一):ThreadPoolExecutor

在多執行緒程式設計中,任務都是一些抽象且離散的工作單元,而執行緒是使任務非同步執行的基本機制。隨著應用的擴張,執行緒和任務管理也變得非常複雜,為了簡化這些複雜的執行緒管理模式,我們需要一個“管理者”來統一管理執行緒及任務分配,這就是執行緒池。本章開始,我們將逐個分析 JUC 框架中幾種不同的執行緒池。首先來認識一下我們的老朋友—ThreadPoolExecutor

概述

ThreadPoolExecutor 是執行緒池的核心實現。執行緒的建立和終止需要很大的開銷,執行緒池中預先提供了指定數量的可重用執行緒,所以使用執行緒池會節省系統資源,並且每個執行緒池都維護了一些基礎的資料統計,方便執行緒的管理和監控。

執行緒池的使用想必大家都很熟悉了,這裡筆者也大概講一下。執行緒池的建立一般由工具類 Executors 來完成,當然我們也可以根據業務需求來定義自己需要的執行緒池。Executors 為執行緒池提供了三種不同的構造,每種構造也都可以自定義執行緒工廠(ThreadFactory):

  • newFixedThreadPool:指定大小的執行緒池,使用 LinkedBlockingQueue 作為等待佇列。
  • newSingleThreadExecutor:只有一個工作執行緒的執行緒池,使用 LinkedBlockingQueue 作為等待佇列。如果內部工作執行緒由於異常而被終止,則會新建一個執行緒替代它的位置。
  • newCachedThreadPool:無容量執行緒池,核心執行緒數為0,工作執行緒空閒60秒後會被自動回收。使用非公平模式的 SynchronousQueue 作為等待佇列(詳見JUC原始碼分析-集合篇(八):SynchronousQueue)。只有在需要時(新任務到來時)才建立新的執行緒,如果有空閒執行緒則會重用。適合執行週期較小的非同步任務。

注意newFixedThreadPool(1, threadFactory) 不等價於 newSingleThreadExecutornewSingleThreadExecutor建立的執行緒池保證內部只有一個執行緒執行任務,並且執行緒數不可擴充套件

;而通過newFixedThreadPool(1, threadFactory)建立的執行緒池可以通過setCorePoolSize方法來修改核心執行緒數。

corePoolSize & maximumPoolSize

核心執行緒數(corePoolSize)和最大執行緒數(maximumPoolSize)是執行緒池中非常重要的兩個概念,希望同學們能夠掌握。 當一個新任務被提交到池中,如果當前執行執行緒小於核心執行緒數(corePoolSize),即使當前有空閒執行緒,也會新建一個執行緒來處理新提交的任務;如果當前執行執行緒數大於核心執行緒數(corePoolSize)並小於最大執行緒數(maximumPoolSize),只有當等待佇列已滿的情況下才會新建執行緒。

等待佇列

任何阻塞佇列(BlockingQueue)都可以用來轉移或儲存提交的任務,執行緒池大小和阻塞佇列相互約束執行緒池:

  1. 如果執行執行緒數小於corePoolSize,提交新任務時就會新建一個執行緒來執行;
  2. 如果執行執行緒數大於或等於corePoolSize,新提交的任務就會入列等待;如果佇列已滿,並且執行執行緒數小於maximumPoolSize,也將會新建一個執行緒來執行;
  3. 如果執行緒數大於maximumPoolSize,新提交的任務將會根據拒絕策略來處理。

下面來看一下三種通用的入隊策略:

  1. 直接傳遞:通過 SynchronousQueue 直接把任務傳遞給執行緒。如果當前沒可用執行緒,嘗試入隊操作會失敗,然後再建立一個新的執行緒。當處理可能具有內部依賴性的請求時,該策略會避免請求被鎖定。直接傳遞通常需要無界的最大執行緒數(maximumPoolSize),避免拒絕新提交的任務。當任務持續到達的平均速度超過可處理的速度時,可能導致執行緒的無限增長。
  2. 無界佇列:使用無界佇列(如 LinkedBlockingQueue)作為等待佇列,當所有的核心執行緒都在處理任務時, 新提交的任務都會進入佇列等待。因此,不會有大於 corePoolSize 的執行緒會被建立(maximumPoolSize 也將失去作用)。這種策略適合每個任務都完全獨立於其他任務的情況;例如網站伺服器。這種型別的等待佇列可以使瞬間爆發的高頻請求變得平滑。當任務持續到達的平均速度超過可處理速度時,可能導致等待佇列無限增長。
  3. 有界佇列:當使用有限的最大執行緒數時,有界佇列(如 ArrayBlockingQueue)可以防止資源耗盡,但是難以調整和控制。佇列大小和執行緒池大小可以相互作用:使用大的佇列和小的執行緒數可以減少CPU使用率、系統資源和上下文切換的開銷,但是會導致吞吐量變低,如果任務頻繁地阻塞(例如被I/O限制),系統就能為更多的執行緒排程執行時間。使用小的佇列通常需要更多的執行緒數,這樣可以最大化CPU使用率,但可能會需要更大的排程開銷,從而降低吞吐量。

拒絕策略

當執行緒池已經關閉或達到飽和(最大執行緒和佇列都已滿)狀態時,新提交的任務將會被拒絕。 ThreadPoolExecutor 定義了四種拒絕策略:

  1. AbortPolicy:預設策略,在需要拒絕任務時丟擲RejectedExecutionException;
  2. CallerRunsPolicy:直接在 execute 方法的呼叫執行緒中執行被拒絕的任務,如果執行緒池已經關閉,任務將被丟棄;
  3. DiscardPolicy:直接丟棄任務;
  4. DiscardOldestPolicy:丟棄佇列中等待時間最長的任務,並執行當前提交的任務,如果執行緒池已經關閉,任務將被丟棄。

我們也可以自定義拒絕策略,只需要實現 RejectedExecutionHandler; 需要注意的是,拒絕策略的執行需要指定執行緒池和佇列的容量。

執行緒池狀態

ThreadPoolExecutor 通過一個 int 型引數 ctl 來控制池狀態,並且封裝了兩個概念欄位:workerCount:表示工作執行緒數,最大為(2^29)-1runState:提供了對池生命週期的控制,包括以下幾種狀態:

  • RUNNING:可以接收新的任務和佇列任務
  • SHUTDOWN:不接收新的任務,但是會執行佇列任務
  • STOP:不接收新任務,也不會執行佇列任務,並且中斷正在執行的任務
  • TIDYING:所有任務都已經終止,workerCount為0,當池狀態為TIDYING時將會執行terminated()方法
  • TERMINATEDterminated函式完成執行。

執行緒池的狀態轉化如下:

執行緒池狀態轉化

鉤子方法

ThreadPoolExecutor 提供了可覆蓋的鉤子方法:beforeExecute、afterExecute 和 terminated,分別在每個任務呼叫之前/之後和池關閉之後執行。這些可以用來操作執行環境,例如,重新初始化 ThreadLocal、資料收集統計、日誌新增等。如果鉤子方法或回撥方法丟擲異常,內部工作執行緒也會失敗並銷燬。此外 ThreadPoolExecutor 也為 ScheduledThreadPoolExecutor 提供了一個專門的鉤子方法onShutdown,用來處理關閉執行緒池時的邏輯,後面我們介紹 ScheduledThreadPoolExecutor 的時候再詳細講解。

資料結構和核心引數

ThreadPoolExecutor 繼承關係

Worker: ThreadPoolExecutor 的內部類,繼承自 AQS,實現了不可重入的互斥鎖。線上程池中持有一個 Worker 集合,一個 Worker 對應一個工作執行緒。當執行緒池啟動時,對應的worker會執行池中的任務,執行完畢後從阻塞佇列裡獲取一個新的任務繼續執行。它本身實現了Runnable介面,也就是說 Worker 本身也作為一個執行緒任務執行。 Worker內部維護了三個變數,用來記錄每個工作執行緒的狀態:

//工作執行緒
final Thread thread;
//初始執行任務
Runnable firstTask;
//任務完成計數
volatile long completedTasks;

核心引數

/**當核心執行緒數已滿,新增任務的儲存佇列*/
private final BlockingQueue<Runnable> workQueue;
/**執行緒執行期間的鎖,在呼叫shutdown和shutdownNow之後依然持有*/
private final ReentrantLock mainLock = new ReentrantLock();
/**工作執行緒池,只有在持有mainLock才儲存*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**awaitTermination的等待條件*/
private final Condition termination = mainLock.newCondition();
/**最大池容量*/
private int largestPoolSize;
/**已完成任務數量*/
private long completedTaskCount;
/**執行緒工廠,所有執行緒都是用它來建立(通過addWorker方法)*/
private volatile ThreadFactory threadFactory;
/**在執行期間呼叫飽和或關閉時的處理*/
private volatile RejectedExecutionHandler handler;
/**空閒執行緒保活時長*/
private volatile long keepAliveTime;
/**預設false,表示core執行緒空閒依然保活;
 * 如果為true,使用keepAliveTime確定等待超時時間*/
private volatile boolean allowCoreThreadTimeOut;
/**核心執行緒池大小
 * 超過核心執行緒數之後提交的任務將被放到等待佇列中
 * */
private volatile int corePoolSize;
/**最大執行緒池大小
 * 如果當前等待佇列任務已滿,繼續提交的任務將繼續建立新的執行緒執行,這個執行緒數最大為maximumPoolSize
 * */
private volatile int maximumPoolSize;
/**預設拒絕策略*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/**針對shutdown和shutdownNow的執行許可權許可*/
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");

ctl 變數

ctl封裝了兩個概念欄位:workerCount(有效執行緒數)和runState(執行緒池狀態)ctl使用低29位表示執行緒池中的執行緒數,高3位表示執行緒池的執行狀態,取值範圍見下方原始碼:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//任務執行緒數量所佔的int的位數
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//最大任務執行緒數量為2^29-1

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;//對應高三位111
private static final int SHUTDOWN   =  0 << COUNT_BITS;//對應高三位000
private static final int STOP       =  1 << COUNT_BITS;//對應高三位001
private static final int TIDYING    =  2 << COUNT_BITS;//對應高三位010
private static final int TERMINATED =  3 << COUNT_BITS;//對應高三位011

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }//執行狀態
private static int workerCountOf(int c)  { return c & CAPACITY; }//執行的任務執行緒數
private static int ctlOf(int rs, int wc) { return rs | wc; }//封裝執行狀態和任務執行緒

原始碼解析

本章我們主要針對execute方法進行講解,submit方法在之後對 FutureTask 進行解析的時候再詳細分析。

首先來看一下 ThreadPoolExecutor 的建構函式:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

ThreadPoolExecutor 為使用者提供了更廣闊的控制權限,所以理解 ThreadPoolExecutor 中每個引數的涵義可以使我們更加得心應手的根據業務需求制定我們自己的執行緒池。

下面我們將從方法execute開始,逐步深入解析 ThreadPoolExecutor。

execute(Runnable command)

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
  
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))//新增到工作執行緒
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {//池正在執行,新增到等待佇列
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))//池狀態>=SHUTDOWN,移除任務,執行拒絕策略
            reject(command);
        else if (workerCountOf(recheck) == 0)//工作執行緒為空,新增新的工作執行緒
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

說明:提交一個任務到執行緒池,任務不一定會立即執行。提交的任務可能在一個新的執行緒中執行,也可能在已經存在的空閒執行緒中執行。如果由於池關閉或者池容量已經飽和導致任務無法提交,那麼就根據拒絕策略RejectedExecutionHandler處理提交過來的任務。execute的執行分三種情況:

  1. 如果正在執行執行緒少於corePoolSize,通過addWorker方法嘗試開啟一個新的執行緒並把提交的任務作為它的firstTask執行。addWorker會檢查ctl狀態的狀態(runStateworkerCount)來判斷是否可以新增新的執行緒。

  2. 如果addWorker執行失敗(返回false),就把任務新增到等待佇列。這裡需要對ctl進行雙重檢查,因為從任務入隊到入隊完成可能有執行緒死掉,或者在進入此方法後執行緒池被關閉。所以我們要在入隊後重新檢查池狀態,如果有必要,就回滾入隊操作。

  3. 如果任務不能入隊,我們再次嘗試增加一個新的執行緒。如果新增失敗,就意味著池被關閉或已經飽和,這種情況就需要根據拒絕策略來處理任務。

addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {
    //自旋,判斷可以新增執行緒的前提條件
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);//獲取runState

        // Check if queue empty only if necessary.
        //檢查池狀態
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {//檢查工作執行緒數是否飽和
            int wc = workerCountOf(c);//獲取工作執行緒數
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))//可以新增新的執行緒,遞增ctl,跳出retry自旋
                break retry;
            //更新ctl失敗,重讀ctl繼續迴圈檢查
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //建立新的工作執行緒
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();//加鎖,準備新增新的工作執行緒
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                //重新檢查runState
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();//執行緒已經被啟動,丟擲異常
                    workers.add(w);//新增工作執行緒
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;//更新最大池容量
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {//新增成功,啟動執行緒
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);//新增失敗,回滾操作
    }
    return workerStarted;
}

說明addWorker用來嘗試新增一個新的工作執行緒。在任務提交(execute方法)、更新核心執行緒數(setCorePoolSize方法)、預啟動執行緒(prestartCoreThread方法)中都有用到。函式執行邏輯如下:

首先檢查當前池狀態和給定界限中(核心執行緒數或最大執行緒數)是否可以新增新工作執行緒。如果可以,需要對workercount做出相應調整。新增完畢後,啟動給定任務firstTask。如果執行緒池已停止或正在關閉或Threadfactory建立執行緒失敗返回false。 最後,如果由於Threadfactory返回null或建立執行緒過程中丟擲異常導致工作執行緒建立失敗,則呼叫addWorkerFailed回滾新增工作執行緒操作。addWorkerFailed原始碼如下:

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();//workerCount-1
        tryTerminate();//嘗試終止執行緒池
    } finally {
        mainLock.unlock();
    }
}

Worker.runWorker(Worker w)

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //任務執行緒的鎖狀態預設為-1,此時解鎖+1,變為0,即鎖開啟狀態,允許中斷,在任務未執行之前不允許中斷。
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;//工作執行緒是否因異常而退出
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();//加鎖
            
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();//中斷執行緒
            try {
                beforeExecute(wt, task);//執行前邏輯,自定義實現
                Throwable thrown = null;
                try {
                    task.run();//執行任務
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);//執行後邏輯,自定義實現
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //處理工作執行緒退出邏輯
        processWorkerExit(w, completedAbruptly);
    }
}

說明runWorker是工作執行緒執行的核心方法,迴圈從佇列中獲取任務並執行。 工作執行緒啟動後,會首先執行內部持有的任務firstTask,如果firstTasknull,則迴圈呼叫getTask方法從佇列中獲取任務執行。在任務執行前後可呼叫beforeExecuteafterExecute處理執行前後的邏輯,這兩個方法線上程池中都是空方法,可根據業務需求自定義實現。 如果執行緒池正在停止(stopping),需要確保執行緒被中斷;否則的話需要確保執行緒沒有被中斷。這裡針對兩種情況需要進行復查,以處理在清除中斷時的shutdownNow事件。

任務獲取方法getTask原始碼如下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);//獲取runState

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//執行緒池已經關閉或等待佇列為null
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);//獲取工作執行緒數workerCount

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//是否允許超時

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;//修改ctl失敗,繼續迴圈
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //等待指定超時時間
                workQueue.take();//出隊,等待直到元素可用
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {

            timedOut = false;
        }
    }
}

說明:獲取等待佇列中的任務,基於當前執行緒池的配置來決定執行任務阻塞、等待或返回null。在以下四個情況下會引起worker退出,並返回null

  1. 工作執行緒數大於maximumPoolSize
  2. 執行緒池已停止(STOP)
  3. 執行緒池已關閉(SHUTDOWN)並且等待佇列為空
  4. 工作執行緒等待任務超時

由於上述條件返回null後,需要遞減workerCount

回到runWorker方法,當工作執行緒處理完所有的任務之後,會呼叫processWorkerExit處理工作執行緒退出的邏輯,原始碼如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        //如果任務執行緒被中斷,則工作執行緒數量減1
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;//更新完成任務數
        workers.remove(w);//移除工作執行緒
    } finally {
        mainLock.unlock();
    }

    tryTerminate();//嘗試終止執行緒池

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {//執行緒池尚未完全停止
        if (!completedAbruptly) {//工作執行緒非異常退出
            //獲取當前核心執行緒數
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                //如果允許空閒工作執行緒等待任務,且任務佇列不為空,則min為1
                min = 1;
            if (workerCountOf(c) >= min)//工作執行緒數大於核心執行緒數,直接返回
                return; // replacement not needed
        }
        addWorker(null, false);//繼續嘗試新增新的工作執行緒
    }
}

說明:工作執行緒處理完所有的任務之後,呼叫此方法處理工作執行緒退出邏輯。為已經死亡的工作執行緒執行相關的清除操作。此方法會從執行緒池內的工作執行緒集合(workers)中移除當前工作執行緒,並會嘗試終止執行緒池。 在下面幾種情況下,可能會替換當前工作執行緒:

  1. 使用者任務執行異常導致執行緒退出
  2. 工作執行緒數少於corePoolSize
  3. 等待佇列不為空但沒有工作執行緒

tryTerminate()

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) || //正在執行
            runStateAtLeast(c, TIDYING) || //狀態大於TIDYING
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //狀態為shutdown並且等待佇列不為空
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);//中斷空閒執行緒
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //執行緒池已經關閉,等待佇列為空,並且工作執行緒等於0,更新池狀態為TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();//執行緒池終止後操作,需自定義實現
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();//喚醒等待池結束的執行緒
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

說明tryTerminate用於嘗試終止執行緒池,在shutdow()、shutdownNow()、remove()中均是通過此方法來終止執行緒池。此方法必須在任何可能導致終止的行為之後被呼叫,例如減少工作執行緒數,移除佇列中的任務,或者是在工作執行緒執行完畢後處理工作執行緒退出邏輯的方法processWorkerExit。 如果執行緒池可被終止(狀態為SHUTDOWN並且等待佇列和池任務都為空,或池狀態為STOP且池任務為空),呼叫此方法轉換執行緒池狀態為TERMINATED。 如果執行緒池可以被終止,但是當前工作執行緒數大於0,則呼叫interruptIdleWorkers方法先中斷一個空閒的工作執行緒,用來保證池關閉操作繼續向下傳遞。interruptIdleWorkers原始碼如下:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//檢查關閉許可權
        advanceRunState(SHUTDOWN);//修改執行狀態runState
        interruptIdleWorkers();//中斷空閒工作執行緒
        //為ScheduledThreadPoolExecutor提供的關閉鉤子程式
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();//銷燬執行緒池
}

說明:啟動一個有序的關閉方式,在關閉之前已提交的任務會被執行,但不會接收新任務。此方法不會等待已提交任務執行完畢(通過awaitTermination方法可以等待任務完成之後再關閉)。方法內部的呼叫在上面都已經介紹過,不多贅述。

shutdownNow()

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//檢查關閉許可權
        advanceRunState(STOP);//修改執行狀態runState
        interruptWorkers();//中斷所有執行緒
        tasks = drainQueue();//移除所有等待佇列的任務
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

說明:停止執行緒池內所有的任務(包括正在執行和正在等待的任務),並返回正在等待執行的任務列表。此任務不會等待活躍任務(正在執行的任務)執行完畢之後再關閉(通過awaitTermination方法可以等待任務完成之後再關閉)。

注意:此方法並不能保證一定會停止每個任務,因為我們是通過Thread.interrupt來中斷執行緒,如果中斷失敗,就可能無法終止執行緒池。

shutdcown 和 shutdownNow的區別

  • shutdown 會把當前池狀態改為SHUTDOWN,表示還會繼續執行池內已經提交的任務,然後中斷所有的空閒工作執行緒 ;但 shutdownNow 直接把池狀態改為STOP,也就是說不會再執行已存在的任務,然後會中斷所有工作執行緒

awaitTermination(long timeout, TimeUnit unit)

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {//自旋等待任務完成或超時
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);//等待給定的超時時間
        }
    } finally {
        mainLock.unlock();
    }
}

說明awaitTermination一般是用來配合shutdown來使用。在對執行緒池傳送一個shutdown請求後開始阻塞,直到所有任務都完成執行/超時/執行緒被中斷才返回。如果在等待時間內執行緒池終止(TERMINATED)就返回true,如果等待超時後執行緒池還未終止就返回false

到此,ThreadPoolExecutor 中幾個比較重要的方法就講完了,如果同學們對此原始碼解析有任何疑問,歡迎大家在評論中提出。

小結

本章重點:執行緒池裡面的引數涵義,池狀態的轉換,也要了解JUC中的各種阻塞佇列

作者:泰迪的bagwell 連結:https://www.jianshu.com/p/7be43712ef21 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。