執行緒池原理(四):ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor用於定時任務,這裡的定時意義在於:
- 指定延時後執行任務。
- 週期性重複執行任務。
我們接著分析ScheduledThreadPoolExecutor原始碼,從類宣告開始
類宣告
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
//……
}
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,實現了ScheduledExecutorService。線上程池的基礎上,實現了可排程的執行緒池功能。上一篇文章已經詳細介紹了
ScheduledExecutorService
//可排程的執行者服務介面
public interface ScheduledExecutorService extends ExecutorService {
//指定時延後排程執行任務
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//指定時延後排程執行任務
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//指定時延後開始執行任務,以後每隔period的時長再次執行該任務
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//指定時延後開始執行任務,以後任務執行完成後等待delay時長,再次執行任務
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
ScheduledExecutorService實現了ExecutorService,並增加若干定時相關的介面。其中schedule方法用於單次排程執行任務。這裡主要理解下後面兩個方法。
scheduleAtFixedRate:該方法在initialDelay時長後第一次執行任務,以後每隔period時長,再次執行任務。注意,period是從任務開始執行算起的。開始執行任務後,定時器每隔period時長檢查該任務是否完成,如果完成則再次啟動任務,否則等該任務結束後才再次啟動任務,看下圖示例。
scheduleWithFixDelay:該方法在initialDelay時長後第一次執行任務,以後每當任務執行完成後,等待delay時長,再次執行任務,看下圖示例。
schedule
ScheduledThreadPoolExecutor方法實現了ScheduledExecutorService,schedule方法排程的任務只執行一次。
先看下schedule方法的實現:
//delay時長後執行任務command,該任務只執行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//這裡的decorateTask方法僅僅返回第二個引數
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//延時或者週期執行任務的主要方法
delayedExecute(t);
return t;
}
我們先屢下ScheduledFuture、RunnableScheduledFuture、ScheduledFutureTask的關係,看下類圖:
這個類圖比較複雜,其中淺色部分都是我們已經學習過了,深色部分我們之前沒有接觸過,所以重點學習這幾個類,從上往下依次看。
Delayed介面
Delayed介面提供了getDelay方法,該方法返回物件剩餘時延。介面繼承了Comparable介面,表示物件支援排序,看下該介面的定義:
//繼承Comparable介面,表示該類物件支援排序
public interface Delayed extends Comparable<Delayed> {
//返回該物件剩餘時延
long getDelay(TimeUnit unit);
}
Delayed介面很簡單,繼續看ScheduledFuture介面。
ScheduledFuture介面
ScheduledFuture是延時的Future,僅僅繼承了Delayed和Future介面,並沒有新增其他方法,看下該介面的定義:
//僅僅繼承了Delayed和Future介面
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}
RunnableScheduledFuture介面
可執行的ScheduledFuture,該介面繼承了ScheduledFuture和RunnableFuture介面。
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
//是否是週期任務,週期任務可被排程執行多次,非週期任務只被執行一次
boolean isPeriodic();
}
ScheduledFutureTask類
該類是ScheduledThreadPoolExecutor的內部類,繼承了FutureTask,實現了RunnableScheduledFuture介面。FutureTask我們在介紹執行緒池的時候講過。先看下ScheduledFutureTask的構造方法:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
//呼叫父類FutureTask的構造方法
super(r, result);
//time表示任務下次執行的時間
this.time = ns;
//週期任務,正數表示按照固定速率,負數表示按照固定時延
this.period = period;
//任務的編號
this.sequenceNumber = sequencer.getAndIncrement();
}
這裡需要注意幾點,
- time表示任務下一次執行的時間,單位為納秒。
- period=0表示該任務不是週期性任務,正數表示每隔period時長執行任務,負數表示任務執行完成後到下一次被排程執行的延時時間。
- sequenceNumber表示該任務的編號,通過執行緒池的sequencer成員變數從0開始生成編號。
繼續看下getDelay方法:
getDelay
//實現Delayed介面的getDelay方法,返回任務開始執行的剩餘時間
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
這個方法其實就是任務開始執行的倒計時時間,通過任務預期執行時間減去當前時間獲得,單位是納秒。
compareTo
該方法實現了Comparable介面的compareTo方法,比較兩個任務的”大小”。後面我們會講到,可排程的執行緒池其實利用了可排序的延時佇列,延時佇列儲存了ScheduledFutureTask任務,並且佇列中的元素會根據開始執行的倒計時時間排序,剩餘等待時間最少的將會被最先排程執行。這裡排序策略就是根據compareTo方法實現的。
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
//小於0,說明當前任務的執行時間點早於other,要排在延時佇列other的前面
if (diff < 0)
return -1;
//大於0,說明當前任務的執行時間點晚於other,要排在延時佇列other的後面
else if (diff > 0)
return 1;
//如果兩個任務的執行時間點一樣,比較兩個任務的編號,編號小的排在佇列前面,編號大的排在佇列後面
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
//如果任務型別不是ScheduledFutureTask,通過getDelay方法比較
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
setNextRunTime
任務執行完後,設定下次執行的時間
private void setNextRunTime() {
long p = period;
//p>0,說明是固定速率執行的任務,在原來任務開始執行時間的基礎上加上p即可
if (p > 0)
time += p;
//p<0,說明是固定時延執行的任務,下次執行時間在當前時間(任務執行完成的時間)的基礎上加上-p的時間
else
time = triggerTime(-p);
}
任務執行完成後需要確定下次執行的時間,如果任務是以固定速率執行的,下次開始執行時間就是上次任務開始執行時間加上period。如果任務是以固定延時執行的,下次開始執行時間就是當前時間(上次任務線束時間)加上period(取正值)。
cancel
取消任務的執行,重點關注將取消的任務從佇列移除的邏輯。
public boolean cancel(boolean mayInterruptIfRunning) {
//呼叫FutureTask的cancel方法
boolean cancelled = super.cancel(mayInterruptIfRunning);
//cancelled: 任務取消成功
//removeOnCancel:任務取消後從佇列移除
//headIndex:任務原先處於二叉堆的位置
if (cancelled && removeOnCancel && heapIndex >= 0)
//從佇列中移除,該方法是ThreadPoolExecutor的方法
remove(this);
//返回是否取消成功
return cancelled;
}
run
ScheduledFutureTask重寫了FutureTask的run方法。
public void run() {
boolean periodic = isPeriodic();
//如果當前狀態下不能執行任務,則取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
//不是週期性任務,執行一次任務即可,呼叫父類的run方法
else if (!periodic)
ScheduledFutureTask.super.run();
//是週期性任務,呼叫FutureTask的runAndReset方法,方法執行完成後
//重新設定任務下一次執行的時間,並將該任務重新入隊,等待再次被排程
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
註釋已經解釋的很清楚了,重點看下FutureTask的runAndReset方法,該方法是為任務多次執行而設計的。runAndReset方法執行完任務後不會設定任務的執行結果,也不會去更新任務的狀態,維持任務的狀態為初始狀態(NEW狀態),這也是該方法和FutureTask的run方法的區別。
好了,講完了ScheduledFutureTask,接著看ScheduledPoolExecutor原始碼。
通常我們通過submit或者execute方法將任務提交給執行緒池執行,這兩個方法最終都是呼叫了schedule方法,前面已經講過,schedule方法只會排程任務執行一次。那麼ScheduledThreadPoolExecutor是怎樣排程固定週期或延時的任務的呢?是通過scheduledAtFixedRate和scheduledAtFixedDelay方法實現的,我們先看下scheduledAtFixedRate原始碼:
scheduledAtFixedRate
關於該方法的說明,我們在ScheduledExecutorService介面已經說明過了,這裡主要看下實現。
//注意,固定速率和固定時延,傳入的引數都是Runnable,也就是說這種定時任務是沒有返回值的
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//建立一個有初始延時和固定週期的任務
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//outerTask表示將會重新入隊的任務
sft.outerTask = t;
//稍後說明
delayedExecute(t);
return t;
}
其實主要建立了一個帶有初始延時和固定週期的任務,類似的,scheduledAtFixedDelay建立一個帶有初始延時和任務間固定延時的任務。
scheduledAtFixedDelay
和scheduledAtFixedRate類似,唯一不同的地方在於在於建立的ScheduledFutureTask不同,FixedRate和FixedDelay也是通過ScheduledFutureTask體現的。這裡不再展示程式碼了。
delayedExecute
前面講到的schedule、scheduleAtFixedRate和scheduleAtFixedDelay最後都呼叫了delayedExecute方法,該方法是定時任務執行的主要方法。看下delayedExecute原始碼:
private void delayedExecute(RunnableScheduledFuture<?> task) {
//執行緒池已經關閉,呼叫拒絕執行處理器處理
if (isShutdown())
reject(task);
else {
//將任務加入到等待佇列
super.getQueue().add(task);
//執行緒池已經關閉,且當前狀態不能執行該任務,將該任務從等待佇列移除並取消該任務
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//增加一個worker,就算corePoolSize=0也要增加一個worker
ensurePrestart();
}
}
delayedExecute方法的邏輯也很簡單,主要就是將任務新增到等待佇列並增加一個worker,增加的worker並不能立即執行該任務,因為該任務可能要等待一定時間後才能執行。
對於ScheduledThreadPoolExecutor,worker新增到執行緒池後會在等待佇列上等待獲取任務,這點是和ThreadPoolExecutor一致的。但是worker是怎麼從等待佇列取定時任務的?該等待佇列隊首應該儲存的是最近將要執行的任務,如果隊首任務的開始執行時間還未到,worker也應該繼續等待。
ScheduledThreadPoolExecutor實現了一個延時佇列,該佇列不僅實現了阻塞佇列的功能,也實現了排序功能。後面我們會發現,該佇列是通過二叉堆實現的,理解了該佇列基本上能夠理解ScheduledThreadPoolExecutor了,因此我們好好學習下該佇列。
ScheduledThreadPoolExecutor內部類DelayedWorkQueue就是儲存定時任務的等待佇列。
DelayedWorkQueue
看下DelayedWorkQueue的宣告:
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
//……
}
DelayedWorkQueue繼承了AbstractQueue抽象類、實現了BlockingQueue介面。
理解DelayedWorkQueue之前需要理解堆排序,這裡的堆排序演算法和DelayedWorkQueue的稍有不同,但是基本思想是相同的。
堆排序是通過陣列實現的,因此DelayedWorkQueue定義了一個數組作為等待佇列。
//佇列初始容量
private static final int INITIAL_CAPACITY = 16;
//陣列用來儲存定時任務,通過陣列實現堆排序
private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
DelayedWorkQueue儲存了當前在隊首等待的執行緒:
private Thread leader = null;
當一個執行緒成為leader,它只要等待隊首任務的delay時間即可,其他執行緒會無條件等待。leader取到任務返回前要通知其他執行緒,直到有執行緒成為新的leader。每當隊首的定時任務被其他更早需要執行的任務替換時,leader設定為null,其他等待的執行緒(被當前leader通知)和當前的leader重新競爭成為leader。
DelayedWorkQueue定義了鎖lock和條件available用於執行緒競爭成為leader。
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
當一個新的任務成為隊首,或者需要有新的執行緒成為leader時,available條件將會被通知。
執行緒取任務時需要在available條件上等待,當被通知時,該執行緒可能會成為新的leader。
我們先看下DelayedWorkQueue的take方法
take
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//取堆頂的任務,堆頂是最近要執行的任務
RunnableScheduledFuture first = queue[0];
//堆頂為空,執行緒要在條件available上等待
if (first == null)
available.await();
else {
//堆頂任務還要多長時間才能執行
long delay = first.getDelay(TimeUnit.NANOSECONDS);
//堆頂任務已經可以執行了,finishPoll會重新調整堆,使其滿足最小堆特性,該方法設定任務在
//堆中的index為-1並返回該任務
if (delay <= 0)
return finishPoll(first);
//如果leader不為空,說明已經有執行緒成為leader並等待堆頂任務
//到達執行時間,此時,其他執行緒都需要在available條件上等待
else if (leader != null)
available.await();
else {
//leader為空,當前執行緒成為新的leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//當前執行緒已經成為leader了,只需要等待堆頂任務到達執行時間即可
available.awaitNanos(delay);
} finally {
//返回堆頂元素之前將leader設定為空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//通知其他在available條件等待的執行緒,這些執行緒可以去競爭成為新的leader
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
再梳理下take方法的邏輯
- 如果堆頂元素為空,在available條件上等待。
- 如果堆頂任務的執行時間已到,將堆頂元素替換為堆的最後一個元素並調整堆使其滿足最小堆特性,同時設定任務在堆中索引為-1,返回該任務。
- 如果leader不為空,說明已經有執行緒成為leader了,其他執行緒都要在available條件上等待。
- 如果leader為空,當前執行緒成為新的leader,並等待直到堆頂任務執行時間到達。
- take方法返回之前,將leader設定為空,並通知其他執行緒。
繼續看下finishPool方法:
private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
//堆元素數量減1
int s = --size;
//取堆的最後一個元素
RunnableScheduledFuture x = queue[s];
queue[s] = null;
if (s != 0)
//調整堆,使其重新滿足最小堆特性,從位置0開始往堆的底層調整
siftDown(0, x);
//該任務在堆中的索引設定為-1
setIndex(f, -1);
//返回該任務
return f;
}
offer
該方法往佇列插入一個值,返回是否成功插入。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
//佇列元素已經大於等於陣列的長度,需要擴容,新堆的容易是原來堆容量的1.5倍
if (i >= queue.length)
grow();
//堆中元素增加1
size = i + 1;
//調整堆
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
//通知其他在available條件上等待的執行緒,這些執行緒可以競爭成為新的leader
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
siftUp
該方法是調整堆的方法,調整堆的目的是使其滿足最小堆的特性。
//從位置k開始往堆頂方向查詢,直到找到key儲存的位置
private void siftUp(int k, RunnableScheduledFuture key) {
while (k > 0) {
//parent是父節點的索引
int parent = (k - 1) >>> 1;
RunnableScheduledFuture e = queue[parent];
//如果父節點比子節點e的執行時間要早,說明已經符合最小堆的特性,跳出迴圈
if (key.compareTo(e) >= 0)
break;
//子節點比父節點更早執行,將子節點位置的值替換為父節點
queue[k] = e;
setIndex(e, k);
//繼續往上查詢
k = parent;
}
//k是最終key存放的位置
queue[k] = key;
setIndex(key, k);
}
看下siftUp示例圖,對於左邊這個堆來說,在位置K處往堆頂方向查詢key=12的位置,因為父節點值為23,大於12,因此將23移到位置K處,位置K上移到父節點所在位置,繼續往堆頂方向查詢key=12的位置。
如果查詢key=50,因為父節點23小於50,因此位置K就是key=50的最終儲存位置。
siftDown
該方法和siftUp方法類似
//從位置k處開始往下查詢,找到key的儲存位置
private void siftDown(int k, RunnableScheduledFuture key) {
//從half開始,就不再有孩子節點的,這是一個優化
int half = size >>> 1;
while (k < half) {
//左孩子位置
int child = (k << 1) + 1;
RunnableScheduledFuture c = queue[child];
//右孩子位置
int right = child + 1;
//如果右孩子存在,並且右孩子比左孩子更早執行,更新c為右孩子
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
//以上做的都是取兩個孩子中更早執行的那個孩子節點,取到後和key比較
//如果key比兩個孩子都更早執行,位置k就是key的最終位置了,跳出迴圈
if (key.compareTo(c) <= 0)
break;
//更早執行的孩子放到父節點處
queue[k] = c;
setIndex(c, k);
//繼續往下查詢
k = child;
}
queue[k] = key;
setIndex(key, k);
}
看下siftDown的示例圖,對於左邊的堆來說,在位置K處開始往下查詢key的位置,如果key=12,因為12小於兩個孩子中的最小結點35,因此位置K就是key=12的最終儲存位置。如果key=50,因為50大於兩個孩子結點中的最小結點35,因此將35上移到父節點,位置K下移到35所在的位置,繼續往堆底查詢。