1. 程式人生 > >Java多執行緒複習與鞏固(七)--任務排程執行緒池ScheduledThreadPoolExecutor

Java多執行緒複習與鞏固(七)--任務排程執行緒池ScheduledThreadPoolExecutor

1. 為什麼要使用ScheduledThreadPoolExecutor

《Java多執行緒複習與鞏固(二)–執行緒相關工具類Timer和ThreadLocal的使用》提到過,Timer可以實現指定延時排程任務,還可以實現任務的週期性執行。但是Timer中的所有任務都是由一個TimerThread執行,也就是說Timer是單執行緒執行任務。單執行緒執行任務有一個致命的缺點:當某些任務的執行特別耗時,後續的任務無法在預定的時間內得到執行,前一個任務的延遲或異常將影響到後續的任務;另外TimerThread沒有做異常處理,一個任務出現異常將會導致整個Timer執行緒結束

由於Timer單執行緒的種種缺點,這個時候我們就需要讓執行緒池去執行這些任務。

2. 使用Executors工具類

Executors是執行緒池框架提供給我們的建立執行緒池的工具類,FixedThreadPool,SingleThreadExecutor,CachedThreadPool都是上一篇文章中的ThreadPoolExecutor物件

他還有另外兩個方法:

// 建立(可計劃的)任務延時執行執行緒池
public static ScheduledExecutorService newScheduledThreadPool();
// 單執行緒版的任務計劃執行的執行緒池,
public static ScheduledExecutorService newSingleThreadScheduledExecutor
();

從下面的繼承圖我們知道ScheduledThreadPoolExecutor就是ScheduledExecutorService介面的實現類。

執行緒池ThreadPoolExecutor相關類繼承圖

3. 構造ScheduledThreadPoolExecutor物件

先看一下ScheduledThreadPoolExecutor的幾個建構函式

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    ...
    public
ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } ... }

從上面的程式碼可以看出ScheduledThreadPoolExecutor都是直接呼叫的父類ThreadPoolExecutor的建構函式。

我們結合上一篇對ThreadPoolExecutor構造引數的解釋對ScheduledThreadPoolExecutor的幾個引數進行分析,主要有以下幾個引數比較特殊:

  • maximumPoolSize:執行緒池允許的最大執行緒數為Integer.MAX_VALUE,也就意味著ScheduledThreadPoolExecutor對執行緒數沒有限制。這個是必須的,因為一旦對執行緒數有了限制,必定會存在任務等待排程的情況,有等待就可能會存在任務延時,所以最大執行緒數不能有限制。
  • keepAliveTime和unit:0 NANOSECONDS,0納秒,也就是說一旦有空閒執行緒會立即銷燬該執行緒物件。
  • workQueue:DelayedWorkQueue是ScheduledThreadPoolExecutor的內部類,它也是實現按時排程的核心。

4. 二叉堆DelayedWorkQueue

DelayedWorkQueue和java.util.concurrent.DelayQueue有著驚人的相似度:

  • DelayedWorkQueue實現了一個容量無限的二叉堆,DelayQueue底層使用PriorityQueue實現二叉堆各種操作。
  • DelayedWorkQueue儲存了java.util.concurrent.RunnableScheduledFuture介面的實現類,DelayQueue儲存java.util.concurrent.Delayed介面的實現類,這兩個介面有以下的繼承關係(其中ScheduledThreadPoolExecutor內部類ScheduledFutureTask就實現了RunnableScheduledFuture介面)

Future繼承圖

5. 為什麼使用二叉堆

大學學過資料結構的應該學過堆排序吧:堆排序就是用小頂堆(或大頂堆)實現最小(或最大)的元素往堆頂移動。這裡的DelayedWorkQueue就是使用二叉堆獲取堆中延時最短的任務。具體的比較策略讓我們看下面這個方法:

