1. 程式人生 > >執行緒池為什麼能維持執行緒不釋放,隨時執行各種任務?

執行緒池為什麼能維持執行緒不釋放,隨時執行各種任務?

執行緒池

之前一直有這個疑問:我們平時使用執行緒都是各種new Thread(),然後直接在run()方法裡面執行我們要做的各種操作,使用完後需要做什麼管理嗎?執行緒池為什麼能維持住核心執行緒不釋放,一直接收任務進行處理呢?

執行緒

執行緒無他,主要有兩個方法,我們先看看start()方法介紹:

    /**
     * Causes this thread to begin execution; the Java Virtual Machine
     * calls the <code>run</code> method of this thread.
     * <p>
     * The result is that two threads are running concurrently: the
     * current thread (which returns from the call to the
     * <code>start</code> method) and the other thread (which executes its
     * <code>run</code> method).
     * <p>
     * It is never legal to start a thread more than once.
     * In particular, a thread may not be restarted once it has completed
     * execution.
     *
     * @exception  IllegalThreadStateException  if the thread was already
     *               started.
     * @see        #run()
     * @see        #stop()
     */

    public synchronized void start() {
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        started = false;
        try {
            nativeCreate(this, stackSize, daemon);
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
  • 從這個方法解釋上看,start()這個方法,最終會交給VM 去執行run()方法,所以一般情況下,我們在隨便一個執行緒上執行start(),裡面的run()操作都會交給VM 去執行。
  • 而且還說明,重複啟用執行緒是不合法的,當一個執行緒完成的時候,may not be restarted once。

    那麼這種情況下,執行緒池是怎麼做的?他為什麼就能夠重複執行各種任務呢?

帶著各種疑問,我們去看看執行緒池自己是怎麼實現的。

執行緒池

執行緒池常用的建立方法有那麼幾種: 
1. newFixedThreadPool() 
2. newSingleThreadExecutor() 
3. newCachedThreadPool() 
4. newScheduledThreadPool()

這4個方法建立的執行緒池例項具體就不一一介紹,無非是建立執行緒的多少,以及回收等問題,因為其實這4個方法最後都會呼叫統一的構造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

具體來說只是這幾個值的不同決定了4個執行緒池的作用: 
1. corePoolSize 代表核心執行緒池的個數,當執行緒池當前的個數大於核心執行緒池的時候,執行緒池會回收多出來的執行緒 
2. maximumPoolSize 代表最大的執行緒池個數,當執行緒池需要執行的任務大於核心執行緒池的時候,會建立更多的執行緒,但是最大不能超過這個數 
3. keepAliveTime 代表空餘的執行緒存活的時間,當多餘的執行緒完成任務的時候,需要多長時間進行回收,時間單位是unit 去控制 
4. workQueue 非常重要,這個工作佇列會存放所有待執行的Runnable物件

 @param workQueue the queue to use for holding tasks before they areexecuted. 
 This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.

我們平時在使用執行緒池的時候,都是直接 例項.execute(Runnable),一起跟進去,看看這個方法具體做了什麼

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */

        //結合上文的註釋,我們得知,第一次,先判斷當前的核心執行緒數,
        //如果小於初始化的值,馬上建立;然後第二個if,將這個任務插入到工作執行緒,雙重判斷任務,
        //假定如果前面不能直接加入到執行緒池Worker集合裡,則加入到workQueue佇列等待執行。
        //裡面的if else判斷語句則是檢查當前執行緒池的狀態。如果執行緒池本身的狀態是要關閉並清理了,
        //我們則不能提交執行緒進去了。這裡我們就要reject他們。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

所以其實主要起作用的還是addWorker()方法,我們繼續跟蹤進去:

private boolean addWorker(Runnable firstTask, boolean core) {
        ···多餘程式碼


        try {
            w = new Worker(firstTask);  1.重點
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();   2. 重點
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我們看重點部分,其實最重要的是firstTask這個Runnable,我們一直跟蹤這個物件就可以了,這個物件會new Worker(),那麼這個wroker()就是一個包裝類,裡面帶著我們實際需要執行的任務,後面進行一系列的判斷就會執行t.start(); 這個t 就是包裝類worker類裡面的Thread,所以整個邏輯又轉化進去Worker內部。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        ...省略程式碼
    }
  • 這個Worker包裝類,重要的屬性兩個,thread 就是剛才上面那個方法執行的start()物件,這個thread又是把這個worker物件本身作為一個Runnable物件構建出來的,那麼當我們呼叫thread.start()方法時候,實際呼叫的就是Worker類的run()方法。現在又要追蹤進去,看這個runWorker(this),做的是什麼鬼東西
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這個方法還是比較好懂的: 
1. 一個大迴圈,判斷條件是task != null || (task = getTask()) != null,task自然就是我們要執行的任務了,當task空而且getTask()取不到任務的時候,這個while()就會結束,迴圈體裡面進行的就是task.run(); 
2.這裡我們其實可以打個心眼,那基本八九不離十了,肯定是這個迴圈一直沒有退出,所以才能維持著這一個執行緒不斷執行,當有外部任務進來的時候,迴圈體就能getTask()並且執行。 
3.下面最後放getTask()裡面的程式碼,驗證猜想

   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }


            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  • 真相大白了,裡面進行的也是一個死迴圈,主要看 Runnable r = timed ? 
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 
    workQueue.take();

  • 工作佇列workQueue會一直去拿任務,屬於核心執行緒的會一直卡在 workQueue.take()方法,直到拿到Runnable 然後返回,非核心執行緒會 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超時還沒有拿到,下一次迴圈判斷compareAndDecrementWorkerCount就會返回null,Worker物件的run()方法迴圈體的判斷為null,任務結束,然後執行緒被系統回收

總結

一句話可以概述了,執行緒池就是用一堆包裝住Thread的Wroker類的集合,在裡面有條件的進行著死迴圈,從而可以不斷接受任務來進行。