1. 程式人生 > >java多線程系列:ThreadPoolExecutor源碼分析

java多線程系列:ThreadPoolExecutor源碼分析

構造 exce current ads cut interface time urn 控制

前言

這篇主要講述ThreadPoolExecutor的源碼分析,貫穿類的創建、任務的添加到線程池的關閉整個流程,讓你知其然所以然。希望你可以通過本篇博文知道ThreadPoolExecutor是怎麽添加任務、執行任務的,以及延伸的知識點。那麽先來看看ThreadPoolExecutor的繼承關系吧。

繼承關系

技術分享圖片

Executor接口

public interface Executor {
    void execute(Runnable command);
}

Executor接口只有一個方法execute,傳入線程任務參數

ExecutorService接口

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

   
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService接口繼承Executor接口,並增加了submit、shutdown、invokeAll等等一系列方法。

AbstractExecutorService抽象類

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {...}

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {... }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {...}

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {...}

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {...}

}

AbstractExecutorService抽象類實現ExecutorService接口,並且提供了一些方法的默認實現,例如submit方法、invokeAny方法、invokeAll方法。

像execute方法、線程池的關閉方法(shutdown、shutdownNow等等)就沒有提供默認的實現。

ThreadPoolExecutor

先介紹下ThreadPoolExecutor線程池的狀態吧

線程池狀態

int 是4個字節,也就是32位(註:一個字節等於8位

//記錄線程池狀態和線程數量(總共32位,前三位表示線程池狀態,後29位表示線程數量)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程數量統計位數29  Integer.SIZE=32 
private static final int COUNT_BITS = Integer.SIZE - 3;
//容量 000 11111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//運行中 111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
//關閉 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//停止 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
//整理 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
//終止 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

//獲取運行狀態(獲取前3位)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//獲取線程個數(獲取後29位)
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • RUNNING:接受新任務並且處理阻塞隊列裏的任務
  • SHUTDOWN:拒絕新任務但是處理阻塞隊列裏的任務
  • STOP:拒絕新任務並且拋棄阻塞隊列裏的任務同時會中斷正在處理的任務
  • TIDYING:所有任務都執行完(包含阻塞隊列裏面任務)當前線程池活動線程為0,將要調用terminated方法
  • TERMINATED:終止狀態。terminated方法調用完成以後的狀態

線程池狀態轉換

RUNNING -> SHUTDOWN
   顯式調用shutdown()方法, 或者隱式調用了finalize()方法
(RUNNING or SHUTDOWN) -> STOP
   顯式調用shutdownNow()方法
SHUTDOWN -> TIDYING
   當線程池和任務隊列都為空的時候
STOP -> TIDYING
   當線程池為空的時候
TIDYING -> TERMINATED
   當 terminated() hook 方法執行完成時候

構造函數

有四個構造函數,其他三個都是調用下面代碼中的這個構造函數

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
}

參數介紹

參數 類型 含義
corePoolSize int 核心線程數
maximumPoolSize int 最大線程數
keepAliveTime long 存活時間
unit TimeUnit 時間單位
workQueue BlockingQueue 存放線程的隊列
threadFactory ThreadFactory 創建線程的工廠
handler RejectedExecutionHandler 多余的的線程處理器(拒絕策略)

提交任務

submit

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

流程步驟如下

  1. 調用submit方法,傳入Runnable或者Callable對象
  2. 判斷傳入的對象是否為null,為null則拋出異常,不為null繼續流程
  3. 將傳入的對象轉換為RunnableFuture對象
  4. 執行execute方法,傳入RunnableFuture對象
  5. 返回RunnableFuture對象

流程圖如下

技術分享圖片

execute

