1. 程式人生 > >(十)ThreadPoolExecutor 原始碼分析 —— 執行緒池

(十)ThreadPoolExecutor 原始碼分析 —— 執行緒池

一、生產者消費者模式

我們在多執行緒開發中經常會使用到生產者消費者模式,所以在這邊先進行生產者消費者模式的簡單介紹。

為什麼要使用生產者消費者模式:

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,所以便有了生產者和消費者模式。
引用自 https://my.oschina.net/xianggao/blog/390411

這邊採用同步進行實現一個簡單的生產者消費者模式。

Product :

public class Product {

    private ArrayList<String> list;
    private int maxSize;

    public Product(int maxSize) {
        this.list = new ArrayList<String>();
        this.maxSize = maxSize;
    }

    /**
     * 生產
     */
    public void produce() {
        try {
            synchronized
(list) { while (list.size() == maxSize) { list.wait(); } list.add(System.currentTimeMillis() + ""); System.out.println(Thread.currentThread().getName() + "生產一個"); list.notifyAll(); } } catch
(InterruptedException e) { } } /** * 消費 */ public String consume() { try { synchronized (list) { while (list.size() == 0) { list.wait(); } String consume = list.remove(0); System.out.println(Thread.currentThread().getName() + "消費一個"); list.notifyAll(); return consume; } } catch (InterruptedException e) { return null; } } }

Test :

public class Test {

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        final Product product = new Product(10);

        new Thread(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                while(true){
                    product.produce();
                }
            }
        }).start();     

        new Thread(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                while(true){
                    product.consume();
                }
            }
        }).start();
    }
}

二、ThreadPoolExecutor

1.建構函式

ThreadPoolExecutor 建構函式有多個過載,這邊主要是講解一下各個引數的意義。

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) ;

引數說明:

corePoolSize:核心執行緒數
maximumPoolSize:最大執行緒數
keepAliveTime:執行緒超時時間,超時時候,執行緒會被釋放回收
unit:執行緒超時時間的單位,有TimeUnit.MILLISECONDS(ms)、TimeUnit. SECONDS(s)等
workQueue:任務佇列
threadFactory:執行緒工廠介面,只有 new Thread(Runnable r)方法,建立新執行緒
handler:當任務執行失敗時,使用 handler 進行通知

2.處理邏輯

一個執行緒池裡面的執行緒分為核心執行緒和非核心執行緒。
1.如果當前執行緒池中數量小於 corePoolSize,建立核心執行緒執行任務。
2.如果當前執行緒池中數量大於 corePoolSize,執行緒池中執行緒小於 maximumPoolSize,建立非核心執行緒執行任務。
3.如果當前執行緒池中數量大於 corePoolSize,執行緒池中執行緒等於 maximumPoolSize,任務佇列沒滿,加入任務佇列進行等待。
4.如果當前執行緒池中數量大於 corePoolSize,執行緒池中執行緒等於 maximumPoolSize,任務佇列滿了,交給錯誤 handler 進行處理。
5.非核心執行緒空閒時間超過 keepAliveTime,就會進行回收釋放。

3.執行緒池狀態與數量

在 ThreadPoolExecutor 中有一個 AtomicInteger 的變數 ctl,ctl 裡面儲存有兩個資料資訊,當前執行緒池的狀態 runState 和有效的執行緒數量 workCount 。AtomicInteger 是一個32位的整數,其中高3位表示執行緒池狀態,低29位表示執行緒數量。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

執行緒池狀態有5個。

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//接收新任務,並且可以處理佇列中的任務
private static final int RUNNING    = -1 << COUNT_BITS;
//不再接收新任務,但是可以處理佇列中的任務
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//不接受新任務,不再處理佇列中的任務,並且中斷正在執行的任務
private static final int STOP       =  1 << COUNT_BITS;
//所有狀態執行完畢,呼叫 terminated 方法(該方法需過載)
private static final int TIDYING    =  2 << COUNT_BITS;
//terminated 方法執行完畢
private static final int TERMINATED =  3 << COUNT_BITS;

ThreadPoolExecutor 提供了三個方法,可以對 ctl 資料進行封裝解析

