Java多執行緒複習與鞏固(七)--任務排程執行緒池ScheduledThreadPoolExecutor
1. 為什麼要使用ScheduledThreadPoolExecutor
在《Java多執行緒複習與鞏固(二)–執行緒相關工具類Timer和ThreadLocal的使用》提到過,Timer可以實現指定延時排程任務,還可以實現任務的週期性執行。但是Timer中的所有任務都是由一個TimerThread執行,也就是說Timer是單執行緒執行任務。單執行緒執行任務有一個致命的缺點:當某些任務的執行特別耗時,後續的任務無法在預定的時間內得到執行,前一個任務的延遲或異常將影響到後續的任務;另外TimerThread沒有做異常處理,一個任務出現異常將會導致整個Timer執行緒結束。
由於Timer單執行緒的種種缺點,這個時候我們就需要讓執行緒池去執行這些任務。
2. 使用Executors工具類
Executors是執行緒池框架提供給我們的建立執行緒池的工具類,FixedThreadPool,SingleThreadExecutor,CachedThreadPool都是上一篇文章中的ThreadPoolExecutor物件。
他還有另外兩個方法:
// 建立(可計劃的)任務延時執行執行緒池
public static ScheduledExecutorService newScheduledThreadPool();
// 單執行緒版的任務計劃執行的執行緒池,
public static ScheduledExecutorService newSingleThreadScheduledExecutor ();
從下面的繼承圖我們知道ScheduledThreadPoolExecutor就是ScheduledExecutorService介面的實現類。
3. 構造ScheduledThreadPoolExecutor物件
先看一下ScheduledThreadPoolExecutor的幾個建構函式
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
...
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
...
}
從上面的程式碼可以看出ScheduledThreadPoolExecutor都是直接呼叫的父類ThreadPoolExecutor的建構函式。
我們結合上一篇對ThreadPoolExecutor構造引數的解釋對ScheduledThreadPoolExecutor的幾個引數進行分析,主要有以下幾個引數比較特殊:
- maximumPoolSize:執行緒池允許的最大執行緒數為
Integer.MAX_VALUE
,也就意味著ScheduledThreadPoolExecutor對執行緒數沒有限制。這個是必須的,因為一旦對執行緒數有了限制,必定會存在任務等待排程的情況,有等待就可能會存在任務延時,所以最大執行緒數不能有限制。 - keepAliveTime和unit:0 NANOSECONDS,0納秒,也就是說一旦有空閒執行緒會立即銷燬該執行緒物件。
- workQueue:DelayedWorkQueue是ScheduledThreadPoolExecutor的內部類,它也是實現按時排程的核心。
4. 二叉堆DelayedWorkQueue
DelayedWorkQueue和java.util.concurrent.DelayQueue
有著驚人的相似度:
- DelayedWorkQueue實現了一個容量無限的二叉堆,DelayQueue底層使用PriorityQueue實現二叉堆各種操作。
- DelayedWorkQueue儲存了
java.util.concurrent.RunnableScheduledFuture
介面的實現類,DelayQueue儲存java.util.concurrent.Delayed
介面的實現類,這兩個介面有以下的繼承關係(其中ScheduledThreadPoolExecutor
內部類ScheduledFutureTask
就實現了RunnableScheduledFuture
介面)
5. 為什麼使用二叉堆
大學學過資料結構的應該學過堆排序吧:堆排序就是用小頂堆(或大頂堆)實現最小(或最大)的元素往堆頂移動。這裡的DelayedWorkQueue就是使用二叉堆獲取堆中延時最短的任務。具體的比較策略讓我們看下面這個方法:
ScheduledThreadPoolExecutor.ScheduledFutureTask.compareTo()
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
// 優先比較任務執行的時間
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
// 時間相同比較任務的先後順序(FIFO)
// 這個sequenceNumber在建立ScheduledFutureTask的時候
// 由一個AtomicLong生成
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
6. 為什麼不用DelayQueue的二叉堆實現
java.util.concurrent.DelayQueue
就是根據延時獲取元素的,那為什麼不直接用DalayQueue
而重新定義一個DelayedWorkQueue
呢。這個問題本質上就是在問DelayQueue
與DelayedWorkQueue
的區別,我們看一下DelayedWorkQueue
註釋中的一段話:
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
/*
* A DelayedWorkQueue is based on a heap-based data structure
* like those in DelayQueue and PriorityQueue, except that
* every ScheduledFutureTask also records its index into the
* heap array. This eliminates the need to find a task upon
* cancellation, greatly speeding up removal (down from O(n)
* to O(log n)), and reducing garbage retention that would
* otherwise occur by waiting for the element to rise to top
* before clearing. But because the queue may also hold
* RunnableScheduledFutures that are not ScheduledFutureTasks,
* we are not guaranteed to have such indices available, in
* which case we fall back to linear search. (We expect that
* most tasks will not be decorated, and that the faster cases
* will be much more common.)
*
* All heap operations must record index changes -- mainly
* within siftUp and siftDown. Upon removal, a task's
* heapIndex is set to -1. Note that ScheduledFutureTasks can
* appear at most once in the queue (this need not be true for
* other kinds of tasks or work queues), so are uniquely
* identified by heapIndex.
*/
...
}
大致翻譯過來:
DelayedWorkQueue類似於DelayQueue和PriorityQueue,是基於“堆”的一種資料結構。
區別就在於ScheduledFutureTask記錄了它在堆陣列中的索引,這個索引的好處就在於:
取消任務時不再需要從陣列中查詢任務,極大的加速了remove操作,時間複雜度從O(n)降低到了O(log n),
同時不用等到元素上升至堆頂再清除從而降低了垃圾殘留時間。
但是由於DelayedWorkQueue持有的是RunnableScheduledFuture介面引用而不是ScheduledFutureTask的引用,
所以不能保證索引可用,不可用時將會降級到線性查詢演算法(我們預測大多數任務不會被包裝修飾,因此速度更快的情況更為常見)。
所有的堆操作必須記錄索引的變化 ————主要集中在siftUp和siftDown兩個方法中。一個任務刪除後他的headIndex會被置為-1。
注意每個ScheduledFutureTask在佇列中最多出現一次(對於其他型別的任務或者佇列不一定只出現一次),
所以可以通過heapIndex進行唯一標識。
這裡有幾個地方可能有疑問:
1. remove操作的時間複雜度從O(n)降低到了O(log n)
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 因為在heapIndex中儲存了索引
// indexOf的時間複雜度從線性搜尋的O(n)
// 降低到了常量O(1)
int i = indexOf(x);
if (i < 0)
return false;
// heapIndex標記為-1,表示已刪除
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// siftUp和siftDown操作完全二叉樹時間複雜度為O(log n)
// 綜合前面的O(1)+O(log n) ==> O(log n)
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
private int indexOf(Object x) {
if (x != null) {
// 這裡的ScheduledFutureTask不是靜態內部類
// 所以ScheduledFutureTask會與外部的執行緒池物件關聯(儲存外部執行緒池的引用)
// 這裡的instanceof操作的如果是其他執行緒池中的ScheduledFutureTask物件
// 將會返回false
if (x instanceof ScheduledFutureTask) {
// 如果是ScheduledFutureTask,可用heapIndex直接索引
int i = ((ScheduledFutureTask) x).heapIndex;
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
// 否則使用線性查詢
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
2. 任務的包裝修飾
包裝修飾主要是指兩個ScheduledThreadPoolExecutor.decorateTask
方法。這部分內容放在文末“擴充套件ScheduledThreadPoolExecutor的功能”時講。
7. 任務的提交
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
我們看到原來ThreadPoolExecutor中的幾個提交方法都被重寫了,最終呼叫了個的都是schedule
方法,並且這幾個方法的延時都為0納秒。
8. schedule
既然前面任務的提交全部都是交給schedule方法執行,那麼讓我們看一下schedule相關的幾個方法
下面的幾個方法也是
ScheduledExecutorService
介面擴充套件的幾個方法
下面需要注意的主要是scheduleAtFixedRate
和scheduleWithFixedDelay
兩個方法的區別。
// 觸發時間
private long triggerTime(long delay, TimeUnit unit) {
// 時間統一使用納秒單位
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
// 當前時間加上延遲時間
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 在指定的時間執行一次,沒有返回值
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
// 將Runnable介面物件封裝成ScheduledFutureTask
new ScheduledFutureTask<Void>(command, null, // Runnable給的返回值為null
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
// 在指定的時間執行一次,有返回值
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
// 將Callable介面物件封裝成ScheduledFutureTask
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
// 建立並執行一個週期性的任務,這個任務在initialDelay時間後生效
// 第一次initialDelay,然後initialDelay+period,再然後initialDelay + 2 * period
// 依此類推往下執行
// 1. 如果執行過程中出現異常,後續的執行將會終止
// 否則後續的任務會一直執行除非任務呼叫cancel方法取消或者執行緒池終止了
// 2. 如果該任務任意一次執行超過了它的週期,那麼後續的執行計劃將會推遲
// 絕對不會一個任務同時由兩個執行緒執行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 週期執行的任務
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
// 正數:固定週期執行
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
// 建立並執行一個週期性的任務,任務在initialDelay時間後生效
// 後續的執行時間在前一次任務執行完成後延時delay時間後執行
// 第一次執行時間在initialDelay
// 如果第一次執行耗時T1,那麼第二次執行時間在initialDelay+T1+delay,
// 如果第二次執行耗時T2,那麼第三次執行時間在initialDelay+T1+T2+2*delay
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 延遲執行的任務
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
// 負數:固定延遲執行
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
// 下面兩個方法時留給子類實現的,預設直接返回task
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
9. delayedExecute
上面的幾個方法都是將runnable
或callable
包裝成ScheduledFutureTask
物件,最終都是丟給delayedExecute
方法去執行:
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果執行緒池已經SHUTDOWN,則拒絕任務
if (isShutdown())
reject(task);
else {
// 入隊
super.getQueue().add(task);
// 再次檢查
if (isShutdown() &&
// 檢查執行緒池當前狀態是否能繼續執行任務
// shutdown狀態下是否把未完成的任務執行完
!canRunInCurrentRunState(task.isPeriodic()) &&
// 不能執行則移除任務
remove(task))
// 移除失敗則取消任務
task.cancel(false);
else
ensurePrestart();
}
}
// 這個方法和ThreadPoolExecutor.prestartCoreThread方法基本一致
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
// 新增核心執行緒
addWorker(null, true);
else if (wc == 0)
// wc==0,說明corePoolSize==0,也就是所有的執行緒都是普通執行緒
// 新增普通執行緒
addWorker(null, false);
}
10. ScheduledFutureTask.run
新增執行緒後,執行緒肯定會從阻塞佇列中獲取任務,並執行任務的run方法,也就是ScheduledFutureTask的run方法:
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
// 不是週期性執行,則直接執行
ScheduledFutureTask.super.run();
// 否則就是週期性執行:執行完一個週期後,重置任務的狀態
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); // 設定下一次執行的時間
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)
// 是呼叫scheduleAtFixedRate建立的任務,固定週期
// 直接將上一次的時間加上週期
time += p;
else
// 是呼叫scheduleWithFixedDelay建立的任務,固定延遲
// 當前時間加上延遲
time = triggerTime(-p);
}
}
11. ScheduledThreadPoolExecutor的其他配置項
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
/**
* false:線上程池SHUTDOWN後取消已存在的週期任務
* true: 執行緒池SHUTDOWN後,繼續執行已存在的週期任務
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
* false: 線上程池SHUTDOWN後取消已存在的非週期性任務
* true: 執行緒池SHUTDOWN後,繼續執行已存在的非週期性任務
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
* true: 呼叫ScheduledFutureTask.cancel方法後將任務從佇列中remove
*/
private volatile boolean removeOnCancel = false;
// 省略這三個屬性的getter/setter方法
}
12. 繼承ScheduledThreadPoolExecutor對任務進行包裝
ThreadPoolExecutor提供了beforeExecute,afterExecute,terminated三個鉤子方法讓我們過載以進行擴充套件。
ScheduledThreadPoolExecutor也提供了兩個方法給我們擴充套件,下面是JDK文件提供的一個簡單例子:
public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
// 我們可以在這兩個方法中對任務進行修改或包裝
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable r, RunnableScheduledFuture<V> task) {
return new CustomTask<V>(r, task);
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> c, RunnableScheduledFuture<V> task) {
return new CustomTask<V>(c, task);
}
// ... add constructors, etc.
}
13. ScheduledThreadPoolExecutor尚有的缺點
ScheduledThreadPoolExecutor是使用納秒為單位進行任務排程,它底層使用的是System.nanoTime()
來獲取時間:
final long now() {
return System.nanoTime();
}
這個時間是相對於JVM虛擬機器啟動的時間,這個納秒值在後會溢位(幾乎可以忽略溢位問題),ScheduledThreadPoolExecutor也對溢位進行了處理:
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
// 溢位會影響compareTo方法的比較
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
既然ScheduledThreadPoolExecutor已經處理了,那還有什麼問題嗎。問題就在於我們無法使用yyyy-MM-dd HH-mm-ss
這種精確時間點的方式進行任務的排程。
不過在SpringTask 以及 Quartz等框架中已經解決了這個問題,並提供了cron表示式來精確任務的排程時間。後續如果有機會對這些框架的原理進行分析。
SpringTask既可以單獨使用也可以整合Quartz使用,除了Quartz還有一個輕量級的Cron4j可以實現任務排程,不過Cron4j並沒有用執行緒池(估計那時候java5還沒出來),每個任務都會去建立一個新執行緒。