ScheduledThreadPoolExecutor.ScheduledFutureTask.compareTo()

        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                // 優先比較任務執行的時間
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                // 時間相同比較任務的先後順序(FIFO)
                // 這個sequenceNumber在建立ScheduledFutureTask的時候
                // 由一個AtomicLong生成
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

6. 為什麼不用DelayQueue的二叉堆實現

java.util.concurrent.DelayQueue就是根據延時獲取元素的,那為什麼不直接用DalayQueue而重新定義一個DelayedWorkQueue呢。這個問題本質上就是在問DelayQueueDelayedWorkQueue的區別,我們看一下DelayedWorkQueue註釋中的一段話:

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

        /*
         * A DelayedWorkQueue is based on a heap-based data structure
         * like those in DelayQueue and PriorityQueue, except that
         * every ScheduledFutureTask also records its index into the
         * heap array. This eliminates the need to find a task upon
         * cancellation, greatly speeding up removal (down from O(n)
         * to O(log n)), and reducing garbage retention that would
         * otherwise occur by waiting for the element to rise to top
         * before clearing. But because the queue may also hold
         * RunnableScheduledFutures that are not ScheduledFutureTasks,
         * we are not guaranteed to have such indices available, in
         * which case we fall back to linear search. (We expect that
         * most tasks will not be decorated, and that the faster cases
         * will be much more common.)
         *
         * All heap operations must record index changes -- mainly
         * within siftUp and siftDown. Upon removal, a task's
         * heapIndex is set to -1. Note that ScheduledFutureTasks can
         * appear at most once in the queue (this need not be true for
         * other kinds of tasks or work queues), so are uniquely
         * identified by heapIndex.
         */
        ...
    }

大致翻譯過來:

DelayedWorkQueue類似於DelayQueue和PriorityQueue,是基於“堆”的一種資料結構。
區別就在於ScheduledFutureTask記錄了它在堆陣列中的索引,這個索引的好處就在於:
取消任務時不再需要從陣列中查詢任務,極大的加速了remove操作,時間複雜度從O(n)降低到了O(log n),
同時不用等到元素上升至堆頂再清除從而降低了垃圾殘留時間。
但是由於DelayedWorkQueue持有的是RunnableScheduledFuture介面引用而不是ScheduledFutureTask的引用,
所以不能保證索引可用,不可用時將會降級到線性查詢演算法(我們預測大多數任務不會被包裝修飾,因此速度更快的情況更為常見)。

所有的堆操作必須記錄索引的變化 ————主要集中在siftUp和siftDown兩個方法中。一個任務刪除後他的headIndex會被置為-1。
注意每個ScheduledFutureTask在佇列中最多出現一次(對於其他型別的任務或者佇列不一定只出現一次),
所以可以通過heapIndex進行唯一標識。

這裡有幾個地方可能有疑問:

1. remove操作的時間複雜度從O(n)降低到了O(log n)

        public boolean remove(Object x) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 因為在heapIndex中儲存了索引
                // indexOf的時間複雜度從線性搜尋的O(n)
                // 降低到了常量O(1)
                int i = indexOf(x);
                if (i < 0)
                    return false;

                // heapIndex標記為-1,表示已刪除
                setIndex(queue[i], -1);
                int s = --size;
                RunnableScheduledFuture<?> replacement = queue[s];
                queue[s] = null;
                // siftUp和siftDown操作完全二叉樹時間複雜度為O(log n)
                // 綜合前面的O(1)+O(log n) ==> O(log n)
                if (s != i) {
                    siftDown(i, replacement);
                    if (queue[i] == replacement)
                        siftUp(i, replacement);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
        private int indexOf(Object x) {
            if (x != null) {
                // 這裡的ScheduledFutureTask不是靜態內部類
                // 所以ScheduledFutureTask會與外部的執行緒池物件關聯(儲存外部執行緒池的引用)
                // 這裡的instanceof操作的如果是其他執行緒池中的ScheduledFutureTask物件
                // 將會返回false
                if (x instanceof ScheduledFutureTask) {
                    // 如果是ScheduledFutureTask,可用heapIndex直接索引
                    int i = ((ScheduledFutureTask) x).heapIndex;
                    if (i >= 0 && i < size && queue[i] == x)
                        return i;
                } else {
                    // 否則使用線性查詢
                    for (int i = 0; i < size; i++)
                        if (x.equals(queue[i]))
                            return i;
                }
            }
            return -1;
        }

2. 任務的包裝修飾

包裝修飾主要是指兩個ScheduledThreadPoolExecutor.decorateTask方法。這部分內容放在文末“擴充套件ScheduledThreadPoolExecutor的功能”時講。

7. 任務的提交

    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }
    public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }
    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result), 0, NANOSECONDS);
    }
    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, NANOSECONDS);
    }

