1. 程式人生 > >Java多執行緒之ThreadPoolExecutor實現原理和原始碼分析(五)

Java多執行緒之ThreadPoolExecutor實現原理和原始碼分析(五)

章節概覽、

1、概述

執行緒池的顧名思義,就是執行緒的一個集合。需要用到執行緒,從集合裡面取出即可。這樣設計主要的作用是優化執行緒的建立和銷燬而造成的資源浪費的情況。Java中的執行緒池的實現主要是JUC下面的ThreadPoolExecutor類完成的。下面我們做的原始碼分析都是基於ThreadPoolExecutor類進行分析。

2、執行緒池實現類圖UML

在這裡插入圖片描述

從類繼承圖可以看到,ThreadPoolExecutor 繼承 AbstractExecutorService 抽象類。而AbstractExecutorService 實現了ExecutorService介面。ExecutorService 介面又繼承了Executor介面。下面分析下這幾個介面。

2.1、 核心介面分析
2.1.1、 Executor介面原始碼分析
public interface Executor {
// 執行一個任務。任務都被封裝成Runnable的實現
    void execute(Runnable command);
}
2.1.2 ExecutorService介面原始碼分析
public interface ExecutorService extends Executor {
	
// 啟動有序的關閉,之前提交的任務將會被執行,但不會接受新的任務。
    void shutdown();

// 嘗試停止所有正在執行的任務,停止等待處理的任務,病返回任務列表
    List<Runnable> shutdownNow();

// 判斷執行緒池是否已經關閉
    boolean isShutdown();

// 如果關閉後所有任務都已完成。 但是前提是必須先執行:shutdown 或者 shutdownNow
    boolean isTerminated();

// 在開啟shutdown之後,阻止所有的任務知道執行完成
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
        
// 提交任務,帶返回結果的
    <T> Future<T> submit(Callable<T> task);

// 提交任務,封裝返回結果為T
    <T> Future<T> submit(Runnable task, T result);

 // 提交一個普通任務,返回結果任意
    Future<?> submit(Runnable task);

// 執行一批任務,返回結果為 List<Future<T>>
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
2.2 核心內部類分析

從繼承圖來看,其具有5個核心的內部類。其中4內部類對應的是拒絕策略。Worker是核心的執行程式碼。下面我們看下拒絕策略類的結構以及策略的運用場景

2.2.1、 RejectedExecutionHandler 介面
public interface RejectedExecutionHandler {
// 拒絕執行策略
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
2.2.2、 AbortPolicy 策略

Java執行緒池預設的阻塞策略,不執行此任務,而且直接丟擲一個執行時異常。

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        // 直接丟擲異常,描述前執行緒的基本資訊
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
2.2.3、DiscardPolicy策略

空方法,不做任何處理

public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
2.2.4、DiscardOldestPolicy 策略

從佇列裡面拋棄一個最老的任務,並再次execute 此task

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }
       
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
            	// 從佇列裡面取出最老的一個任務
                e.getQueue().poll();
                // 手動呼叫execute方法執行,將任務新增到佇列中
                e.execute(r);
            }
        }
    }
2.2.5、CallerRunsPolicy 策略
 public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

      // 如果當前執行緒池沒有關閉,則呼叫執行緒的run方法
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

3、ThreadPoolExecutor建構函式核心成員變數分析

3.1、建構函式詳解
public class ThreadPoolExecutor extends AbstractExecutorService {
	public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
}

建構函式引數說明

  • corePoolSize
    執行緒池中的核心執行緒數,空閒時候執行緒也不會回收,除非把allowCoreThreadTimeOut設定為 true,這時核心執行緒才會被回收。
  • maximumPoolSize
    執行緒池中可以建立的最大執行緒數,限定為2^29-1。
  • keepAliveTime
    當執行緒池中建立的執行緒超過了核心執行緒數的時候,在沒有新任務加入的等待時間。
  • unit
    keepAliveTime的時間單位,可以是納秒,微秒,毫秒,秒,分鐘,小時,天。
  • workQueue
    存放任務的佇列,只有當執行緒數 > 核心執行緒數,才會把其他的任務放入queue,一般常用的是queue就是ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。
  • threadFactory
    建立執行緒的工廠類。
  • handler
    當queue滿了和執行緒數達到最大限制,對於繼續到達的任務採取的策略。預設採取AbortPolicy , 也就是拒絕策略,直接丟擲異常
