1. 程式人生 > >執行緒池原理(四):ScheduledThreadPoolExecutor

執行緒池原理(四):ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor用於定時任務,這裡的定時意義在於:

  1. 指定延時後執行任務。
  2. 週期性重複執行任務。

我們接著分析ScheduledThreadPoolExecutor原始碼,從類宣告開始

類宣告

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    //……
}        

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,實現了ScheduledExecutorService。線上程池的基礎上,實現了可排程的執行緒池功能。上一篇文章已經詳細介紹了

ThreadPoolExecutor,這裡我們先看下ScheduledExecutorService的原始碼:

ScheduledExecutorService

//可排程的執行者服務介面
public interface ScheduledExecutorService extends ExecutorService {

    //指定時延後排程執行任務
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    //指定時延後排程執行任務
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); //指定時延後開始執行任務,以後每隔period的時長再次執行該任務 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long
initialDelay, long period, TimeUnit unit); //指定時延後開始執行任務,以後任務執行完成後等待delay時長,再次執行任務 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }

ScheduledExecutorService實現了ExecutorService,並增加若干定時相關的介面。其中schedule方法用於單次排程執行任務。這裡主要理解下後面兩個方法。

  • scheduleAtFixedRate:該方法在initialDelay時長後第一次執行任務,以後每隔period時長,再次執行任務。注意,period是從任務開始執行算起的。開始執行任務後,定時器每隔period時長檢查該任務是否完成,如果完成則再次啟動任務,否則等該任務結束後才再次啟動任務,看下圖示例。

    這裡寫圖片描述

  • scheduleWithFixDelay:該方法在initialDelay時長後第一次執行任務,以後每當任務執行完成後,等待delay時長,再次執行任務,看下圖示例。

    這裡寫圖片描述

schedule

ScheduledThreadPoolExecutor方法實現了ScheduledExecutorService,schedule方法排程的任務只執行一次。

先看下schedule方法的實現:

//delay時長後執行任務command,該任務只執行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
      throw new NullPointerException();
    //這裡的decorateTask方法僅僅返回第二個引數
    RunnableScheduledFuture<?> t = decorateTask(command,
                                                new ScheduledFutureTask<Void>(command, null,
                                                                              triggerTime(delay, unit)));
    //延時或者週期執行任務的主要方法
    delayedExecute(t);
    return t;
}

我們先屢下ScheduledFuture、RunnableScheduledFuture、ScheduledFutureTask的關係,看下類圖:

這裡寫圖片描述

這個類圖比較複雜,其中淺色部分都是我們已經學習過了,深色部分我們之前沒有接觸過,所以重點學習這幾個類,從上往下依次看。

Delayed介面

Delayed介面提供了getDelay方法,該方法返回物件剩餘時延。介面繼承了Comparable介面,表示物件支援排序,看下該介面的定義:

//繼承Comparable介面,表示該類物件支援排序
public interface Delayed extends Comparable<Delayed> {
    //返回該物件剩餘時延
    long getDelay(TimeUnit unit);
}

Delayed介面很簡單,繼續看ScheduledFuture介面。

ScheduledFuture介面

ScheduledFuture是延時的Future,僅僅繼承了Delayed和Future介面,並沒有新增其他方法,看下該介面的定義:

//僅僅繼承了Delayed和Future介面
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

RunnableScheduledFuture介面

可執行的ScheduledFuture,該介面繼承了ScheduledFuture和RunnableFuture介面。

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
    //是否是週期任務,週期任務可被排程執行多次,非週期任務只被執行一次
    boolean isPeriodic();
}

ScheduledFutureTask類

該類是ScheduledThreadPoolExecutor的內部類,繼承了FutureTask,實現了RunnableScheduledFuture介面。FutureTask我們在介紹執行緒池的時候講過。先看下ScheduledFutureTask的構造方法:

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    //呼叫父類FutureTask的構造方法
    super(r, result);
    //time表示任務下次執行的時間
    this.time = ns;
    //週期任務,正數表示按照固定速率,負數表示按照固定時延
    this.period = period;
    //任務的編號
    this.sequenceNumber = sequencer.getAndIncrement();
}

這裡需要注意幾點,

  1. time表示任務下一次執行的時間,單位為納秒。
  2. period=0表示該任務不是週期性任務,正數表示每隔period時長執行任務,負數表示任務執行完成後到下一次被排程執行的延時時間。
  3. sequenceNumber表示該任務的編號,通過執行緒池的sequencer成員變數從0開始生成編號。

繼續看下getDelay方法:

getDelay

//實現Delayed介面的getDelay方法,返回任務開始執行的剩餘時間
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

這個方法其實就是任務開始執行的倒計時時間,通過任務預期執行時間減去當前時間獲得,單位是納秒。

