1. 程式人生 > >含原始碼解析,深入Java 執行緒池原理

含原始碼解析,深入Java 執行緒池原理

從池化技術到底層實現,一篇文章帶你貫通執行緒池技術。

1、池化技術簡介

在系統開發過程中,我們經常會用到池化技術來減少系統消耗,提升系統性能。
在程式設計領域,比較典型的池化技術有:
執行緒池、連線池、記憶體池、物件池等。

物件池通過複用物件來減少建立物件、垃圾回收的開銷;連線池(資料庫連線池、Redis連線池和HTTP連線池等)通過複用TCP連線來減少建立和釋放連線的時間。執行緒池通過複用執行緒提升效能。簡單來說,池化技術就是通過複用來提升效能。

執行緒、記憶體、資料庫的連線物件都是資源,在程式中,當你建立一個執行緒或者在堆上申請一塊記憶體的時候都涉及到很多的系統呼叫,也是非常消耗CPU的。如果你的程式需要很多類似的工作執行緒或者需要頻繁地申請釋放小塊記憶體,在沒有對這方面進行優化的情況下,這部分程式碼很可能會成為影響你整個程式效能的瓶頸。

如果每次都是如此的建立執行緒->執行任務->銷燬執行緒,會造成很大的效能開銷。複用已建立好的執行緒可以提高系統的效能,藉助池化技術的思想,通過預先建立好多個執行緒,放在池中,這樣可以在需要使用執行緒的時候直接獲取,避免多次重複建立、銷燬帶來的開銷。

(1)執行緒池的優點

  • 執行緒是稀缺資源,使用執行緒池可以減少建立和銷燬執行緒的次數,每個工作執行緒都可以重複使用。
  • 可以根據系統的承受能力,調整執行緒池中工作執行緒的數量,防止因為消耗過多記憶體導致伺服器崩潰。

(2)執行緒池的風險

雖然執行緒池是構建多執行緒應用程式的強大機制,但使用它並不是沒有風險的。用執行緒池構建的應用程式容易遭受任何其它多執行緒應用程式容易遭受的所有併發風險,諸如同步錯誤和死鎖,它還容易遭受特定於執行緒池的少數其它風險,諸如與池有關的死鎖、資源不足和執行緒洩漏。

  • 死鎖

任何多執行緒應用程式都有死鎖風險。當一組程序或執行緒中的每一個都在等待一個只有該組中另一個程序才能引起的事件時,我們就說這組程序或執行緒 死鎖了。死鎖的最簡單情形是:執行緒 A 持有物件 X 的獨佔鎖,並且在等待物件 Y 的鎖,而執行緒 B 持有物件 Y 的獨佔鎖,卻在等待物件 X 的鎖。除非有某種方法來打破對鎖的等待(Java 鎖定不支援這種方法),否則死鎖的執行緒將永遠等下去。

  • 資源不足

執行緒池的一個優點在於:相對於其它替代排程機制(有些我們已經討論過)而言,它們通常執行得很好。但只有恰當地調整了執行緒池大小時才是這樣的。

執行緒消耗包括記憶體和其它系統資源在內的大量資源。除了

Thread 物件所需的記憶體之外,每個執行緒都需要兩個可能很大的執行呼叫堆疊。除此以外,JVM 可能會為每個 Java
執行緒建立一個本機執行緒,這些本機執行緒將消耗額外的系統資源。最後,雖然執行緒之間切換的排程開銷很小,但如果有很多執行緒,環境切換也可能嚴重地影響程式的效能。

如果執行緒池太大,那麼被那些執行緒消耗的資源可能嚴重地影響系統性能。線上程之間進行切換將會浪費時間,而且使用超出比您實際需要的執行緒可能會引起資源匱乏問題,因為池執行緒正在消耗一些資源,而這些資源可能會被其它任務更有效地利用。

除了執行緒自身所使用的資源以外,服務請求時所做的工作可能需要其它資源,例如 JDBC 連線、套接字或檔案,這些也都是有限資源,有太多的併發請求也可能引起失效,例如不能分配 JDBC 連線。

  • 併發錯誤

