1. 程式人生 > >高併發之——從原始碼角度分析建立執行緒池究竟有哪些方式

高併發之——從原始碼角度分析建立執行緒池究竟有哪些方式

前言

在Java的高併發領域,執行緒池一直是一個繞不開的話題。有些童鞋一直在使用執行緒池,但是,對於如何建立執行緒池僅僅停留在使用Executors工具類的方式,那麼,建立執行緒池究竟存在哪幾種方式呢?就讓我們一起從建立執行緒池的原始碼來深入分析究竟有哪些方式可以建立執行緒池。

使用Executors工具類建立執行緒池

在建立執行緒池時,初學者用的最多的就是Executors 這個工具類,而使用這個工具類建立執行緒池時非常簡單的,不需要關注太多的執行緒池細節,只需要傳入必要的引數即可。Executors 工具類提供了幾種建立執行緒池的方法,如下所示。

  • Executors.newCachedThreadPool:建立一個可快取的執行緒池,如果執行緒池的大小超過了需要,可以靈活回收空閒執行緒,如果沒有可回收執行緒,則新建執行緒
  • Executors.newFixedThreadPool:建立一個定長的執行緒池,可以控制執行緒的最大併發數,超出的執行緒會在佇列中等待
  • Executors.newScheduledThreadPool:建立一個定長的執行緒池,支援定時、週期性的任務執行
  • Executors.newSingleThreadExecutor: 建立一個單執行緒化的執行緒池,使用一個唯一的工作執行緒執行任務,保證所有任務按照指定順序(先入先出或者優先順序)執行
  • Executors.newSingleThreadScheduledExecutor:建立一個單執行緒化的執行緒池,支援定時、週期性的任務執行
  • Executors.newWorkStealingPool:建立一個具有並行級別的work-stealing執行緒池

其中,Executors.newWorkStealingPool方法是Java 8中新增的建立執行緒池的方法,它能夠為執行緒池設定並行級別,具有更高的併發度和效能。除了此方法外,其他建立執行緒池的方法本質上呼叫的是ThreadPoolExecutor類的構造方法。

例如,我們可以使用如下程式碼建立執行緒池。

Executors.newWorkStealingPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool(3);

使用ThreadPoolExecutor類建立執行緒池

從程式碼結構上看ThreadPoolExecutor類繼承自AbstractExecutorService,也就是說,ThreadPoolExecutor類具有AbstractExecutorService類的全部功能。

既然Executors工具類中建立執行緒池大部分呼叫的都是ThreadPoolExecutor類的構造方法,所以,我們也可以直接呼叫ThreadPoolExecutor類的構造方法來建立執行緒池,而不再使用Executors工具類。接下來,我們一起看下ThreadPoolExecutor類的構造方法。

ThreadPoolExecutor類中的所有構造方法如下所示。

public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                 BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                    ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     threadFactory, defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                    TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), handler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                    BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

由ThreadPoolExecutor類的構造方法的原始碼可知,建立執行緒池最終呼叫的構造方法如下。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
              long keepAliveTime, TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
                  RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

關於此構造方法中各引數的含義和作用,如下所示。
注意:為了更加深入的分析ThreadPoolExecutor類的構造方法,會適當調整引數的順序進行解析,以便於大家更能深入的理解ThreadPoolExecutor構造方法中每個引數的作用。

上述構造方法接收如下引數進行初始化:

(1)corePoolSize:核心執行緒數量。

(2)maximumPoolSize:最大執行緒數。

(3)workQueue:阻塞佇列,儲存等待執行的任務,很重要,會對執行緒池執行過程產生重大影響。

其中,上述三個引數的關係如下所示:

  • 如果執行的執行緒數小於corePoolSize,直接建立新執行緒處理任務,即使執行緒池中的其他執行緒是空閒的。
  • 如果執行的執行緒數大於等於corePoolSize,並且小於maximumPoolSize,此時,只有當workQueue滿時,才會建立新的執行緒處理任務。
  • 如果設定的corePoolSize與maximumPoolSize相同,那麼建立的執行緒池大小是固定的,此時,如果有新任務提交,並且workQueue沒有滿時,就把請求放入到workQueue中,等待空閒的執行緒,從workQueue中取出任務進行處理。
  • 如果執行的執行緒數量大於maximumPoolSize,同時,workQueue已經滿了,會通過拒絕策略引數rejectHandler來指定處理策略。

根據上述三個引數的配置,執行緒池會對任務進行如下處理方式:

當提交一個新的任務到執行緒池時,執行緒池會根據當前執行緒池中正在執行的執行緒數量來決定該任務的處理方式。處理方式總共有三種:直接切換、使用無限佇列、使用有界佇列。

  • 直接切換常用的佇列就是SynchronousQueue。
  • 使用無限佇列就是使用基於連結串列的佇列,比如:LinkedBlockingQueue,如果使用這種方式,執行緒池中建立的最大執行緒數就是corePoolSize,此時maximumPoolSize不會起作用。當執行緒池中所有的核心執行緒都是執行狀態時,提交新任務,就會放入等待佇列中。
  • 使用有界佇列使用的是ArrayBlockingQueue,使用這種方式可以將執行緒池的最大執行緒數量限制為maximumPoolSize,可以降低資源的消耗。但是,這種方式使得執行緒池對執行緒的排程更困難,因為執行緒池和佇列的容量都是有限的了。

根據上面三個引數,我們可以簡單得出如何降低系統資源消耗的一些措施:

  • 如果想降低系統資源的消耗,包括CPU使用率,作業系統資源的消耗,上下文環境切換的開銷等,可以設定一個較大的佇列容量和較小的執行緒池容量。這樣,會降低執行緒處理任務的吞吐量。
  • 如果提交的任務經常發生阻塞,可以考慮呼叫設定最大執行緒數的方法,重新設定執行緒池最大執行緒數。如果佇列的容量設定的較小,通常需要將執行緒池的容量設定的大一些,這樣,CPU的使用率會高些。如果執行緒池的容量設定的過大,併發量就會增加,則需要考慮執行緒排程的問題,反而可能會降低處理任務的吞吐量。

接下來,我們繼續看ThreadPoolExecutor的構造方法的引數。

(4)keepAliveTime:執行緒沒有任務執行時最多保持多久時間終止
當執行緒池中的執行緒數量大於corePoolSize時,如果此時沒有新的任務提交,核心執行緒外的執行緒不會立即銷燬,需要等待,直到等待的時間超過了keepAliveTime就會終止。

(5)unit:keepAliveTime的時間單位

(6)threadFactory:執行緒工廠,用來建立執行緒
預設會提供一個預設的工廠來建立執行緒,當使用預設的工廠來建立執行緒時,會使新建立的執行緒具有相同的優先順序,並且是非守護的執行緒,同時也設定了執行緒的名稱

(7)rejectHandler:拒絕處理任務時的策略

如果workQueue阻塞佇列滿了,並且沒有空閒的執行緒池,此時,繼續提交任務,需要採取一種策略來處理這個任務。
執行緒池總共提供了四種策略:

  • 直接丟擲異常,這也是預設的策略。實現類為AbortPolicy。
  • 用呼叫者所在的執行緒來執行任務。實現類為CallerRunsPolicy。
  • 丟棄佇列中最靠前的任務並執行當前任務。實現類為DiscardOldestPolicy。
  • 直接丟棄當前任務。實現類為DiscardPolicy。

大家可以自行呼叫ThreadPoolExecutor類的構造方法來建立執行緒池。例如,我們可以使用如下形式建立執行緒池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

使用ForkJoinPool類建立執行緒池

在Java8的Executors工具類中,新增瞭如下建立執行緒池的方式。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
 
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

從原始碼可以可以,本質上呼叫的是ForkJoinPool類的構造方法類建立執行緒池,而從程式碼結構上來看ForkJoinPool類繼承自AbstractExecutorService抽象類。接下來,我們看下ForkJoinPool類的構造方法。

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
 public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
 
public ForkJoinPool(int parallelism,
                ForkJoinWorkerThreadFactory factory,
                UncaughtExceptionHandler handler,
                boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
 
private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

通過檢視原始碼得知,ForkJoinPool的構造方法,最終呼叫的是如下私有構造方法。

private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

其中,各引數的含義如下所示。

  • parallelism:併發級別。
  • factory:建立執行緒的工廠類物件。
  • handler:當執行緒池中的執行緒丟擲未捕獲的異常時,統一使用UncaughtExceptionHandler物件處理。
  • mode:取值為FIFO_QUEUE或者LIFO_QUEUE。
  • workerNamePrefix:執行任務的執行緒名稱的字首。

當然,私有構造方法雖然是引數最多的一個方法,但是其不會直接對外方法,我們可以使用如下方式建立執行緒池。

new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

使用ScheduledThreadPoolExecutor類建立執行緒池

在Executors工具類中存在如下方法類建立執行緒池。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
 
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}
 
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
 
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

從原始碼來看,這幾個方法本質上呼叫的都是ScheduledThreadPoolExecutor類的構造方法,ScheduledThreadPoolExecutor中存在的構造方法如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

而從程式碼結構上看,ScheduledThreadPoolExecutor類繼承自ThreadPoolExecutor類,本質上還是呼叫ThreadPoolExecutor類的構造方法,只不過此時傳遞的佇列為DelayedWorkQueue。我們可以直接呼叫ScheduledThreadPoolExecutor類的構造方法來建立執行緒池,例如以如下形式建立執行緒池。

new ScheduledThreadPoolExecutor(3)

最後,需要注意的是:ScheduledThreadPoolExecutor主要用來建立執行定時任務的執行緒池