compareTo

該方法實現了Comparable介面的compareTo方法,比較兩個任務的”大小”。後面我們會講到,可排程的執行緒池其實利用了可排序的延時佇列,延時佇列儲存了ScheduledFutureTask任務,並且佇列中的元素會根據開始執行的倒計時時間排序,剩餘等待時間最少的將會被最先排程執行。這裡排序策略就是根據compareTo方法實現的。

public int compareTo(Delayed other) {
    if (other == this)
      return 0;
    if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      //小於0,說明當前任務的執行時間點早於other,要排在延時佇列other的前面
      if (diff < 0)
        return -1;
      //大於0,說明當前任務的執行時間點晚於other,要排在延時佇列other的後面
      else if (diff > 0)
        return 1;
      //如果兩個任務的執行時間點一樣,比較兩個任務的編號,編號小的排在佇列前面,編號大的排在佇列後面
      else if (sequenceNumber < x.sequenceNumber)
        return -1;
      else
        return 1;
    }
    //如果任務型別不是ScheduledFutureTask,通過getDelay方法比較
    long d = (getDelay(TimeUnit.NANOSECONDS) -
              other.getDelay(TimeUnit.NANOSECONDS));
    return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

setNextRunTime

任務執行完後,設定下次執行的時間

private void setNextRunTime() {
    long p = period;
    //p>0,說明是固定速率執行的任務,在原來任務開始執行時間的基礎上加上p即可
    if (p > 0)
      time += p;
    //p<0,說明是固定時延執行的任務,下次執行時間在當前時間(任務執行完成的時間)的基礎上加上-p的時間
    else
      time = triggerTime(-p);
}

任務執行完成後需要確定下次執行的時間,如果任務是以固定速率執行的,下次開始執行時間就是上次任務開始執行時間加上period。如果任務是以固定延時執行的,下次開始執行時間就是當前時間(上次任務線束時間)加上period(取正值)。

cancel

取消任務的執行,重點關注將取消的任務從佇列移除的邏輯。

public boolean cancel(boolean mayInterruptIfRunning) {
    //呼叫FutureTask的cancel方法
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    //cancelled: 任務取消成功
    //removeOnCancel:任務取消後從佇列移除
    //headIndex:任務原先處於二叉堆的位置
    if (cancelled && removeOnCancel && heapIndex >= 0)
      //從佇列中移除,該方法是ThreadPoolExecutor的方法
      remove(this);
    //返回是否取消成功
    return cancelled;
}

run

ScheduledFutureTask重寫了FutureTask的run方法。

public void run() {
    boolean periodic = isPeriodic();
    //如果當前狀態下不能執行任務,則取消任務
    if (!canRunInCurrentRunState(periodic))
      cancel(false);
    //不是週期性任務,執行一次任務即可,呼叫父類的run方法
    else if (!periodic)
      ScheduledFutureTask.super.run();
    //是週期性任務,呼叫FutureTask的runAndReset方法,方法執行完成後
    //重新設定任務下一次執行的時間,並將該任務重新入隊,等待再次被排程
    else if (ScheduledFutureTask.super.runAndReset()) {
      setNextRunTime();
      reExecutePeriodic(outerTask);
    }
}

註釋已經解釋的很清楚了,重點看下FutureTask的runAndReset方法,該方法是為任務多次執行而設計的。runAndReset方法執行完任務後不會設定任務的執行結果,也不會去更新任務的狀態,維持任務的狀態為初始狀態(NEW狀態),這也是該方法和FutureTask的run方法的區別。

好了,講完了ScheduledFutureTask,接著看ScheduledPoolExecutor原始碼。

通常我們通過submit或者execute方法將任務提交給執行緒池執行,這兩個方法最終都是呼叫了schedule方法,前面已經講過,schedule方法只會排程任務執行一次。那麼ScheduledThreadPoolExecutor是怎樣排程固定週期或延時的任務的呢?是通過scheduledAtFixedRate和scheduledAtFixedDelay方法實現的,我們先看下scheduledAtFixedRate原始碼:

scheduledAtFixedRate

關於該方法的說明,我們在ScheduledExecutorService介面已經說明過了,這裡主要看下實現。

//注意,固定速率和固定時延,傳入的引數都是Runnable,也就是說這種定時任務是沒有返回值的
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
    if (command == null || unit == null)
      throw new NullPointerException();
    if (period <= 0)
      throw new IllegalArgumentException();
    //建立一個有初始延時和固定週期的任務
    ScheduledFutureTask<Void> sft =
      new ScheduledFutureTask<Void>(command,
                                    null,
                                    triggerTime(initialDelay, unit),
                                    unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    //outerTask表示將會重新入隊的任務
    sft.outerTask = t;
    //稍後說明
    delayedExecute(t);
    return t;
}