執行緒池和其它排隊機制依靠使用
wait() 和 notify()
方法,這兩個方法都難於使用。如果編碼不正確,那麼可能丟失通知,導致執行緒保持空閒狀態,儘管佇列中有工作要處理。使用這些方法時,必須格外小心;即便是專家也可能在它們上面出錯。而最好使用現有的、已經知道能工作的實現,例如在
util.concurrent 包。

  • 執行緒洩漏

各種型別的執行緒池中一個嚴重的風險是執行緒洩漏,當從池中除去一個執行緒以執行一項任務,而在任務完成後該執行緒卻沒有返回池時,會發生這種情況。發生執行緒洩漏的一種情形出現在任務丟擲一個 RuntimeException 或一個 Error 時。

如果池類沒有捕捉到它們,那麼執行緒只會退出而執行緒池的大小將會永久減少一個。當這種情況發生的次數足夠多時,執行緒池最終就為空,而且系統將停止,因為沒有可用的執行緒來處理任務。

  • 請求過載

僅僅是請求就壓垮了伺服器,這種情況是可能的。在這種情形下,我們可能不想將每個到來的請求都排隊到我們的工作佇列,因為排在佇列中等待執行的任務可能會消耗太多的系統資源並引起資源缺乏。在這種情形下決定如何做取決於您自己;在某些情況下,您可以簡單地拋棄請求,依靠更高級別的協議稍後重試請求,您也可以用一個指出伺服器暫時很忙的響應來拒絕請求。

2、 如何配置執行緒池大小配置

一般需要根據任務的型別來配置執行緒池大小:

  • 如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1
  • 如果是IO密集型任務,參考值可以設定為2*NCPU

當然,這只是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先將執行緒池大小設定為參考值,再觀察任務執行情況和系統負載、資源利用率來進行適當調整。

3、執行緒池的底層原理

(1)執行緒池的狀態

執行緒池和執行緒一樣擁有自己的狀態,在ThreadPoolExecutor類中定義了一個volatile變數runState來表示執行緒池的狀態,執行緒池有四種狀態,分別為RUNNING、SHURDOWN、STOP、TERMINATED。

  • 執行緒池建立後處於RUNNING狀態。
  • 呼叫shutdown後處於SHUTDOWN狀態,執行緒池不能接受新的任務,會等待緩衝佇列的任務完成。
  • 呼叫shutdownNow後處於STOP狀態,執行緒池不能接受新的任務,並嘗試終止正在執行的任務。
  • 當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

其中ctl這個AtomicInteger的功能很強大,其高3位用於維護執行緒池執行狀態,低29位維護執行緒池中執行緒數量

RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀態的執行緒池會接收新任務,也會處理在阻塞佇列中等待處理的任務

SHUTDOWN:0<<COUNT_BITS,即高3位為0,低29位為0,該狀態的執行緒池不會再接收新任務,但還會處理已經提交到阻塞佇列中等待處理的任務

STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀態的執行緒池不會再接收新任務,不會處理在阻塞佇列中等待的任務,而且還會中斷正在執行的任務

TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務都被終止了,workerCount為0,為此狀態時還將呼叫terminated()方法

TERMINATED:3<<COUNT_BITS,即高3位為100,低29位為0,terminated()方法呼叫完成後變成此狀態

這些狀態均由int型表示,大小關係為 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個順序基本上也是遵循執行緒池從 執行 到 終止這個過程。

  • runStateOf(int c) 方法:c & 高3位為1,低29位為0的~CAPACITY,用於獲取高3位儲存的執行緒池狀態

  • workerCountOf(int c)方法:c & 高3位為0,低29位為1的CAPACITY,用於獲取低29位的執行緒數量

  • ctlOf(int rs, int wc)方法:引數rs表示runState,引數wc表示workerCount,即根據runState和workerCount打包合併成ctl

(2)為什麼ctl負責兩種角色

在Doug Lea的設計中,ctl負責兩種角色可以避免多餘的同步邏輯。

很多人會想,一個變量表示兩個值,就節省了儲存空間,但是這裡很顯然不是為了節省空間而設計的,即使將這輛個值拆分成兩個Integer值,一個執行緒池也就多了4個位元組而已,為了這4個位元組而去大費周章地設計一通,顯然不是Doug Lea的初衷。