我們看到原來ThreadPoolExecutor中的幾個提交方法都被重寫了,最終呼叫了個的都是schedule方法,並且這幾個方法的延時都為0納秒。

8. schedule

既然前面任務的提交全部都是交給schedule方法執行,那麼讓我們看一下schedule相關的幾個方法

下面的幾個方法也是ScheduledExecutorService介面擴充套件的幾個方法

下面需要注意的主要是scheduleAtFixedRatescheduleWithFixedDelay兩個方法的區別

    // 觸發時間
    private long triggerTime(long delay, TimeUnit unit) {
        // 時間統一使用納秒單位
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
    long triggerTime(long delay) {
        // 當前時間加上延遲時間
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }
    // 在指定的時間執行一次,沒有返回值
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            // 將Runnable介面物件封裝成ScheduledFutureTask
            new ScheduledFutureTask<Void>(command, null, // Runnable給的返回值為null
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
    // 在指定的時間執行一次,有返回值
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            // 將Callable介面物件封裝成ScheduledFutureTask
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    // 建立並執行一個週期性的任務,這個任務在initialDelay時間後生效
    // 第一次initialDelay,然後initialDelay+period,再然後initialDelay + 2 * period
    // 依此類推往下執行
    // 1. 如果執行過程中出現異常,後續的執行將會終止
    //    否則後續的任務會一直執行除非任務呼叫cancel方法取消或者執行緒池終止了
    // 2. 如果該任務任意一次執行超過了它的週期,那麼後續的執行計劃將會推遲
    //    絕對不會一個任務同時由兩個執行緒執行
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        // 週期執行的任務
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          // 正數:固定週期執行
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    // 建立並執行一個週期性的任務,任務在initialDelay時間後生效
    // 後續的執行時間在前一次任務執行完成後延時delay時間後執行
    // 第一次執行時間在initialDelay
    // 如果第一次執行耗時T1,那麼第二次執行時間在initialDelay+T1+delay,
    // 如果第二次執行耗時T2,那麼第三次執行時間在initialDelay+T1+T2+2*delay
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        // 延遲執行的任務
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          // 負數:固定延遲執行
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    // 下面兩個方法時留給子類實現的,預設直接返回task
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable, RunnableScheduledFuture<V> task) {
        return task;
    }
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Callable<V> callable, RunnableScheduledFuture<V> task) {
        return task;
    }

fixRate與fixDelay的區別

9. delayedExecute

上面的幾個方法都是將runnablecallable包裝成ScheduledFutureTask物件,最終都是丟給delayedExecute方法去執行:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        // 如果執行緒池已經SHUTDOWN,則拒絕任務
        if (isShutdown())
            reject(task);
        else {
            // 入隊
            super.getQueue().add(task);
            // 再次檢查
            if (isShutdown() &&
                // 檢查執行緒池當前狀態是否能繼續執行任務
                // shutdown狀態下是否把未完成的任務執行完
                !canRunInCurrentRunState(task.isPeriodic()) &&
                // 不能執行則移除任務
                remove(task))
                // 移除失敗則取消任務
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
    // 這個方法和ThreadPoolExecutor.prestartCoreThread方法基本一致
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            // 新增核心執行緒
            addWorker(null, true);
        else if (wc == 0)
            // wc==0,說明corePoolSize==0,也就是所有的執行緒都是普通執行緒
            // 新增普通執行緒
            addWorker(null, false);
    }