3.2、核心成員變數分析

執行緒池中設計非常巧妙的一個地方是把執行緒池的狀態和執行的執行緒數量用一個int型別進行儲存。這樣一來可以保持執行緒池狀態和執行緒池活躍執行緒數量的一致性。因為AtomicInteger是執行緒安全的。

  1. workerCount:執行緒池中當前活動的執行緒數量,佔據ctl的低29位;
  2. runState:執行緒池執行狀態,佔據ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五種狀態

為了將執行緒池的狀態和執行緒池中的工作執行緒的數量放到一個int裡面進行管理。他們利用了二進位制資料進行位運算。其中int型別有4個位元組,一個位元組8位。總共有32位。其中高的3位表示執行緒的狀態。低29位代表執行緒的數量。

其中32位中,高三位代表的是狀態:

  • 111 > RUNNING
  • 000 > SHUTDOWN
  • 001 > STOP
  • 010 > TIDYING
  • 110 > TERMINATED

低29位代表執行緒的數量。所以最大的執行緒數為 2^29 -1 = 536870911

// 記錄執行緒池狀態和執行緒數量(總共32位,前三位表示執行緒池狀態,後29位表示執行緒數量),保證執行緒安全性
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// int 位元組32位,COUNT_BITS代表的是29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
// 執行緒的最大容量: 000 11111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 執行狀態: 111 00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
// 關閉狀態: 000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 停止狀態: 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
// 整理狀態: 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
// 終止狀態: 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;

/**
* 是按位取反的意思,CAPACITY表示的是高位的3個0,和低位的29個1,而~CAPACITY則表示高位的3個1,2低位的9個0,
* 然後再與入參c執行按位與操作,即高3位保持原樣,低29位全部設定為0,也就獲取了執行緒池的執行狀態runState
*/
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
/**
* 返回當前執行緒的數量。其中c代表執行緒池的狀態,即是高三位。:
* 而CAPACITY 代表的是執行緒的容量,即000 11111111111111111111111111111
* c & CAPACITY ,只有當都為1的時候,才為真,這樣直接捨棄高位
*/
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
/**
* 傳入的rs表示執行緒池執行狀態runState,其是高3位有值,低29位全部為0的int,
* 而wc則代表執行緒池中有效執行緒的數量workerCount,其為高3位全部為0,而低29位有值得int,
* 將runState和workerCount做或操作|處理,即用runState的高3位,workerCount的低29位填充的數字,而預設傳入的
*/
    private static int ctlOf(int rs, int wc) { return rs | wc; }

執行緒池的狀態轉換:

// 呼叫了shutdown()方法 
RUNNING -> SHUTDOWN 

// 呼叫了shutdownNow() 
(RUNNING 或 SHUTDOWN) -> STOP 

// 當佇列和執行緒池為空 
SHUTDOWN -> TIDYING 

// 當執行緒池為空 
STOP -> TIDYING 

// 當terminated()鉤子方法執行完成 
TIDYING -> TERMINATED 

4、執行流程核心原始碼分析

4.1、程式入口:execute 方法
/*
 * 我們以execute 方法作為程式的入口開始分析
 */