在多執行緒的環境下,執行狀態和有效執行緒數量往往需要保證統一,不能出現一個改而另一個沒有改的情況,如果將他們放在同一個AtomicInteger中,利用AtomicInteger的原子操作,就可以保證這兩個值始終是統一的。

(3)執行緒池工作流程

預先啟動一些執行緒,執行緒無限迴圈從任務佇列中獲取一個任務進行執行,直到執行緒池被關閉。如果某個執行緒因為執行某個任務發生異常而終止,那麼重新建立一個新的執行緒而已,如此反覆。

一個任務從提交到執行完畢經歷過程如下:

第一步:如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務;

第二步:如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若新增成功,則該任務會等待空閒執行緒將其取出去執行;若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務;

第三步:如果執行緒池中的執行緒數量大於等於corePoolSize,且佇列workQueue已滿,但執行緒池中的執行緒數量小於maximumPoolSize,則會建立新的執行緒來處理被新增的任務

第四步:如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;

流程圖如下:

4、ThreadPoolExecutor解析

ThreadPoolExecutor繼承自AbstractExecutorService,同時實現了ExecutorService介面,也是Executor框架預設的執行緒池實現類,一般我們使用執行緒池,如沒有特殊要求,直接建立ThreadPoolExecutor,初始化一個執行緒池,如果需要特殊的執行緒池,則直接繼承ThreadPoolExecutor,並實現特定的功能,如ScheduledThreadPoolExecutor,它是一個具有定時執行任務的執行緒池。

(1)Executor框架

在深入原始碼之前先來看看J.U.C包中的執行緒池類圖:

它們的最頂層是一個Executor介面,它只有一個方法:

public interface Executor {
    void execute(Runnable command);
}

它提供了一個執行新任務的簡單方法,Java執行緒池也稱之為Executor框架。

ExecutorService擴充套件了Executor,添加了操控執行緒池生命週期的方法,如shutDown(),shutDownNow()等,以及擴充套件了可非同步跟蹤執行任務生成返回值Future的方法,如submit()等方法。

(2)Worker解析

Worker類繼承了AQS,並實現了Runnable介面,它有兩個重要的成員變數:firstTask和thread。firstTask用於儲存第一次新建的任務;thread是在呼叫構造方法時通過ThreadFactory來建立的執行緒,是用來處理任務的執行緒。

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

需要注意workers的資料結構為HashSet,非執行緒安全,所以操作workers需要加同步鎖。新增步驟做完後就啟動執行緒來執行任務了。

 /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

(3)如何線上程池中新增任務

執行緒池要執行任務,那麼必須先新增任務,execute()雖說是執行任務的意思,但裡面也包含了新增任務的步驟在裡面,下面原始碼:

public void execute(Runnable command) {
  // 如果新增訂單任務為空,則空指標異常
  if (command == null)
    throw new NullPointerException();
  // 獲取ctl值
  int c = ctl.get();
  // 1.如果當前有效執行緒數小於核心執行緒數,呼叫addWorker執行任務(即建立一條執行緒執行該任務)
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  // 2.如果當前有效執行緒大於等於核心執行緒數,並且當前執行緒池狀態為執行狀態,則將任務新增到阻塞佇列中,等待空閒執行緒取出佇列執行
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  // 3.如果阻塞佇列已滿,則呼叫addWorker執行任務(即建立一條執行緒執行該任務)
  else if (!addWorker(command, false))
    // 如果建立執行緒失敗,則呼叫執行緒拒絕策略
    reject(command);
}

addWorker新增任務,方法原始碼有點長,按照邏輯拆分成兩部分講解:

java.util.concurrent.ThreadPoolExecutor#addWorker:

retry:
for (;;) {
  int c = ctl.get();
  // 獲取執行緒池當前執行狀態
  int rs = runStateOf(c);

  // 如果rs大於SHUTDOWN,則說明此時執行緒池不在接受新任務了
  // 如果rs等於SHUTDOWN,同時滿足firstTask為空,且阻塞佇列如果有任務,則繼續執行任務
  // 也就說明了如果執行緒池處於SHUTDOWN狀態時,可以繼續執行阻塞佇列中的任務,但不能繼續往執行緒池中新增任務了
  if (rs >= SHUTDOWN &&
      ! (rs == SHUTDOWN &&
         firstTask == null &&
         ! workQueue.isEmpty()))
    return false;

  for (;;) {
    // 獲取有效執行緒數量
    int wc = workerCountOf(c);
    // 如果有效執行緒數大於等於執行緒池所容納的最大執行緒數(基本不可能發生),不能新增任務
    // 或者有效執行緒數大於等於當前限制的執行緒數,也不能新增任務
    // 限制執行緒數量有任務是否要核心執行緒執行決定,core=true使用核心執行緒執行任務
    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
      return false;
    // 使用AQS增加有效執行緒數量
    if (compareAndIncrementWorkerCount(c))
      break retry;
    // 如果再次獲取ctl變數值
    c = ctl.get();  // Re-read ctl
    // 再次對比執行狀態,如果不一致,再次迴圈執行
    if (runStateOf(c) != rs)
      continue retry;
    // else CAS failed due to workerCount change; retry inner loop
  }
}

這裡特別強調,firstTask是開啟執行緒執行的首個任務,之後常駐線上程池中的執行緒執行的任務都是從阻塞佇列中取出的,需要注意。

以上for迴圈程式碼主要作用是判斷ctl變數當前的狀態是否可以新增任務,特別說明了如果執行緒池處於SHUTDOWN狀態時,可以繼續執行阻塞佇列中的任務,但不能繼續往執行緒池中新增任務了;同時增加工作執行緒數量使用了AQS作同步,如果同步失敗,則繼續迴圈執行。

// 任務是否已執行
boolean workerStarted = false;
// 任務是否已新增
boolean workerAdded = false;
// 任務包裝類,我們的任務都需要新增到Worker中
Worker w = null;
try {
  // 建立一個Worker
  w = new Worker(firstTask);
  // 獲取Worker中的Thread值
  final Thread t = w.thread;
  if (t != null) {
    // 操作workers HashSet 資料結構需要同步加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      // Recheck while holding lock.
      // Back out on ThreadFactory failure or if
      // shut down before lock acquired.
      // 獲取當前執行緒池的執行狀態
      int rs = runStateOf(ctl.get());
      // rs < SHUTDOWN表示是RUNNING狀態;
      // 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態並且firstTask為null,向執行緒池中新增執行緒。
      // 因為在SHUTDOWN時不會在新增新的任務,但還是會執行workQueue中的任務
      // rs是RUNNING狀態時,直接建立執行緒執行任務
      // 當rs等於SHUTDOWN時,並且firstTask為空,也可以建立執行緒執行任務,也說說明了SHUTDOWN狀態時不再接受新任務
      if (rs < SHUTDOWN ||
          (rs == SHUTDOWN && firstTask == null)) {
        if (t.isAlive()) // precheck that t is startable
          throw new IllegalThreadStateException();
        workers.add(w);
        int s = workers.size();
        if (s > largestPoolSize)
          largestPoolSize = s;
        workerAdded = true;
      }
    } finally {
      mainLock.unlock();
    }
    // 啟動執行緒執行任務
    if (workerAdded) {
      t.start();
      workerStarted = true;
    }
  }
} finally {
  if (! workerStarted)
    addWorkerFailed(w);
}
return workerStarted;
}

以上原始碼主要的作用是建立一個Worker物件,並將新的任務裝進Worker中,開啟同步將Worker新增進workers中,這裡需要注意workers的資料結構為HashSet,非執行緒安全,所以操作workers需要加同步鎖。新增步驟做完後就啟動執行緒來執行任務了,繼續往下看。

(4)前置和後置鉤子

如果需要在任務執行前後插入邏輯,你可以實現ThreadPoolExecutor以下兩個方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

這樣一來,就可以對任務的執行進行實時監控。

5、執行緒池總結

執行緒池原理關鍵技術:鎖(lock,cas)、阻塞佇列、hashSet(資源池)

所謂執行緒池本質是一個Worker物件的hashSet,多餘的任務會放在阻塞佇列中,只有當阻塞佇列滿了後,才會觸發非核心執行緒的建立,非核心執行緒只是臨時過來打雜的,直到空閒,然後自己關閉。
執行緒池提供了兩個鉤子(beforeExecute,afterExecute)給我們,我們繼承執行緒池,在執行任務前後做一些事情