1. 程式人生 > >聊聊高併發(四十)解析java.util.concurrent各個元件(十六) ThreadPoolExecutor原始碼分析

聊聊高併發(四十)解析java.util.concurrent各個元件(十六) ThreadPoolExecutor原始碼分析

ThreadPoolExecutor是Executor執行框架最重要的一個實現類,提供了執行緒池管理和任務管理是兩個最基本的能力。這篇通過分析ThreadPoolExecutor的原始碼來看看如何設計和實現一個基於生產者消費者模型的執行器。

生產者消費者模型

生產者消費者模型包含三個角色:生產者,工作佇列,消費者。對於ThreadPoolExecutor來說,

1. 生產者是任務的提交者,是外部呼叫ThreadPoolExecutor的執行緒

2. 工作佇列是一個阻塞佇列的介面,具體的實現類可以有很多種。BlockingQueue<Runnable> workQueue;

3. 消費者是封裝了執行緒的Worker類的集合。HashSet<Worker> workers = new HashSet<Worker>();

主要屬性

明確了ThreadPoolExecutor的基本執行模型之後,來看下它的幾個主要屬性:

1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    一個32位的原子整形作為執行緒池的狀態控制描述符。低29位作為工作者執行緒的數量。所以工作者執行緒最多有2^29 -1個。高3位來保持執行緒池的狀態。ThreadPoolExecutor總共有5種狀態:

     *   RUNNING:  可以接受新任務並執行
     *   SHUTDOWN: 不再接受新任務,但是仍然執行工作佇列中的任務
     *   STOP:     不再接受新任務,不執行工作佇列中的任務,並且中斷正在執行的任務
     *   TIDYING:  所有任務被終止,工作執行緒的數量為0,會去執行terminated()鉤子方法
     *   TERMINATED: terminated()執行結束

下面是一系列ctl這個變數定義和工具方法

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

2. private final BlockingQueue<Runnable> workQueue; 工作佇列,採用了BlockingQueue阻塞佇列的介面,具體實現類可以按照不同的策略來選擇,比如有邊界的ArrayBlockingQueue,無邊界的LinkedBlockingQueue。

3. private final ReentrantLock mainLock = new ReentrantLock();  控制ThreadPoolExecutor的全域性可重入鎖,所有需要同步的操作都要被這個鎖保護

4. private final Condition termination = mainLock.newCondition(); mainLock的條件佇列,來進行wait()和notify()等條件操作

5. private final HashSet<Worker> workers = new HashSet<Worker>();  工作執行緒集合

6. private volatile ThreadFactory threadFactory; 建立執行緒的工廠,可以自定義執行緒建立的邏輯

7. private volatile RejectedExecutionHandler handler;  拒絕執行任務的處理器,可以自定義拒絕的策略

8. private volatile long keepAliveTime;   空閒執行緒的存活時間。可以根據這個存活時間來判斷空閒執行緒是否等待超時,然後採取相應的執行緒回收操作

9. private volatile boolean allowCoreThreadTimeOut;  是否允許coreThread執行緒超時回收

10. private volatile int corePoolSize;  可存活的執行緒的最小值。如果設定了allowCoreThreadTimeOut, 那麼corePoolSize的值可以為0。

11. private volatile int maximumPoolSize;  可存活的執行緒的最大值

工作執行緒建立和回收策略

ThreadPoolExecutor通過corePoolSize,maximumPoolSize, allowCoreThreadTimeOut,keepAliveTime等幾個引數提供一個靈活的工作執行緒建立和回收的策略。

建立策略:

1. 當工作執行緒數量小於corePoolSize時,不管其他執行緒是否空閒,都建立新的工作執行緒來處理新加入的任務

2. 當工作執行緒數量大於corePoolSize,小於maximumPoolSize時,只有當工作佇列滿了,才會建立新的工作執行緒來處理新加入的任務。當工作佇列有空餘時,只把新任務加入佇列

3. 把corePoolSize和maximumPoolSize 設定成相同的值時,執行緒池就是一個固定(fixed)工作執行緒數的執行緒。

回收策略:

1. keepAliveTime變數設定了空閒工作執行緒超時的時間,當工作執行緒數量超過了corePoolSize後,空閒的工作執行緒等待超過了keepAliveTime後,會被回收。後面會說怎麼確定一個工作執行緒是否“空閒”。

2. 如果設定了allowCoreThreadTimeOut,那麼core Thread也可以被回收,即當core thread也空閒時,也可以被回收,直到工作執行緒集合為0。


工作佇列策略

工作佇列BlockingQueue<Runnable> workQueue 是用來存放提交的任務的。它有4個基本的策略,並且根據不同的阻塞佇列的實現類可以引入更多的工作佇列的策略。

4個基本策略:

1. 當工作執行緒數量小於corePoolSize時,新提交的任務總是會由新建立的工作執行緒執行,不入佇列

2. 當工作執行緒數量大於corePoolSize,如果工作佇列沒滿,新提交的任務就入佇列

3. 當工作執行緒數量大於corePoolSize,小於MaximumPoolSize時,如果工作佇列滿了,新提交的任務就交給新建立的工作執行緒,不入佇列

4. 當工作執行緒數量大於MaximumPoolSize,並且工作佇列滿了,那麼新提交的任務會被拒絕執行。具體看採用何種拒絕策略


根據不同的阻塞佇列的實現類,又有幾種額外的策略

1. 採用SynchronousQueue直接將任務傳遞給空閒的執行緒執行,不額外儲存任務。這種方式需要無限制的MaximumPoolSize,可以建立無限制的工作執行緒來處理提交的任務。這種方式的好處是任務可以很快被執行,適用於任務到達時間大於任務處理時間的情況。缺點是當任務量很大時,會佔用大量執行緒

2. 採用無邊界的工作佇列LinkedBlockingQueue。這種情況下,由於工作佇列永遠不會滿,那麼工作執行緒的數量最大就是corePoolSize,因為當工作執行緒數量達到corePoolSize時,只有工作佇列滿的時候才會建立新的工作執行緒。這種方式好處是使用的執行緒數量是穩定的,當記憶體足夠大時,可以處理足夠多的請求。缺點是如果任務直接有依賴,很有可能形成死鎖,因為當工作執行緒被消耗完時,不會建立新的工作現場,只會把任務加入工作佇列。並且可能由於記憶體耗盡引發記憶體溢位OOM

3. 採用有界的工作佇列AraayBlockingQueue。這種情況下對於記憶體資源是可控的,但是需要合理調節MaximumPoolSize和工作佇列的長度,這兩個值是相互影響的。當工作佇列長度比較小的時,必定會建立更多的執行緒。而更多的執行緒會引起上下文切換等額外的消耗。當工作佇列大,MaximumPoolSize小的時候,會影響吞吐量,並且會觸發拒絕機制。

拒絕執行策略

當Executor處於shutdown狀態或者工作執行緒超過MaximumPoolSize並且工作佇列滿了之後,新提交的任務將會被拒絕執行。RejectedExecutionHandler介面定義了拒絕執行的策略。具體的策略有

CallerRunsPolicy:由呼叫者執行緒來執行被拒絕的任務,屬於同步執行

AbortPolicy:中止執行,丟擲RejectedExecutionException異常

DiscardPolicy:丟棄任務

DiscardOldestPolicy:丟棄最老的任務

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

工作執行緒Worker的設計

工作執行緒沒有直接使用Thread,而是採用了Worker類封裝了Thread,目的是更好地進行中斷控制。Worker直接繼承了AbstractQueuedSynchronizer來進行同步操作,它實現了一個不可重入的互斥結構。當它的state屬性為0時表示unlock,state為1時表示lock。任務執行時必須在lock狀態的保護下,防止出現同步問題。因此當Worker處於lock狀態時,表示它正在執行,當它處於unlock狀態時,表示它“空閒”。當它空閒超過keepAliveTime時,就有可能被回收。

Worker還實現了Runnable介面, 執行它的執行緒是Worker包含的Thread物件,在Worker的建構函式可以看到Thread建立時,把Worker物件傳遞給了它。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;

            // 把Worker物件作為Runnable的例項傳遞給了新建立Thread物件
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker被它的執行緒執行時,run方法呼叫了ThreadPoolExecutor的runWorker方法。

1. wt指向當前執行Worker的run方法的執行緒,也就是指向了Worker包含的工作執行緒物件

2. task指向Worker包含的firstTask物件,表示當前要執行的任務

3. 當task不為null或者從工作佇列中取到了新任務,那麼先加鎖w.lock表示正在執行任務。在真正開始執行task.run()之前,先判斷執行緒池的狀態是否已經STOP,如果是,就中斷Worker的執行緒。

4. 一旦判斷當前執行緒不是STOP並且工作執行緒沒有中斷。那麼就開始執行task.run()了。Worker的interruptIfStarted方法可以中斷這個Worker的執行緒,從而中斷正在執行任務。

5. beforeExecute(wt, task)和afterExecute(wt,task)是兩個鉤子方法,支援在任務真正開始執行前就行擴充套件。

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

工作執行緒Worker建立和回收的原始碼

首先看一下ThreadPoolExecutor的execute方法,這個方式是任務提交的入口。可以看到它的邏輯符合之前說的工作執行緒建立的基本策略

1. 當工作執行緒數量小於corePoolSize時,通過addWorker(command,true)來新建工作執行緒處理新建的任務,不入工作佇列