// Packing and unpacking ctl
//獲取執行緒池狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//獲取執行緒數量
private static int workerCountOf(int c)  { return c & CAPACITY; }
//封裝
private static int ctlOf(int rs, int wc) { return rs | wc; }

3.原始碼分析

執行緒池 ThreadPoolExecutor 的執行,也是通過 execute 方法進行的,我們先檢視這個方法。

ThreadPoolExecutor 的 execute:

   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        //判斷當前執行緒數小於 corePoolSize,
        if (workerCountOf(c) < corePoolSize) {
            //新增核心執行緒(true 表示核心執行緒)
            if (addWorker(command, true))
                //新增成功,返回
                return;
            c = ctl.get();
        }
        //判斷當前執行緒池狀態為 RUNNING,只有 RUNNING 才可以接收新任務
        //然後把 command 新增到任務佇列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次判斷當前執行緒池狀態為 RUNNING,不是的話,移除 command 任務
            if (! isRunning(recheck) && remove(command))
                reject(command);

            //如果在執行階段,而且 workCount 為 0,呼叫 addWorker 方法新增新執行緒
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //不能接受新任務,就新增新執行緒
        else if (!addWorker(command, false))
            //新增新執行緒失敗,就 reject
            reject(command);
    }

在 execute 中有一大段註釋,這段註釋就是對三個判斷條件的講解。這邊進行簡易的翻譯。
註釋翻譯:
(1)如果正在執行的執行緒數小於 corePoolSize,試著開啟一個新的執行緒,並且把 command 作為第一個任務,然後呼叫 addWorker。檢查 runState 和 workerCount,為了防止線上程不應該被新增的時候進行了新增操作,將會返回 false。
(2)如果一個任務可以被新增到佇列中,我們仍然需要再次檢查是否我們真的需要要新增一個執行緒(有可能在上次檢查完之後,有執行緒被回收)或者我們進入該方法後,執行緒池關閉了。所以我們需要再次檢查狀態,如果有必要的話,進行回滾佇列(就是指 remove 操作)或者啟動一個新的執行緒。
(3)如果我們不能對任務進行排隊,那麼我們嘗試新增一個新的執行緒。如果失敗了,那麼我們可以知道,執行緒池關閉或飽和了,因此拒絕這項任務。

這裡主要呼叫了兩個方法,addWorker 和 reject,reject相對比較簡單,我們先來檢視。

ThreadPoolExecutor 的 reject:

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

handler 是我們傳遞進來的,是介面 RejectedExecutionHandler 的一個實現。出現錯誤,通過 handler ,返回給使用者進行處理。預設實現是 AbortPolicy,AbortPolicy 直接丟擲一個異常。

AbortPolicy :

   public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

還有一個重要的方法是 addWorker,這個方法是新建一個執行緒,其中第二個引數表示建立的執行緒是否為核心執行緒。