其實主要建立了一個帶有初始延時和固定週期的任務,類似的,scheduledAtFixedDelay建立一個帶有初始延時和任務間固定延時的任務。

scheduledAtFixedDelay

和scheduledAtFixedRate類似,唯一不同的地方在於在於建立的ScheduledFutureTask不同,FixedRate和FixedDelay也是通過ScheduledFutureTask體現的。這裡不再展示程式碼了。

delayedExecute

前面講到的schedule、scheduleAtFixedRate和scheduleAtFixedDelay最後都呼叫了delayedExecute方法,該方法是定時任務執行的主要方法。看下delayedExecute原始碼:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //執行緒池已經關閉,呼叫拒絕執行處理器處理
    if (isShutdown())
      reject(task);
    else {
      //將任務加入到等待佇列
      super.getQueue().add(task);
      //執行緒池已經關閉,且當前狀態不能執行該任務,將該任務從等待佇列移除並取消該任務
      if (isShutdown() &&
          !canRunInCurrentRunState(task.isPeriodic()) &&
          remove(task))
        task.cancel(false);
      else
        //增加一個worker,就算corePoolSize=0也要增加一個worker
        ensurePrestart();
    }
}

delayedExecute方法的邏輯也很簡單,主要就是將任務新增到等待佇列並增加一個worker,增加的worker並不能立即執行該任務,因為該任務可能要等待一定時間後才能執行。

對於ScheduledThreadPoolExecutor,worker新增到執行緒池後會在等待佇列上等待獲取任務,這點是和ThreadPoolExecutor一致的。但是worker是怎麼從等待佇列取定時任務的?該等待佇列隊首應該儲存的是最近將要執行的任務,如果隊首任務的開始執行時間還未到,worker也應該繼續等待。

ScheduledThreadPoolExecutor實現了一個延時佇列,該佇列不僅實現了阻塞佇列的功能,也實現了排序功能。後面我們會發現,該佇列是通過二叉堆實現的,理解了該佇列基本上能夠理解ScheduledThreadPoolExecutor了,因此我們好好學習下該佇列。

ScheduledThreadPoolExecutor內部類DelayedWorkQueue就是儲存定時任務的等待佇列。

DelayedWorkQueue

看下DelayedWorkQueue的宣告:

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
        //……
}

DelayedWorkQueue繼承了AbstractQueue抽象類、實現了BlockingQueue介面。

理解DelayedWorkQueue之前需要理解堆排序,這裡的堆排序演算法和DelayedWorkQueue的稍有不同,但是基本思想是相同的。

堆排序是通過陣列實現的,因此DelayedWorkQueue定義了一個數組作為等待佇列。

//佇列初始容量
private static final int INITIAL_CAPACITY = 16;
//陣列用來儲存定時任務,通過陣列實現堆排序
private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];

DelayedWorkQueue儲存了當前在隊首等待的執行緒:

private Thread leader = null;

當一個執行緒成為leader,它只要等待隊首任務的delay時間即可,其他執行緒會無條件等待。leader取到任務返回前要通知其他執行緒,直到有執行緒成為新的leader。每當隊首的定時任務被其他更早需要執行的任務替換時,leader設定為null,其他等待的執行緒(被當前leader通知)和當前的leader重新競爭成為leader。

DelayedWorkQueue定義了鎖lock和條件available用於執行緒競爭成為leader。

private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();

當一個新的任務成為隊首,或者需要有新的執行緒成為leader時,available條件將會被通知。

執行緒取任務時需要在available條件上等待,當被通知時,該執行緒可能會成為新的leader。

我們先看下DelayedWorkQueue的take方法

take

public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
      for (;;) {
        //取堆頂的任務,堆頂是最近要執行的任務
        RunnableScheduledFuture first = queue[0];
        //堆頂為空,執行緒要在條件available上等待
        if (first == null)
          available.await();
        else {
          //堆頂任務還要多長時間才能執行
          long delay = first.getDelay(TimeUnit.NANOSECONDS);
          //堆頂任務已經可以執行了,finishPoll會重新調整堆,使其滿足最小堆特性,該方法設定任務在
          //堆中的index為-1並返回該任務
          if (delay <= 0)
            return finishPoll(first);
          //如果leader不為空,說明已經有執行緒成為leader並等待堆頂任務
          //到達執行時間,此時,其他執行緒都需要在available條件上等待
          else if (leader != null)
            available.await();
          else {
            //leader為空,當前執行緒成為新的leader
            Thread thisThread = Thread.currentThread();
            leader = thisThread;
            try {
              //當前執行緒已經成為leader了,只需要等待堆頂任務到達執行時間即可
              available.awaitNanos(delay);
            } finally {
              //返回堆頂元素之前將leader設定為空
              if (leader == thisThread)
                leader = null;
            }
          }
        }
      }
    } finally {
      //通知其他在available條件等待的執行緒,這些執行緒可以去競爭成為新的leader
      if (leader == null && queue[0] != null)
        available.signal();
      lock.unlock();
    }
}