10. ScheduledFutureTask.run

新增執行緒後,執行緒肯定會從阻塞佇列中獲取任務,並執行任務的run方法,也就是ScheduledFutureTask的run方法:

    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        ...
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                // 不是週期性執行,則直接執行
                ScheduledFutureTask.super.run();

            // 否則就是週期性執行:執行完一個週期後,重置任務的狀態
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime(); // 設定下一次執行的時間
                reExecutePeriodic(outerTask);
            }
        }
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                // 是呼叫scheduleAtFixedRate建立的任務,固定週期
                // 直接將上一次的時間加上週期
                time += p;
            else
                // 是呼叫scheduleWithFixedDelay建立的任務,固定延遲
                // 當前時間加上延遲
                time = triggerTime(-p);
        }
    }

11. ScheduledThreadPoolExecutor的其他配置項

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    /**
     * false:線上程池SHUTDOWN後取消已存在的週期任務
     * true: 執行緒池SHUTDOWN後,繼續執行已存在的週期任務
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * false: 線上程池SHUTDOWN後取消已存在的非週期性任務
     * true: 執行緒池SHUTDOWN後,繼續執行已存在的非週期性任務
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    /**
     * true: 呼叫ScheduledFutureTask.cancel方法後將任務從佇列中remove
     */
    private volatile boolean removeOnCancel = false;

    // 省略這三個屬性的getter/setter方法
}

12. 繼承ScheduledThreadPoolExecutor對任務進行包裝

ThreadPoolExecutor提供了beforeExecute,afterExecute,terminated三個鉤子方法讓我們過載以進行擴充套件。

ScheduledThreadPoolExecutor也提供了兩個方法給我們擴充套件,下面是JDK文件提供的一個簡單例子:

public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {

  static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }

  // 我們可以在這兩個方法中對任務進行修改或包裝
  protected <V> RunnableScheduledFuture<V> decorateTask(
               Runnable r, RunnableScheduledFuture<V> task) {
      return new CustomTask<V>(r, task);
  }
  protected <V> RunnableScheduledFuture<V> decorateTask(
               Callable<V> c, RunnableScheduledFuture<V> task) {
      return new CustomTask<V>(c, task);
  }
  // ... add constructors, etc.
}

13. ScheduledThreadPoolExecutor尚有的缺點

ScheduledThreadPoolExecutor是使用納秒為單位進行任務排程,它底層使用的是System.nanoTime()來獲取時間:

    final long now() {
        return System.nanoTime();
    }

這個時間是相對於JVM虛擬機器啟動的時間,這個納秒值在263292後會溢位(幾乎可以忽略溢位問題),ScheduledThreadPoolExecutor也對溢位進行了處理:

    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }
    private long overflowFree(long delay) {
        Delayed head = (Delayed) super.getQueue().peek();
        if (head != null) {
            // 溢位會影響compareTo方法的比較
            long headDelay = head.getDelay(NANOSECONDS);
            if (headDelay < 0 && (delay - headDelay < 0))
                delay = Long.MAX_VALUE + headDelay;
        }
        return delay;
    }

既然ScheduledThreadPoolExecutor已經處理了,那還有什麼問題嗎。問題就在於我們無法使用yyyy-MM-dd HH-mm-ss這種精確時間點的方式進行任務的排程。

不過在SpringTask 以及 Quartz等框架中已經解決了這個問題,並提供了cron表示式來精確任務的排程時間。後續如果有機會對這些框架的原理進行分析。

SpringTask既可以單獨使用也可以整合Quartz使用,除了Quartz還有一個輕量級的Cron4j可以實現任務排程,不過Cron4j並沒有用執行緒池(估計那時候java5還沒出來),每個任務都會去建立一個新執行緒。