public void execute(Runnable command) {
   //傳進來的線程為null,則拋出空指針異常
   if (command == null)
       throw new NullPointerException();
  
   //獲取當前線程池的狀態+線程個數變量
   int c = ctl.get();
   /**
    * 3個步驟
    */
   //1.判斷當前線程池線程個數是否小於corePoolSize,小於則調用addWorker方法創建新線程運行,且傳進來的Runnable當做第一個任務執行。
   //如果調用addWorker方法返回false,則直接返回
   if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
           return;
       c = ctl.get();
   }

   //2.如果線程池處於RUNNING狀態,則添加任務到阻塞隊列
   if (isRunning(c) && workQueue.offer(command)) {

       //二次檢查
       int recheck = ctl.get();
       //如果當前線程池狀態不是RUNNING則從隊列刪除任務,並執行拒絕策略
       if (! isRunning(recheck) && remove(command))
           reject(command);

       //否者如果當前線程池線程空,則添加一個線程
       else if (workerCountOf(recheck) == 0)
           addWorker(null, false);
   }
   //3.新增線程,新增失敗則執行拒絕策略
   else if (!addWorker(command, false))
       reject(command);
}

其實從上面代碼註釋中可以看出就三個判斷,

  1. 核心線程數是否已滿
  2. 隊列是否已滿
  3. 線程池是否已滿

然後根據這三個條件進行不同的操作,下圖是Java並發編程的藝術書中的線程池的主要處理流程,或許會比較容易理解些

技術分享圖片

下面是整個流程的詳細步驟

  1. 調用execute方法,傳入Runable對象
  2. 判斷傳入的對象是否為null,為null則拋出異常,不為null繼續流程
  3. 獲取當前線程池的狀態和線程個數變量
  4. 判斷當前線程數是否小於核心線程數,是走流程5,否則走流程6
  5. 添加線程數,添加成功則結束,失敗則重新獲取當前線程池的狀態和線程個數變量,
  6. 判斷線程池是否處於RUNNING狀態,是則添加任務到阻塞隊列,否則走流程10,添加任務成功則繼續流程7
  7. 重新獲取當前線程池的狀態和線程個數變量
  8. 重新檢查線程池狀態,不是運行狀態則移除之前添加的任務,有一個false走流程9,都為true則走流程11
  9. 檢查線程池線程數量是否為0,否則結束流程,是調用addWorker(null, false),然後結束
  10. 調用!addWorker(command, false),為true走流程11,false則結束
  11. 調用拒絕策略reject(command),結束

可能看上面會有點繞,不清楚的可以看下面的流程圖

技術分享圖片

 addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 檢查當前線程池狀態是否是SHUTDOWN、STOP、TIDYING或者TERMINATED
        // 且!(當前狀態為SHUTDOWN、且傳入的任務為null,且隊列不為null)
        // 條件都成立則返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        //循環
        for (;;) {
            int wc = workerCountOf(c);
            //如果當前的線程數量超過最大容量或者大於(根據傳入的core決定是核心線程數還是最大線程數)核心線程數 || 最大線程數,則返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //CAS增加c,成功則跳出retry
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //CAS失敗執行下面方法,查看當前線程數是否變化,變化則繼續retry循環,沒變化則繼續內部循環
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    //CAS成功
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新建一個線程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //加鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                
                //重新檢查線程池狀態
                //避免ThreadFactory退出故障或者在鎖獲取前線程池被關閉
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 先檢查線程是否是可啟動的
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //判斷worker是否添加成功,成功則啟動線程,然後將workerStarted設置為true
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //判斷線程有沒有啟動成功,沒有則調用addWorkerFailed方法
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這裏可以將addWorker分為兩部分,第一部分增加線程池個數,第二部分是將任務添加到workder裏面並執行。

第一部分主要是兩個循環,外層循環主要是判斷線程池狀態,下面描述來自Java中線程池ThreadPoolExecutor原理探究

rs >= SHUTDOWN &&
              ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty())

展開!運算後等價於

s >= SHUTDOWN &&
               (rs != SHUTDOWN ||
             firstTask != null ||
             workQueue.isEmpty())

也就是說下面幾種情況下會返回false:

  • 當前線程池狀態為STOP,TIDYING,TERMINATED
  • 當前線程池狀態為SHUTDOWN並且已經有了第一個任務
  • 當前線程池狀態為SHUTDOWN並且任務隊列為空

內層循環作用是使用cas增加線程個數,如果線程個數超限則返回false,否者進行cas,cas成功則退出雙循環,否者cas失敗了,要看當前線程池的狀態是否變化了,如果變了,則重新進入外層循環重新獲取線程池狀態,否者進入內層循環繼續進行cas嘗試。