再梳理下take方法的邏輯

  • 如果堆頂元素為空,在available條件上等待。
  • 如果堆頂任務的執行時間已到,將堆頂元素替換為堆的最後一個元素並調整堆使其滿足最小堆特性,同時設定任務在堆中索引為-1,返回該任務。
  • 如果leader不為空,說明已經有執行緒成為leader了,其他執行緒都要在available條件上等待。
  • 如果leader為空,當前執行緒成為新的leader,並等待直到堆頂任務執行時間到達。
  • take方法返回之前,將leader設定為空,並通知其他執行緒。

繼續看下finishPool方法:

private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
    //堆元素數量減1
    int s = --size;
    //取堆的最後一個元素
    RunnableScheduledFuture x = queue[s];
    queue[s] = null;
    if (s != 0)
      //調整堆,使其重新滿足最小堆特性,從位置0開始往堆的底層調整
      siftDown(0, x);
    //該任務在堆中的索引設定為-1
    setIndex(f, -1);
    //返回該任務
    return f;
}

offer

該方法往佇列插入一個值,返回是否成功插入。

public boolean offer(Runnable x) {
    if (x == null)
      throw new NullPointerException();
    RunnableScheduledFuture e = (RunnableScheduledFuture)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      int i = size;
      //佇列元素已經大於等於陣列的長度,需要擴容,新堆的容易是原來堆容量的1.5倍
      if (i >= queue.length)
        grow();
      //堆中元素增加1
      size = i + 1;
      //調整堆
      if (i == 0) {
        queue[0] = e;
        setIndex(e, 0);
      } else {
        siftUp(i, e);
      }
      if (queue[0] == e) {
        leader = null;
        //通知其他在available條件上等待的執行緒,這些執行緒可以競爭成為新的leader
        available.signal();
      }
    } finally {
      lock.unlock();
    }
    return true;
}

siftUp

該方法是調整堆的方法,調整堆的目的是使其滿足最小堆的特性。

//從位置k開始往堆頂方向查詢,直到找到key儲存的位置
private void siftUp(int k, RunnableScheduledFuture key) {
    while (k > 0) {
      //parent是父節點的索引
      int parent = (k - 1) >>> 1;
      RunnableScheduledFuture e = queue[parent];
      //如果父節點比子節點e的執行時間要早,說明已經符合最小堆的特性,跳出迴圈
      if (key.compareTo(e) >= 0)
        break;
      //子節點比父節點更早執行,將子節點位置的值替換為父節點
      queue[k] = e;
      setIndex(e, k);
      //繼續往上查詢
      k = parent;
    }
    //k是最終key存放的位置
    queue[k] = key;
    setIndex(key, k);
}

看下siftUp示例圖,對於左邊這個堆來說,在位置K處往堆頂方向查詢key=12的位置,因為父節點值為23,大於12,因此將23移到位置K處,位置K上移到父節點所在位置,繼續往堆頂方向查詢key=12的位置。

如果查詢key=50,因為父節點23小於50,因此位置K就是key=50的最終儲存位置。

這裡寫圖片描述

siftDown

該方法和siftUp方法類似

//從位置k處開始往下查詢,找到key的儲存位置
private void siftDown(int k, RunnableScheduledFuture key) {
    //從half開始,就不再有孩子節點的,這是一個優化
    int half = size >>> 1;
    while (k < half) {
      //左孩子位置
      int child = (k << 1) + 1;
      RunnableScheduledFuture c = queue[child];
      //右孩子位置
      int right = child + 1;
      //如果右孩子存在,並且右孩子比左孩子更早執行,更新c為右孩子
      if (right < size && c.compareTo(queue[right]) > 0)
        c = queue[child = right];
      //以上做的都是取兩個孩子中更早執行的那個孩子節點,取到後和key比較
      //如果key比兩個孩子都更早執行,位置k就是key的最終位置了,跳出迴圈
      if (key.compareTo(c) <= 0)
        break;
      //更早執行的孩子放到父節點處
      queue[k] = c;
      setIndex(c, k);
      //繼續往下查詢
      k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

看下siftDown的示例圖,對於左邊的堆來說,在位置K處開始往下查詢key的位置,如果key=12,因為12小於兩個孩子中的最小結點35,因此位置K就是key=12的最終儲存位置。如果key=50,因為50大於兩個孩子結點中的最小結點35,因此將35上移到父節點,位置K下移到35所在的位置,繼續往堆底查詢。

這裡寫圖片描述