2. 當工作執行緒數量大於等於corePoolSize時,先入佇列,使用的是BlockingQueue的offer方法。當工作執行緒數量為0時,還會通過addWorker(null, false)新增一個新的工作執行緒

3. 當工作佇列滿了並且工作執行緒數量在corePoolSize和MaximumPoolSize之間,就建立新的工作執行緒去執行新新增的任務。當工作執行緒數量超過了MaximumPoolSize,就拒絕任務。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
   
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以看到addWorker方法是建立Worker工作執行緒的所在。

1. retry這個迴圈判斷執行緒池的狀態和當前工作執行緒數量的邊界。如果允許建立工作現場,首先修改ctl變量表示的工作執行緒的數量

2. 把工作執行緒新增到workers集合中的操作要在mainLock這個鎖的保護下進行。所有和ThreadPoolExecutor狀態相關的操作都要在mainLock鎖的保護下進行

3. w = new Worker(firstTask); 建立Worker例項,把firstTask作為它當前的任務。firstTask為null時表示先只建立Worker執行緒,然後去工作佇列中取任務執行

4. 把新建立的Worker例項加入到workers集合,修改相關統計變數。

5. 當加入集合成功後,開始啟動這個Worker例項。啟動的方法是呼叫Worker封裝的Thread的start()方法。之前說了,這個Thread對應的Runnable是Worker本身,會去呼叫Worker的run方法,然後呼叫ThreadPoolExecutor的runWorker方法。在runWorker方法中真正去執行任務。

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

工作執行緒回收的方法是processWorkerExit(),它在runWorker方法執行結束的時候被呼叫。之前說了空閒的工作執行緒可能會在keepAliveTime時間之後被回收。這個邏輯隱含在runWorker方法和getTask方法中,會在下面說如何從工作佇列取任務時說明。processWorkerExit方法單純只是處理工作執行緒的回收。

1. 結合runWorker方法看,如果Worker執行task.run()的時候丟擲了異常,那麼completedAbruptly為true,需要從workers集合中把這個工作執行緒移除掉。

2. 如果是completedAbruptly為true,並且執行緒池不是STOP狀態,那麼就建立一個新的Worker工作執行緒

3. 如果是completedAbruptly為false,並且執行緒池不是STOP狀態,首先檢查是否allowCoreThreadTimeout,如果執行,那麼最少執行緒數可以為0,否則是corePoolSize。如果最少執行緒數為0,並且工作佇列不為空,那麼最小值為1。最後檢查當前的工作執行緒數量,如果小於最小值,就建立新的工作執行緒。

 private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

任務的獲取

工作執行緒從工作佇列中取任務的程式碼在getTask方法中

1. timed變量表示是否要計時,當計時超過keepAliveTime後還沒取到任務,就返回null。結合runWorker方法可以知道,當getTask返回null時,該Worker執行緒會被回收,這就是如何回收空閒工作執行緒的方法。

timed變數當allowCoreThreadTimeout為true或者當工作執行緒數大於corePoolSize時為true。

2. 如果timed為true,就用BlockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法來計時從隊頭取任務,否則直接用take()方法從隊頭取任務

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

執行緒池的關閉

執行緒池有SHUTDOWN, STOP, TIDYING, TERMINATED這幾個狀態和執行緒池關閉相關。通常我們把關閉分為優雅的關閉和強制立刻關閉。

所謂優雅的關閉就是呼叫shutdown()方法,執行緒池進入SHUTDOWN狀態,不在接收新的任務,會把工作佇列的任務執行完畢後再結束。

強制立刻關閉就是呼叫shutdownNow()方法,執行緒池直接進入STOP狀態,會中斷正在執行的工作執行緒,清空工作佇列。

1. 在shutdown方法中,先設定執行緒池狀態為SHUTDOWN,然後先去中斷空閒的工作執行緒,再呼叫onShutdown鉤子方法。最後tryTerminate()

2. 在shutdownNow方法中,先設定執行緒池狀態為STOP,然後先中斷所有的工作執行緒,再清空工作佇列。最後tryTerminate()。這個方法會把工作佇列中的任務返回給呼叫者處理。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

      public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

interruptIdleWorkers方法會去中斷空閒的工作執行緒,所謂空閒的工作執行緒即沒有上鎖的Worker。

而interruptWorkers方法直接去中斷所有的Worker,呼叫Worker.interruptIfStarted()方法

  private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

  void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

tryTerminate方法會嘗試終止執行緒池,根據執行緒池的狀態,在相應狀態會中斷空閒工作執行緒,呼叫terminated()鉤子方法,設定狀態為TERMINATED。
final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

最後說明一下,JVM的守護程序只有當所有派生出來的執行緒都結束後才會退出,使用ThreadPoolExecutor執行緒池時,如果有的任務一直執行,並且不響應中斷,那麼會一直佔用執行緒,那麼JVM也會一直工作,不會退出。