到了第二部分說明CAS成功了,也就是說線程個數加一了,但是現在任務還沒開始執行,這裏使用全局的獨占鎖來控制workers裏面添加任務,其實也可以使用並發安全的set,但是性能沒有獨占鎖好(這個從註釋中知道的)。這裏需要註意的是要在獲取鎖後重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。

所以這裏也將流程圖分為兩部分來描述

第一部分流程圖

技術分享圖片

第二部分流程圖

技術分享圖片

Worker對象

Worker是定義在ThreadPoolExecutor中的finnal類,其中繼承了AbstractQueuedSynchronizer類和實現Runnable接口,其中的run方法如下

public void run() {
    runWorker(this);
}

線程啟動時調用了runWorker方法,關於類的其他方面這裏就不在敘述。

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        //循環獲取任務
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 當線程池是處於STOP狀態或者TIDYING、TERMINATED狀態時,設置當前線程處於中斷狀態
            // 如果不是,當前線程就處於RUNNING或者SHUTDOWN狀態,確保當前線程不處於中斷狀態
            // 重新檢查當前線程池的狀態是否大於等於STOP狀態
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //提供給繼承類使用做一些統計之類的事情,在線程運行前調用
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //提供給繼承類使用做一些統計之類的事情,在線程運行之後調用
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //統計當前worker完成了多少個任務
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //整個線程結束時調用,線程退出操作。統計整個線程池完成的任務個數之類的工作
        processWorkerExit(w, completedAbruptly);
    }
}

getTask

getTask方法的主要作用其實從方法名就可以看出來了,就是獲取任務

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    //循環
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //線程線程池狀態和隊列是否為空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //線程數量
        int wc = workerCountOf(c);

        
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //(當前線程數是否大於最大線程數或者)
        //且(線程數大於1或者任務隊列為空)
        //這裏有個問題(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //獲取任務
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

關閉線程池

shutdown

當調用shutdown方法時,線程池將不會再接收新的任務,然後將先前放在隊列中的任務執行完成。

下面是shutdown方法的源碼

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

shutdownNow

立即停止所有的執行任務,並將隊列中的任務返回

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdown和shutdownNow區別

shutdown和shutdownNow這兩個方法的作用都是關閉線程池,流程大致相同,只有幾個步驟不同,如下

  1. 加鎖
  2. 檢查關閉權限
  3. CAS改變線程池狀態
  4. 設置中斷標誌(線程池不在接收任務,隊列任務會完成)/中斷當前執行的線程
  5. 調用onShutdown方法(給子類提供的方法)/獲取隊列中的任務
  6. 解鎖
  7. 嘗試將線程池狀態變成終止狀態TERMINATED
  8. 結束/返回隊列中的任務

總結

線程池可以給我們多線程編碼上提供極大便利,就好像數據庫連接池一樣,減少了線程的開銷,提供了線程的復用。而且ThreadPoolExecutor也提供了一些未實現的方法,供我們來使用,像beforeExecute、afterExecute等方法,我們可以通過這些方法來對線程進行進一步的管理和統計。

在使用線程池上好需要註意,提交的線程任務可以分為CPU 密集型任務IO 密集型任務,然後根據任務的不同進行分配不同的線程數量。

  • CPU密集型任務:
    • 應當分配較少的線程,比如 CPU個數相當的大小
  • IO 密集型任務:
    • 由於線程並不是一直在運行,所以可以盡可能的多配置線程,比如 CPU 個數 * 2
  • 混合型任務:
    • 可以將其拆分為 CPU 密集型任務以及 IO 密集型任務,這樣來分別配置。

好了,這篇博文到這裏就結束了,文中可能會有些紕漏,歡迎留言指正。

如果本文對你有所幫助,給個star唄,謝謝。本文GitHub地址:點這裏點這裏

參考資料

  1. 並發編程網-Java中線程池ThreadPoolExecutor原理探究
  2. Java並發編程的藝術

java多線程系列:ThreadPoolExecutor源碼分析