public void execute(Runnable command) {
// 判斷當前任務是否為null,如果為null,直接丟擲異常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 有以下3個步驟
         *
         * 1.如果少於corePoolSize的執行緒在執行,那麼試著啟動一個新執行緒,其中用給定指令作為first task。
         * 這會呼叫addWorker去原子性得檢查runState和workerCoune,因此可以防止錯誤報警,在錯誤報警不應該時通過返回false來新增執行緒
         * 2.如果任務被成功排隊,我們任然應該第二次檢查是否新增一個新執行緒(因為可能存在在最後一次檢查後掛掉的情況)
         * 或者在進入這個方法期間執行緒池shutdown。所以我們再次檢查狀態,如果已關閉和有必要則退出佇列,或者如果沒有的話就開始一個新的執行緒。
         * 3.如果我們無法將task入隊,那麼我們試圖新增新執行緒。如果失敗,那麼知道我們shutdown或者是飽和的並拒絕task。
         */
		
      // 獲取ctl的初始值。其初始值是:rs | wc,即狀態位和執行緒數量高低位互補  
        int c = ctl.get();
        // 獲取當前執行緒的數量,初始化的時候數量為0,和當前 corePoolSize 比較
        if (workerCountOf(c) < corePoolSize) {
       	    // 如果條件成立,呼叫addWorker(command, true)
       	    // 原始碼分析請看:4.2、boolean addWorker(Runnable firstTask, boolean core)
       	    // 從addWorker原始碼分析有得出,只要當前的workerCountOf(c) < corePoolSize 條件成立,就會往執行緒池裡面加入一個執行緒
            // 當前加入的執行緒會被初始化到Worker中,通過firstTask進行設定
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 當前執行緒池的數量達到corePoolSize的時候
        // 驗證當前執行緒池是否處於執行狀態。如果處於執行狀態。將當前的任務新增到任務佇列中。
        // offer方法新增一個元素並返回為true。如果佇列已滿,這返回false
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 新增任務到佇列成功以後,在此判斷當前執行緒池是否執行狀態
            // 如果執行緒池沒有處於執行狀態,則從佇列中移除當前任務,同時執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 由於存在核心執行緒的過期策略,可能這個時候當前執行緒池中的執行緒已經都過期清理了
            // 所以這裡進一步的進行檢測,獲取當前執行緒的個數。如果執行緒個數為0的話,則新建一個執行緒worker
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果當前執行緒池執行正常,且新增任務到佇列失敗。這時重新啟動一個worker執行緒去執行
        // 此時執行緒池的最多的執行緒數量,wc < maximumPoolSize
        // 嘗試直接添一個新的worker執行緒。如果新增失敗,執行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }
4.2、 boolean addWorker(Runnable firstTask, boolean core)

addWorker 中的兩個方法引數,firstTask代表當前需要執行的任務。
core的含義有如下:

  1. 如果滿足workerCountOf© < corePoolSize ,則為true
  2. 如果不滿足 workerCountOf© < corePoolSize 且新增任務workQueue.offer(command) 失敗。這時候傳入的為false。
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
           // 獲取當前ctl的最新值
            int c = ctl.get();
            // 獲取當前執行緒池的執行狀態
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 判斷當前執行緒是否是否已經結束。檢查當前佇列是否為null
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 獲取當前執行緒的數量
                int wc = workerCountOf(c);
                // 判斷當前執行緒的數量是否超過最大值
 				// 這裡從core的細節上面已經說明。如果為true 則使用 corePoolSize,反之使用 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               // 通過CAS設定當前的workerCount:ctl.compareAndSet(expect, expect + 1);
               // 當前的執行緒數量 + 1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           // 初始化Worker物件,傳入第一個需要執行的任務
           // Woker類的分析,請看: 4.3、Woker類原始碼分析
            w = new Worker(firstTask);
            // 獲取 worker物件內部封裝的thread執行緒
            final Thread t = w.thread;
            if (t != null) {
                // 同步程式碼塊,保證當前執行緒池狀態的一致性。因為workers是共享變數
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
          
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 檢查當前執行緒是否被啟動。
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將當前執行緒加入到執行緒池中
                        workers.add(w);
                        int s = workers.size();
                        // 重置當前執行緒池擁有的執行緒個數
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 新增成功,啟動執行緒
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 新增失敗,做失敗清理
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
4.3、Woker類原始碼分析
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // 完成的任務數量
        volatile long completedTasks;
        // 建構函式
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            // 初始化成員變數 firstTask
            this.firstTask = firstTask;
            // 初始化當前執行緒,通過執行緒工場。具體原始碼請參考:4.4、ThreadFactory 原始碼分析
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            // 執行當前傳入的任務,具體實現,請參考:4.5、void runWorker(Worker w) 原始碼分析
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

4.4、ThreadFactory 原始碼分析
static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
	
       // 建立執行緒
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // 設定當前執行緒為非後臺執行緒
            if (t.isDaemon())
                t.setDaemon(false);
            // 設定當前執行緒的優先順序為正常優先順序
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
4.5、void runWorker(Worker w) 原始碼分析
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 當執行緒池中的執行緒數量未達到corePoolSize大小的時候,每次都會先建立一個Worker物件,把當前的任務複製給firstTask
        // 直到當前的執行緒池中的執行緒數量和corePoolSize大小相等,每次新加任務都會存入到任務佇列中
        Runnable task = w.firstTask;
        // 置空當前的firstTask,主要是為了方便GC
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // 當前的firstTask != null 
           // getTask 獲取的任務也不為空,getTask方法詳解,請參考:4.6、Runnable getTask() 原始碼分析
            while (task != null || (task = getTask()) != null) {
                // 獲取當前鎖資源
                w.lock();
                // 如果池正在停止,請確保執行緒被中斷;
				// 如果沒有,請確保執行緒不被中斷。 這個
				// 需要在第二種情況下重新檢查才能處理
				// shutdown在清除中斷時正在比賽
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                    
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執行當前的任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 置空當前的 task,方便GC
                    task = null;
                    // 當前的完成任務++
                    w.completedTasks++;
                    // 釋放當前的鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        	// 當判斷條件 task != null || (task = getTask()) != null 不成立的時候,刪除當前Woker
            processWorkerExit(w, completedAbruptly);
        }
    }
4.6、Runnable getTask() 原始碼分析
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 判斷當前的任務佇列是否為null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            // 獲取當前的執行緒數量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
           
            // 如果當前的 allowCoreThreadTimeOut 設定為 ture,或者 wc > corePoolSize 的情況 為 ture
            // 當前執行緒佇列已滿,才會出現 wc > corePoolSize的
            // 通過這段設定,用於判斷核心執行緒空閒時,是否需要清理
            // 其次當執行緒數高於核心執行緒數時,是否需要清理執行緒
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize 的情況

            // 判斷當前的執行緒數是否大於最大的執行緒數
            // wc > 1 或者 workQueue為空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 減少當前的執行緒數量,通過CAS
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 根據  timed判斷,通過哪種方式獲取當前的任務
                Runnable r = timed ?
                    // timed 為false,說明當前執行緒池執行緒數量超過了核心數量
                    //  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 表示等待 keepAliveTime 的時間之後
                    // 沒有任務的話,直接返回 null。而這個返回為null,直接影響當前執行緒是否被回收的前提條件。
                    // 執行緒迴圈條件:while (task != null || (task = getTask()) != null) 如果 返回 task = null。則直接跳出迴圈
                    // 通過 processWorkerExit(w, completedAbruptly) 進行執行緒的回收
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // 移除並返回佇列的頭部元素,如果佇列為空,則阻塞
                    workQueue.take();
                // 如果 r != null。 則返回
                if (r != null)
                    return r;
                // 如果獲取失敗,設定當前timeOut為超時,接著迴圈
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

ThreadPoolExecutor 的核心理財原始碼已經分析完了

5、總結

  1. 當往執行緒池中新增任務的時候,每次新增一個任務都回去新增一個執行緒。直到不滿足 wc < corePoolSize
  2. 當前執行緒池的大小已經達到了corePoolSize的時候,每次新增任務會被存放到阻塞任務佇列中。等待執行
  3. 等等待任務佇列也滿的時候,且新增失敗。此時在來新的任務,就會接著增加執行緒的個數,直到滿足:wc >= maximumPoolSize ,新增執行緒失敗執行拒絕策略。
  4. 執行緒池中,把執行緒的狀態和數量通過int型別進行維護,高三位表示狀態,低29位表示執行緒數量。這樣可以保證執行緒的狀態和數量的一致性