1. 程式人生 > >Java任務排程執行緒池ScheduledThreadPoolExecutor原理解析

Java任務排程執行緒池ScheduledThreadPoolExecutor原理解析

  ScheduledThreadPoolExecutor是JDK在ThreadPoolExecutor的基礎上實現的任務排程執行緒池。
  ScheduledThreadPoolExecutor的建構函式全部是呼叫父類(也就是ThreadPoolExecutor)的建構函式。其中,核心執行緒數是必須設定的,最大執行緒數是Integer.MAX_VALUE,空閒工作執行緒生存時間是0,阻塞佇列是DelayedWorkQueue。
  DelayedWorkQueue內部使用一個初始容量為16的陣列來儲存任務,容量不夠時會擴容,所以可以任務DelayedWorkQueue是一個無界佇列,那麼最大執行緒數的設定也是沒有意義的。

//建構函式
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}

  既然ScheduledThreadPoolExecutor的建構函式全部使用父類的,那麼又是如何實現定時排程的呢?對比ScheduledThreadPoolExecutor與ThreadPoolExecutor,不同之處主要是下面兩點:

  1. 任務不同。ScheduledThreadPoolExecutor的任務統一被封裝成了ScheduledFutureTask
    物件,而ThreadPoolExecutor執行的還是原始的Runnable的物件。
  2. 阻塞佇列不同。ScheduledThreadPoolExecutor使用的是DelayedWorkQueue,顧名思義,這是一個延時佇列。

我們以scheduleAtFixedRate()方法為例來看看具體是如何實現的。
scheduleAtFixedRate的大致邏輯如下:

  1. 將任務封裝成一個ScheduledFutureTask物件
  2. 將ScheduledFutureTask物件放到延時佇列中
/**
 * 主要任務:
 * 1.封裝一個ScheduledFutureTask物件
 * 2.執行delayedExecute()方法
 * /
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                             long initialDelay,
                                             long period,
                                             TimeUnit unit) {
    if
(command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } /** * 主要任務: * 1.將task新增到佇列中 * / private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }

  所以,下面最重要的應該是延時佇列DelayedWorkQueue的offer和take方法了,來看看是怎麼實現的。
  DelayedWorkQueue內部使用陣列去維護任務佇列的,那麼陣列是怎麼保證任務有序呢?
  其實仔細看程式碼,我們能發現,這裡的實現是用一個二叉堆去對陣列元素進行排序。確切的說是小頂堆。那麼小頂堆是依據什麼來排序的呢?
  因為ScheduledFutureTask實現了Comparable介面,是按照任務執行的時間來倒敘排序的。

//首先判斷容量,如果容量不夠就擴容,接著判斷是不是第一個元素,如果是,
//那麼直接放在index為0的位置,不是的話進行上濾操作。接下來判斷新增的元素是不是
//在堆頂,如果是那麼需要進行優先排程,那麼進行signal
public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture e = (RunnableScheduledFuture)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            //擴容
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            //根據任務的下一次執行時間比較,將最近需要執行的任務放到前面
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

//毫無疑問,take中直接獲取queue[0],它是距離目前最近的要被執行的任務,
//先檢測一下還有多長時間,任務會被執行,如果小於0,那麼立刻彈出,
//並且做一個下濾操作,重新找出堆頂元素。如果不小於0,那麼證明時間還沒到,
//那麼available.awaitNanos(delay);等到delay時間後自動喚醒,
//或者因為添加了一個更加緊急的任務即offer中的signal被呼叫了,那麼喚醒,
//重新迴圈獲取最優先執行的任務,如果delay小於0,那麼直接彈出任務。
public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
                    //時間已到,取出
                    return finishPoll(first);
                else if (leader != null)
                    //等待
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

  弄清楚了延時的實現原理,下面最關鍵的就是週期排程的原理了。這個是在ScheduledFutureTask的run方法裡面實現的。
  判斷是否是週期執行的,如果不是,直接執行,如果是,先執行,然後計算下一次執行時間,將任務重新新增到延時佇列中。

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}