ThreadPoolExecutor 的 addWorker:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //rs >= SHUTDOWN,即執行緒池狀態不為 RUNNING
            //! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty())
            //當執行緒池狀態為 SHUTDOWN 的時候,前面說過,這時候不再接收新任務,但是可以處理佇列中的任務。
            //所以要求 firstTask 為空,並且佇列 workQueue 不為空
            //firstTask 不為空,表示新增任務,與 SHUTDOWN 狀態不符,所以 firstTask 必須為空
            //workQueue 為空,表示任務都已經處理完,所以不允許 workQueue 為空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            //進入內迴圈
            for (;;) {
                int wc = workerCountOf(c);
                //CAPACITYA 為 2^29-1,一般不會超過
                //當前執行緒數與核心執行緒數或者最大執行緒數進行比較(根據 core)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //只是對計數器進行 +1 操作
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //走到這,表示計數器達到了最大值
                c = ctl.get();  // Re-read ctl
                //判斷當前狀態與剛才狀態是否一致
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //只有在這兩種情況下,程式才會跳出上方的內迴圈:
        //核心執行緒,當前執行緒數量小於 CAPACITY 和 corePoolSize 中的較小值 
        //非核心執行緒,當前執行緒數量小於 CAPACITY 和 maximumPoolSize 中的較小值。 

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //建立新執行緒,主要是通過 Worker 類,
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    //rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)
                    //判斷執行緒池是否允許處理任務
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //往工作集合 workers 中新增新建的 Worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //新增成功,就直接啟動
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //啟動執行緒失敗,則進行回滾
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

在這裡,啟動執行緒主要是依賴於 Worker 這個類,我們先檢視這個類的實現。

Worker:

    //Worker 繼承於 AbstractQueuedSynchronizer 是為了實現阻塞鎖和其他同步工具
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
         //在這裡獲取到 ThreadPoolExecutor 的執行緒工廠來建立執行緒
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        //實現的 Runnable 介面,真正的執行任務。
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

在 Worker 的 run 方法中,我們可以看到,真正執行任務的方法是 runWorker 這個方法。

ThreadPoolExecutor 的 runWorker:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();

        //w.firstTask 就是我們建立 Worker 時候傳遞的 Runnable 
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //拿到的任務不為 null
            //第一次進來肯定不為空
            //完成任務後 task 為空,就會執行 getTask,從任務佇列中去拿取任務
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果執行緒池已經停止了,那麼中斷當前執行緒,停止任務
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //執行任務,真正的執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    //task 置空,再次迴圈時,就會去佇列中獲取任務
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

當第一次進入 while 迴圈的時候,task 不為空。runWorker 先執行一個 task 任務,執行完畢之後,會把 task 置空,然後再次 進入 while 迴圈,這時候 task 為空,會呼叫 getTask 從任務佇列中獲取任務,如果獲取到了,則繼續執行,執行完畢置空,再次迴圈。當 getTask 返回 null 的時候,那麼就會跳出 while 迴圈,結束 runWorker 這個方法,從而結束 run 方法,即當前執行緒結束。

我們前面提到,非核心執行緒在空閒一段時間後才會進行釋放,核心執行緒一直不會釋放,這是因為在 getTask 中進行了堵塞。接下來檢視 getTask 的實現。

getTask:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //必要時,檢查任務佇列是否為空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //當前執行緒是否可以被釋放
            //allowCoreThreadTimeOut 是表示核心執行緒是否有超時時間,可以通過方法進行設定
            // wc > corePoolSize 即當前執行緒池中的執行緒數大於核心執行緒數
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //當前執行緒是否可以被釋放
                //可以的話,呼叫 poll 方法,等待 keepAliveTime 
                //poll 傳下去的時間單位寫死,是在 ThreadPoolExecutor 建構函式中的進行了轉換 
                //不可以被釋放的話,呼叫 take 進行堵塞等待
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

三、執行緒池

1.執行緒池的優點

執行緒池相比執行緒有以下優點:

1、避免每次new Thread 新建物件
2、執行緒統一管理,重用存在的執行緒,減少物件建立、消亡的開銷。
3、可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。

2.常見的執行緒池

執行緒池一般都是通過 Executors 的工廠方法進行建立,我們常用的執行緒有4種:

CachedThreadPool:
Executors 的建立方法:

   public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool 沒有核心執行緒,最大執行緒數為 Integer.MAX_VALUE,基本上執行緒數達不到這個數,所以只要有需要,就會馬上新建執行緒。SynchronousQueue 不持有任務,只要一有任務,馬上交給執行緒去執行,設定的超時時間為 60s,如果有執行緒在等待的話,就使用這個執行緒,如果沒有執行緒在等待,就會去新建立執行緒。每個執行緒空閒 60s 後就會被釋放。

FixedSizeThreadPoolc:

Executors 的建立方法:

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

建立一個定長執行緒池,所有執行緒都是核心執行緒,這樣所有執行緒建立後都不會銷燬。可控制執行緒最大併發數,由於使用 LinkedBlockingQueue,任務佇列沒有限制,所有超出的任務會在佇列中等待。

ScheduledThreadPool:
Executors 的建立方法:

  public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

核心執行緒數量固定,為corePoolSize ,最大執行緒數為 Integer.MAX_VALUE,該執行緒池主要用於執行週期性的任務或在延時一段時間後執行任務。

SingleThreadPool:
Executors 的建立方法:

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。由於使用 LinkedBlockingQueue,任務佇列沒有限制,所有超出的任務會在佇列中等待。