Thread Pool Techniques: ThreadPoolExecutor Source Code Analysis

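  In Java, discussions of thread pools generally revolve around ThreadPoolExecutor. Other implementations are mostly built on it or modeled after it, so understanding ThreadPoolExecutor gets you most of the way to understanding thread pools in general.

  To really understand something, it usually helps to approach it with your own questions or assumptions in mind; otherwise you tend to take away very little.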

 

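  To understand ThreadPoolExecutor, start with what a thread pool is for: in essence, it is a tool that provides a predefined set of n threads on which callers can run tasks directly.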

 

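Problems a thread pool solves:

  1. Faster task response and lower resource consumption. A task runs immediately on a thread the pool already provides, which avoids the CPU and memory cost of creating a thread on demand.

  2. Better manageability of threads. The total number of threads is predictable, which avoids the risk of callers creating unbounded threads and bringing the machine down, and allows threads to be allocated, tuned, and monitored centrally.

  3. Protection against resource overuse. When more tasks arrive than expected, the response is governed by a controllable policy.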

 

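Core interfaces provided by the thread pool:

  To use a thread pool you need to understand its interface. We normally call a thread pool through ExecutorService. This article, however, is not aimed at beginners.

  The full interface was shown as a class diagram in the original post (figure not reproduced here).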

 

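   Let's look at a few commonly used methods:

    submit(Runnable task): submits a task that does not need to return a result.
    submit(Callable<T> task): submits a task that returns a result.

    invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): runs all the given tasks and returns a list of Futures holding their results.
    shutdown(): shuts down the thread pool.
    awaitTermination(long timeout, TimeUnit unit): waits for shutdown to complete, for at most timeout.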
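
The methods listed above can be tried out with a small sketch like the following. It is only an illustration: the class name ExecutorServiceBasics and the method sumOfSquares are made up for this example, and the pool comes from Executors.newFixedThreadPool, which is backed by a ThreadPoolExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class ExecutorServiceBasics {
    // Computes 1*1 + 2*2 + ... + n*n, running each square as a pooled task.
    static int sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                tasks.add(() -> x * x);
            }
            int sum = 0;
            // invokeAll returns once every task is done or the timeout has elapsed.
            for (Future<Integer> f : pool.invokeAll(tasks, 5, TimeUnit.SECONDS)) {
                sum += f.get();
            }
            return sum;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            // shutdown() stops intake; awaitTermination waits for the workers to finish.
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(4)); // prints 30
    }
}
```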

 

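These are the features ThreadPoolExecutor provides. With them in mind, we should have a few implementation ideas or questions of our own:

  1. How does the thread pool accept tasks?

  2. How do the threads run tasks?

  3. How is the thread pool shut down?

 

Now let's read the implementation with these questions in mind.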

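ThreadPoolExecutor core implementation

1. How the thread pool handles a task

  The first thing to look at is how a submitted task gets executed. The original post illustrated this flow with a diagram (figure not reproduced here).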

 

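   In summary:

    1. Check whether the core pool is full (corePoolSize threads already exist); if not, create a thread to run the task.
    2. If the core pool is full, check whether the queue is full; if it is not, put the task in the queue.
    3. If the queue is full, check whether the pool is full (maximumPoolSize reached); if not, create a thread to run the task.
    4. If the pool is also full, handle the task according to the rejection policy.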
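
The four steps can be observed on a real pool. The sketch below is a small experiment under chosen assumptions (1 core thread, at most 2 threads, a queue of capacity 1, and tasks that block until released); the class name SubmissionFlowDemo is made up for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class SubmissionFlowDemo {
    // Records "poolSize/queueSize" after each accepted task, or "rejected".
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes free during the test.
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder sb = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    sb.append(pool.getPoolSize()).append('/')
                      .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    sb.append("rejected"); // default AbortPolicy
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

The first task starts a core thread, the second is queued, the third forces a non-core thread, and the fourth is rejected.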

 

  Next, let's look at the ThreadPoolExecutor constructor, since it shows what each attribute means.

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

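  The constructor shows that ThreadPoolExecutor takes 7 main parameters. The Javadoc already describes them; here is what each one does:

    corePoolSize: the number of core threads (threads normally kept in the pool). When it applies: at the start, each incoming request creates a new thread until this size is reached.
    maximumPoolSize: the maximum number of threads in the pool. When it applies: once workQueue cannot accept more tasks, new threads are started up to this maximum, which is the pool's limit.
    keepAliveTime/unit: how long idle threads beyond corePoolSize are kept alive; unit is the time unit.
    workQueue: a blocking queue. Once the number of threads reaches corePoolSize, new tasks are first offered to this queue and the worker threads consume them on their own.
        ArrayBlockingQueue: the capacity must be passed to the constructor.
        LinkedBlockingQueue: without a capacity argument the capacity defaults to Integer.MAX_VALUE (2147483647), so a flood of tasks can easily exhaust memory.
        SynchronousQueue: a blocking queue with no storage that hands each task directly to a worker thread.
        PriorityBlockingQueue: an unbounded priority queue.
    threadFactory: the thread factory; its newThread() is called whenever the pool needs to create a thread.
    handler: the saturation policy. When the queue cannot take more tasks and the number of threads has reached maximumPoolSize, the task can no longer be handled and is passed to the saturation policy.
        AbortPolicy: rejects the task by throwing RejectedExecutionException (the default).
        CallerRunsPolicy: runs the task on the caller's thread.
        DiscardOldestPolicy: discards the oldest task in the queue, then retries the submission.
        DiscardPolicy: silently discards the current task.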
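
To make the seven parameters concrete, here is a minimal sketch that passes all of them explicitly and shows CallerRunsPolicy in action. The class name SevenParameterPool, the thread name prefix "demo-worker-", and the chosen sizes are assumptions made for this example only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
class SevenParameterPool {
    // Returns the name of the thread that ran the task submitted to a saturated pool.
    static String threadThatRanOverflowTask() {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory named = r -> new Thread(r, "demo-worker-" + seq.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                          // corePoolSize
                1,                          // maximumPoolSize
                30, TimeUnit.SECONDS,       // keepAliveTime and unit
                new SynchronousQueue<>(),   // workQueue: no storage, direct hand-off
                named,                      // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // Occupies the only worker thread until released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The worker is busy and the queue stores nothing, so this task is
            // rejected; CallerRunsPolicy then runs it on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(threadThatRanOverflowTask());
    }
}
```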

 

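2. The submit flow in detail

  Calling submit hands a task to the thread pool, which processes it as described in section 1. But we need to look deeper.

  submit is defined in AbstractExecutorService and ultimately calls ThreadPoolExecutor's execute method, an application of the template method pattern.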

    // java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        // Wrap the task and its result in a RunnableFuture, then hand it to the concrete subclass to execute
        RunnableFuture<T> ftask = newTaskFor(task, result);
        // execute resolves to ThreadPoolExecutor's implementation, the core of this discussion
        execute(ftask);
        return ftask;
    }
    // FutureTask is an important thread pool component; it carries the actual task execution flow
    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param <T> the type of the given value
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    
    // How ThreadPoolExecutor accepts a submitted task
    // java.util.concurrent.ThreadPoolExecutor#execute
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
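        // ctl is the key structure holding the global control state, defined as a thread-safe AtomicInteger
        // ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        int c = ctl.get();
        // While the core pool is not yet full, just add one new thread and let it run the task
        if (workerCountOf(c) < corePoolSize) {
            // 2.1. Add a new thread that runs command
            // On success nothing more is needed; failure means the pool state changed in the meantime
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // Once the core threads are in place, try to enqueue the task; how it is enqueued is up to the blocking queue
        // isRunning => c < SHUTDOWN;
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 2.2. After enqueuing, re-check the pool state to decide whether to roll back or start a thread
            // 2.2.1. If the pool has been shut down, remove the task just added and apply the reject policy
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 2.2.2. If there is no worker at all, add one
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // If the queue is full, create a new thread directly; if no more threads can be created, reject
        else if (!addWorker(command, false))
            // 2.3. Apply the rejection policy
            reject(command);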
    }

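  This short piece of code already shows the complete picture. Global state is tracked through the single ctl variable, which is updated with CAS. execute() itself takes no lock, yet it is thread-safe; mainLock only comes into play inside addWorker, when a new Worker is registered in the workers set.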

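  The code maps exactly onto the thread pool's execution flow:

    1. Check whether the core pool is full; if not, create a thread to run the task;
    2. If the core pool is full, check whether the queue is full; if it is not, put the task in the queue;
    3. If the queue is full, check whether the pool is full; if not, create a thread to run the task;
    4. If the pool is also full, handle the task according to the rejection policy.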
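
Since ctl carries both the run state and the worker count, it helps to see how one int can hold two values. The sketch below is a standalone replica of the bit layout as it appears in the JDK 8 sources (high 3 bits for runState, low 29 bits for workerCount); the class name CtlSketch is made up, and other JDK versions may organize these helpers differently.

```java
// Hypothetical standalone replica of the ctl layout; not the JDK class itself.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 low bits hold workerCount
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // mask for workerCount

    // runState lives in the high 3 bits; the values are ordered numerically
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int ctlOf(int rs, int wc)  { return rs | wc; }        // pack
    static int runStateOf(int c)      { return c & ~CAPACITY; }  // unpack high bits
    static int workerCountOf(int c)   { return c & CAPACITY; }   // unpack low bits
    static boolean isRunning(int c)   { return c < SHUTDOWN; }   // only RUNNING is negative

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c) + " workers, running=" + isRunning(c));
    }
}
```

Because the states are ordered, checks such as rs >= SHUTDOWN or runStateAtLeast(c, STOP) reduce to plain integer comparisons.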

 

2.1. Adding a new worker

  A worker is simply a worker thread.

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
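        // Retry with CAS in a loop to stay thread-safe
        retry:
        for (;;) {
            int c = ctl.get();
            // runState is stored in the high bits of c
            // c & ~CAPACITY;
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // Once shut down, adding fails unless the state is exactly SHUTDOWN,
            // firstTask is null, and the queue still holds tasks to drain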
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
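                // Fail immediately if the applicable bound (core or maximum) is already reached
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS workerCount + 1: success reserves a slot and exits retry, so the add below is safe; failure means another thread changed the value
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // runState changed: go back to retry and start over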
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // The rest of the method adds the worker
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Wrap firstTask in a Worker, which takes over execution from here
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // Adding the worker must be thread-safe
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // In SHUTDOWN a worker is still added when firstTask == null (to drain the queue); any other case is backed out
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // A freshly created thread must not already be alive
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // workers is just a container of worker threads, backed by a HashSet
                        // private final HashSet<Worker> workers = new HashSet<Worker>();
                        workers.add(w);
                        int s = workers.size();
                        // Track the largest pool size ever reached
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // Once the worker is added, start it; inside there should be an endless loop that keeps fetching tasks,
                // otherwise how would tasks placed in the queue ever run?
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // If the worker failed to start, clean up and report failure
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    // That is the rough framework for adding a worker. The key object is Worker, which we cover later.
    // First, let's see what happens when adding fails
    /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            // workerCount - 1 in ctl, implemented with CAS
            decrementWorkerCount();
            // Re-check whether the pool can now terminate
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
    // Try to move the pool to TERMINATED; may interrupt one idle worker
    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // Do not terminate while the pool is running, already tidying or terminated, or shut down with tasks still queued
            if (isRunning(c) ||
                // c >= TIDYING
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // One of the two ways of stopping threads: interrupt a single idle worker
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            // From here on: final steps for the whole pool
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Mark the pool as TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // Hook called when the pool has terminated; empty by default
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        // termination wakes up the threads waiting in awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Iterate over all workers
            for (Worker w : workers) {
                Thread t = w.thread;
                // Interrupt only after acquiring the worker's lock, i.e. only idle workers
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // When interrupting just one worker, stop after the first one, whether or not it was actually interrupted
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
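
The core of addWorker's first half is "reserve a slot with CAS, retry only if another thread got in first". A minimal sketch of that pattern, stripped of the run-state checks, might look like this. SlotReservation, tryReserve, and contend are hypothetical names; the real code packs the count into ctl rather than using a dedicated counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplified model of the workerCount reservation in addWorker.
class SlotReservation {
    private final AtomicInteger count = new AtomicInteger();

    // Reserve one slot unless the bound is reached; loop again only when the CAS loses a race.
    boolean tryReserve(int bound) {
        for (;;) {
            int c = count.get();
            if (c >= bound)
                return false;               // bound reached: fail fast
            if (count.compareAndSet(c, c + 1))
                return true;                // slot reserved
            // CAS failed because another thread changed the count; re-read and retry
        }
    }

    int reserved() {
        return count.get();
    }

    // Lets several threads compete for slots and returns how many were reserved in total.
    static int contend(int threads, int bound) {
        SlotReservation slots = new SlotReservation();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int k = 0; k < 100; k++)
                    slots.tryReserve(bound);
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return slots.reserved();
    }

    public static void main(String[] args) {
        System.out.println(contend(8, 10)); // never exceeds the bound
    }
}
```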

 

2.2. After a successful enqueue, the pool state has changed and the task must be removed from the queue

    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     *
     * <p>This method may be useful as one part of a cancellation
     * scheme.  It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using {@code submit} might be
     * converted into a form that maintains {@code Future} status.
     * However, in such cases, method {@link #purge} may be used to
     * remove those Futures that have been cancelled.
     *
     * @param task the task to remove
     * @return {@code true} if the task was removed
     */
    public boolean remove(Runnable task) {
        // The removal is not guaranteed to succeed
        boolean removed = workQueue.remove(task);
        // Seen above: tries to terminate the pool, interrupting at most one idle worker
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }
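
The Javadoc's warning that remove may miss tasks "converted into other forms" is easy to reproduce. The following sketch (RemoveDemo is a made-up class name) queues one task via execute and one via submit behind a blocked worker, then tries to remove them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class RemoveDemo {
    // Returns {raw task removed, submit() original removed, submit() future removed}.
    static boolean[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Keep the single worker busy so later tasks stay in the queue.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            Runnable viaExecute = () -> { };
            Runnable viaSubmit = () -> { };
            pool.execute(viaExecute);                  // queued as-is
            Future<?> future = pool.submit(viaSubmit); // queued wrapped in a FutureTask
            boolean raw = pool.remove(viaExecute);
            boolean original = pool.remove(viaSubmit);        // the queue holds the wrapper, not this object
            boolean wrapper = pool.remove((Runnable) future); // the wrapper itself can be removed
            return new boolean[] { raw, original, wrapper };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```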

 

3. Applying the rejection policy when adding fails

    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
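        // The rejection policy (a RejectedExecutionHandler) is passed to the constructor; the default is AbortPolicy
        // Users only need to implement rejectedExecution to define a custom rejection policy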
        handler.rejectedExecution(command, this);
    }
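
As an illustration of a custom policy, the sketch below implements RejectedExecutionHandler with a handler that merely counts what it drops. CountingRejectionHandler and rejectedOutOfFive are hypothetical names, and a production handler would more likely log, re-route, or apply back-pressure.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical custom policy: count rejected tasks and drop them.
class CountingRejectionHandler implements RejectedExecutionHandler {
    final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
    }

    // Submits five blocking tasks to a pool with room for two (1 thread + 1 queue slot).
    static int rejectedOutOfFive() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 5; i++)
                pool.execute(blocked);
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(rejectedOutOfFive());
    }
}
```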

 

4. How Worker works

  From the code above, the main operations are adding a Worker and adding to workQueue. So who does the actual work? Worker, of course.

        // Worker's constructor mainly accepts a task, which may be null; if non-null, it will be run shortly
        // private final class Worker extends AbstractQueuedSynchronizer implements Runnable
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // Bind the Worker itself, as a task, to worker.thread
            // Starting the thread starts the worker
            this.thread = getThreadFactory().newThread(this);
        }
        // Worker's main job, implemented as a polling loop
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            // Call runWorker, implemented by the enclosing ThreadPoolExecutor
            runWorker(this);
        }
        
    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
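            // Keep taking tasks from workQueue and running them; that is the whole logic
            // getTask() blocks while waiting, so a Worker usually does not exit right away
            while (task != null || (task = getTask()) != null) {
                // Holding the worker lock marks this worker as busy: it runs one task at a time, and interruptIdleWorkers (which uses tryLock) will skip it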
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // Check whether the pool is stopping or the current worker has been interrupted
                // STOP = 1 << COUNT_BITS;
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
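                    // Propagate the interrupt
                    wt.interrupt();
                try {
                    // Pre-execution hook; a no-op by default
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // Call the task's run method directly; the result is stored in a field of the wrapping FutureTask
                        // See the earlier article (How does FutureTask obtain the asynchronous result? https://www.cnblogs.com/yougewe/p/11666284.html)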
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // Post-execution hook; a no-op by default
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // Normal exit; a replacement Worker may be added if needed
            completedAbruptly = false;
        } finally {
            // Handle the next step after exit; may add a new Worker
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
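            // On a normal Worker exit (for example after an idle timeout), keep the minimum number of threads
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // If the minimum is already satisfied, just return
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // Otherwise add another Worker to the pool as a spare
            // After an abrupt exit a replacement Worker is always added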
            addWorker(null, false);
        }
    }
    
    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // If the pool is stopping, or is shut down and the queue is empty, the worker must exit
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // do {} while (! compareAndDecrementWorkerCount(ctl.get()));
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // Too many threads (above maximumPoolSize) or an idle timeout: this surplus Worker should exit
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // If idle timeout applies, use poll with a timeout; otherwise block in take()
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
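                // Got a task: it can now be run
                if (r != null)
                    return r;
                // On timeout, the worker exits on the next loop iteration if timeouts apply
                timedOut = true;
            }
            // The InterruptedException is swallowed here
            // So how does a Worker respond to an external interrupt? Worth thinking about (hint: the loop re-reads ctl at the top of the next iteration)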
            catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

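  So the role of Worker is now clear; it loops, fetching and running tasks:

    1. A main loop keeps fetching tasks;
    2. When an idle timeout applies, tasks are fetched with poll; on timeout the Worker leaves the loop and its thread ends;
    3. Without a timeout, tasks are fetched with a blocking take, which waits until one is available;
    4. Each fetched task runs as pre-hook + task + post-hook;
    5. When getTask returns null, the current Worker ends;
    6. After the Worker ends, the pool checks whether the minimum number of Workers must be maintained and decides whether to add another.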
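
Point 6 can be observed from the outside: when a task throws, its worker thread dies and processWorkerExit brings in a replacement. The sketch below is a small experiment under assumptions chosen for this example (a single-thread pool, the made-up class name WorkerReplacementDemo, and threads named "w-N" by a custom factory that also silences the uncaught exception).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class WorkerReplacementDemo {
    // Returns the thread names that ran the failing task and the follow-up task.
    static List<String> run() {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "w-" + seq.incrementAndGet());
            t.setUncaughtExceptionHandler((th, ex) -> { }); // keep the demo output quiet
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // The exception escapes runWorker, so this worker exits abruptly.
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                throw new IllegalStateException("boom");
            });
            // Runs on whichever worker replaces the dead one.
            pool.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The two recorded names differ, which shows that the pool did not reuse the thread whose task failed.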

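  The original post borrowed a widely shared diagram from the web to sum up how Worker and ThreadPoolExecutor cooperate (it is already very good, so there is no need to reinvent the wheel; figure not reproduced here).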

 

5. How shutdown is implemented

  ThreadPoolExecutor keeps its global state in the ctl variable, and within the pool shutdown is just another state, so it should be fairly simple.

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        // Use mainLock for thread safety
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // SecurityManager check
            checkShutdownAccess();
            // Set the state to SHUTDOWN
            advanceRunState(SHUTDOWN);
            // Interrupt idle Workers, which in effect shuts down each idle thread in turn
            interruptIdleWorkers();
            // Shutdown hook; a no-op by default, so that subclasses can add custom cleanup
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // Finally, try to terminate the pool
        tryTerminate();
    }
    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            // Done when our own CAS succeeds or another thread has already advanced the state
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
    // Interrupt idle threads (those not currently running a task)
    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        // Described above; here onlyOne is false, so as many idle Workers as possible are interrupted
        interruptIdleWorkers(false);
    }
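
The observable behaviour of shutdown() can be checked with a short experiment: tasks already running or queued still complete, while new submissions are rejected. This is only a sketch; GracefulShutdownDemo is a made-up class name and the pool configuration is an assumption for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class GracefulShutdownDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        Runnable task = () -> {
            try {
                release.await();
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(task);   // handed to the single worker
        pool.execute(task);   // waits in the queue
        pool.shutdown();      // state becomes SHUTDOWN; accepted tasks are kept

        boolean rejected = false;
        try {
            pool.execute(task); // submitted after shutdown
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        release.countDown();
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "completed=" + completed.get()
                + " rejected=" + rejected + " terminated=" + terminated;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```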
    
    The counterpart of shutdown is shutdownNow, whose semantics are to attempt to stop all tasks immediately (best effort, via interrupts).
    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
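            // Unlike shutdown(), the target state is STOP
            advanceRunState(STOP);
            // Interrupt every started worker thread, whether busy or idle
            interruptWorkers();
            // Drain the tasks still waiting in the queue so they can be returned
            tasks = drainQueue();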
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                // Call the interrupt method that the Worker provides
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
        // ThreadPoolExecutor.Worker#interruptIfStarted
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    // Interrupt the Worker's thread directly (the thread, not the task object)
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
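
  To make the difference between the two shutdown methods concrete, here is a minimal sketch, not taken from the JDK source. The class name ShutdownDemo and its helper methods are made up for illustration. It assumes a single-threaded fixed pool, so that one task occupies the only worker while the others wait in the queue.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
public class ShutdownDemo {

    // shutdown(): tasks already submitted still run. Returns how many completed.
    static int completedAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 3; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown();
        awaitQuietly(pool);
        return completed.get();
    }

    // shutdownNow(): the running task is interrupted and the queued tasks
    // are handed back instead of being run. Returns how many were handed back.
    static int drainedByShutdownNow() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // occupies the only worker
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(): return promptly
            }
        });
        pool.execute(() -> { }); // waits in the queue
        pool.execute(() -> { }); // waits in the queue
        try {
            started.await(); // make sure the first task is really running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<Runnable> pending = pool.shutdownNow();
        awaitQuietly(pool);
        return pending.size();
    }

    private static void awaitQuietly(ExecutorService pool) {
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("completed after shutdown(): " + completedAfterShutdown());
        System.out.println("returned by shutdownNow(): " + drainedByShutdownNow());
    }
}
```

  Note that the sleeping task only stops because it responds to the interrupt. As the Javadoc above says, a task that ignores interrupts may never terminate.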

 

6. How invokeAll is implemented

  invokeAll, as the name suggests, invokes all of the given tasks. Presumably it adds them to the thread pool one at a time.

    // invokeAll is implemented directly in the abstract class; it runs n tasks and blocks until all of them have completed
    // java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>)
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                // Hand each task in turn to the concrete subclass's execute()
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        // Wait for each result in turn
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

  The implementation is simple: it is just a thin wrapper around execute() and Future.get().
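
  From the caller's side, using invokeAll might look like the sketch below. The class InvokeAllDemo and the method sumOfSquares are hypothetical names chosen for this example. It relies on the behaviour seen in the source above: every returned Future is already done, and the list is in the same order as the submitted tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
public class InvokeAllDemo {

    // Squares each input on a pool thread and sums the results.
    static int sumOfSquares(int... inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int n : inputs) {
                tasks.add(() -> n * n);
            }
            // Blocks until every task has completed; futures are in input order.
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // already done, so this does not block
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(1, 2, 3)); // prints 14
    }
}
```

  invokeAll swallows ExecutionException while it waits, so a failed task only surfaces when the caller calls get() on that task's Future, as the loop here does.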

 

7. The design of ThreadPoolExecutor's state value

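  As the walkthrough above shows, ThreadPoolExecutor depends heavily on its state, so a well-designed state value matters a great deal. runState is the lifecycle state of the pool as a whole. It is defined as follows: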

    // runState is stored in the high-order bits
    // The run state is held in the top three bits of ctl; COUNT_BITS = 29
    // 1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 0000 0000 0000 0000 0000 0000 0000 0000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 0010 0000 0000 0000 0000 0000 0000 0000
    private static final int STOP       =  1 << COUNT_BITS;
    // 0100 0000 0000 0000 0000 0000 0000 0000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 0110 0000 0000 0000 0000 0000 0000 0000
    private static final int TERMINATED =  3 << COUNT_BITS;
    // Numeric order of the states: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    
    // The low 29 bits hold the worker count, so adding a worker only requires incrementing ctl as a whole.
    // 0001 1111 1111 1111 1111 1111 1111 1111, which is the maximum worker count
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // ctl itself is an AtomicInteger; its Javadoc describes it as follows:
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop.  The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
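
  The packing scheme is easier to follow with a small standalone sketch. The class CtlDemo below is hypothetical. The constants are copied from the listing above, and the helper methods are modelled on the private helpers in the JDK 8 source (runStateOf, workerCountOf, ctlOf, runStateAtLeast); other JDK versions may organise these helpers differently.

```java
// Hypothetical standalone class that reproduces the ctl bit layout for experimentation.
public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    // Keep only the top three bits (the run state).
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Keep only the low 29 bits (the worker count).
    static int workerCountOf(int c)  { return c & CAPACITY; }
    // Combine a run state and a worker count into one int.
    static int ctlOf(int rs, int wc) { return rs | wc; }
    // States are ordered numerically, so a plain comparison is enough.
    static boolean runStateAtLeast(int c, int s) { return c >= s; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 0);
        c += 3; // adding three workers is just three increments of ctl
        System.out.println("workers = " + workerCountOf(c));
        System.out.println("still RUNNING = " + (runStateOf(c) == RUNNING));

        // Moving to SHUTDOWN keeps the worker count and replaces the state bits.
        c = ctlOf(SHUTDOWN, workerCountOf(c));
        System.out.println("at least SHUTDOWN = " + runStateAtLeast(c, SHUTDOWN));
        System.out.println("at least STOP = " + runStateAtLeast(c, STOP));
    }
}
```

  Because RUNNING is the only negative state, a check such as "is the pool still running" reduces to comparing ctl against SHUTDOWN, and a state change and a worker-count change can never be observed half-applied, since both live in the same atomic int.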

 

8. awaitTermination: waiting for shutdown to complete

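  As the shutdown() code above shows, it only sets the SHUTDOWN state and then makes a best-effort attempt to interrupt the idle Worker threads; it does not guarantee that they have stopped. Confirming that shutdown has finished takes a separate mechanism. That is the job of awaitTermination, which blocks until the pool has terminated or the timeout elapses. So how does it do that?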

    // ThreadPoolExecutor.awaitTermination()
    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // Simply loop on the ctl state: once it reaches TERMINATED, shutdown has completed
                // The termination condition waited on here is signalled from tryTerminate()
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
    

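  As it turns out, awaitTermination does nothing special; it just keeps waiting. The move to TERMINATED is therefore something the Workers bring about themselves as they exit.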

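  So where does that happen? When a Worker fetches its next task, it checks the current state. If the state is SHUTDOWN and the queue is empty (or the state is STOP or later), the fetch returns null, which ends that Worker.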

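  Before it exits, a Worker calls processWorkerExit(), which in turn calls tryTerminate(). Once every Worker has reached this point, awaitTermination() is notified. (Note: processWorkerExit() may also try addWorker() to replace the exiting Worker, but in the SHUTDOWN state with an empty queue that attempt fails, so it can be ignored here.)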
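
  Putting shutdown(), shutdownNow() and awaitTermination() together, a caller might close a pool along the lines of the sketch below. The class GracefulShutdown and the method shutdownAndAwait are hypothetical names; the two-phase approach follows the usage pattern suggested in the ExecutorService Javadoc, and the timeout values are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; only the java.util.concurrent calls are real API.
public class GracefulShutdown {

    // Returns true if the pool reached TERMINATED within the allowed time.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // timed out: interrupt running tasks, drop queued ones
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        boolean terminated = shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        System.out.println("terminated: " + terminated);
    }
}
```

  The boolean result matters: awaitTermination() returning false means the timeout elapsed first, not that the pool has stopped.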

 

  With that, can you now answer the questions raised at the beginning?