在上一篇執行緒池的文章《併發程式設計(十一)—— Java 執行緒池 實現原理與原始碼深度解析(一)》中從ThreadPoolExecutor原始碼分析了其執行機制。限於篇幅,留下了ScheduledThreadPoolExecutor未做分析,因此本文繼續從原始碼出發分析ScheduledThreadPoolExecutor的內部原理。
類宣告
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,實現了ScheduledExecutorService。因此它具有ThreadPoolExecutor的所有能力。所不同的是它具有定時執行,以週期或間隔迴圈執行任務等功能。
這裡我們先看下ScheduledExecutorService的原始碼:
ScheduledExecutorService
//可排程的執行者服務介面
public interface ScheduledExecutorService extends ExecutorService { //指定時延後排程執行任務,只執行一次,沒有返回值
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit); //指定時延後排程執行任務,只執行一次,有返回值
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit); //指定時延後開始執行任務,以後每隔period的時長再次執行該任務
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit); //指定時延後開始執行任務,以後任務執行完成後等待delay時長,再次執行任務
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
其中schedule方法用於單次排程執行任務。這裡主要理解下後面兩個方法。
- scheduleAtFixedRate:該方法在initialDelay時長後第一次執行任務,以後每隔period時長,再次執行任務。注意,period是從任務開始執行算起的。開始執行任務後,定時器每隔period時長檢查該任務是否完成,如果完成則再次啟動任務,否則等該任務結束後才再次啟動任務,看下圖示例
scheduleWithFixDelay:該方法在initialDelay時長後第一次執行任務,以後每當任務執行完成後,等待delay時長,再次執行任務,看下圖示例。
使用例子
1、schedule(Runnable command,long delay, TimeUnit unit)
/**
* @author: ChenHao
* @Date: Created in 14:54 2019/1/11
*/
public class Test1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 延遲1s後開始執行,只執行一次,沒有返回值
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println("gh");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, 1000, TimeUnit.MILLISECONDS);
System.out.println(result.get());
}
}
執行結果:
2、schedule(Callable<V> callable, long delay, TimeUnit unit);
public class Test2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 延遲1s後開始執行,只執行一次,有返回值
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<String> result = executorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "ghq";
}
}, 1000, TimeUnit.MILLISECONDS);
// 阻塞,直到任務執行完成
System.out.print(result.get());
}
}
執行結果:
3、scheduleAtFixedRate
/**
* @author: ChenHao
* @Date: Created in 14:54 2019/1/11
*/
public class Test3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
// 從加入任務開始算1s後開始執行任務,1+2s開始執行,1+2*2s執行,1+n*2s開始執行;
// 但是如果執行任務時間大於2s則不會併發執行後續任務,當前執行完後,接著執行下次任務。
ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS); //一個ScheduledExecutorService裡可以同時新增多個定時任務,這樣就是形成堆
ScheduledFuture<?> result2 = executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
}
}
這裡可以看到一個ScheduledExecutorService 中可以新增多個定時任務,這是就會形成堆
執行結果:
4、scheduleWithFixedDelay
/**
* @author: ChenHao
* @Date: Created in 14:54 2019/1/11
*/
public class Test4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//任務間以固定時間間隔執行,延遲1s後開始執行任務,任務執行完畢後間隔2s再次執行,任務執行完畢後間隔2s再次執行,依次往復
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS); // 由於是定時任務,一直不會返回
result.get();
System.out.println("over");
}
}
執行結果:
原始碼分析
構造器
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
內部其實都是呼叫了父類ThreadPoolExecutor的構造器,因此它具有ThreadPoolExecutor的所有能力。
通過super方法的引數可知,核心執行緒的數量即傳入的引數,而執行緒池的執行緒數為Integer.MAX_VALUE,幾乎為無上限。
這裡採用了DelayedWorkQueue任務佇列,也是定時任務的核心,是一種優先佇列,時間小的排在前面,所以獲取任務的時候就能先獲取到時間最小的執行,可以看我上篇文章《併發程式設計(十四)—— ScheduledThreadPoolExecutor 實現原理與原始碼深度解析 之 DelayedWorkQueue》。
由於這裡佇列沒有定義大小,所以佇列不會新增滿,因此最大的執行緒數就是核心執行緒數,超過核心執行緒數的任務就放在佇列裡,並不重新開啟臨時執行緒。
我們先來看看幾個入口方法的實現:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
} public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
} public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
} public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
這幾個方法都是將任務封裝成了ScheduledFutureTask,上面做的首先把runnable裝飾為delay佇列所需要的格式的元素,然後把元素加入到阻塞佇列,然後執行緒池執行緒會從阻塞佇列獲取超時的元素任務進行處理,下面看下佇列元素如何實現的。
ScheduledFutureTask
ScheduledFutureTask是一個延時定時任務,它可以返回任務剩餘延時時間,可以被週期性地執行。
屬性
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** 是一個序列,每次建立任務的時候,都會自增。 */
private final long sequenceNumber; /** 任務能夠開始執行的時間 */
private long time; /**
* 任務週期執行的時間
* 0表示不是一個週期定時任務
* 正數表示固定週期時間去執行任務
* 負數表示任務完成之後,延時period時間再去執行任務
*/
private final long period; /** 表示再次執行的任務,在reExecutePeriodic中呼叫 */
RunnableScheduledFuture<V> outerTask = this; /**
* 表示在任務佇列中的索引位置,用來支援快速從佇列中刪除任務。
*/
int heapIndex;
}
ScheduledFutureTask繼承了 FutureTask 和 RunnableScheduledFuture
屬性說明:
- sequenceNumber: 是一個序列,每次建立任務的時候,都會自增。
- time: 任務能夠開始執行的時間。
- period: 任務週期執行的時間。0表示不是一個週期定時任務。
- outerTask: 表示再次執行的任務,在reExecutePeriodic中呼叫
- heapIndex: 表示在任務佇列中的索引位置,用來支援快速從佇列中刪除任務。
構造器
建立延時任務
/**
* 建立延時任務
*/
ScheduledFutureTask(Runnable r, V result, long ns) {
// 呼叫父類的方法
super(r, result);
// 任務開始的時間
this.time = ns;
// period是0,不是一個週期定時任務
this.period = 0;
// 每次建立任務的時候,sequenceNumber都會自增
this.sequenceNumber = sequencer.getAndIncrement();
} /**
* 建立延時任務
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
// 呼叫父類的方法
super(callable);
// 任務開始的時間
this.time = ns;
// period是0,不是一個週期定時任務
this.period = 0;
// 每次建立任務的時候,sequenceNumber都會自增
this.sequenceNumber = sequencer.getAndIncrement();
}
我們看看super(),其實就是FutureTask 裡面的構造方法,關於FutureTask 可以看看我之前的文章《Java 多執行緒(五)—— 執行緒池基礎 之 FutureTask原始碼解析》
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
- 建立延時定時任務
/**
* 建立延時定時任務
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
// 呼叫父類的方法
super(r, result);
// 任務開始的時間
this.time = ns;
// 週期定時時間
this.period = period;
// 每次建立任務的時候,sequenceNumber都會自增
this.sequenceNumber = sequencer.getAndIncrement();
}
延時定時任務不同的是設定了period,後面通過判斷period是否為0來確定是否是定時任務。
run()
public void run() {
// 是否是週期任務
boolean periodic = isPeriodic();
// 如果不能在當前狀態下執行,那麼就要取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果只是延時任務,那麼就呼叫run方法,執行任務。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是週期定時任務,呼叫runAndReset方法,執行任務。
// 這個方法不會改變任務的狀態,所以可以反覆執行。
else if (ScheduledFutureTask.super.runAndReset()) {
// 設定週期任務下一次執行的開始時間time
setNextRunTime();
// 重新執行任務outerTask
reExecutePeriodic(outerTask);
}
}
這個方法會在ThreadPoolExecutor的runWorker方法中呼叫,而且這個方法呼叫,說明肯定已經到了任務的開始時間time了。這個方法我們待會會再繼續來回看一下
- 先判斷當前執行緒狀態能不能執行任務,如果不能,就呼叫cancel()方法取消本任務。
- 如果任務只是一個延時任務,那麼呼叫父類的run()執行任務,改變任務的狀態,表示任務已經執行完成了。
- 如果任務只是一個週期定時任務,那麼就任務必須能夠反覆執行,那麼就不能呼叫run()方法,它會改變任務的狀態。而是呼叫runAndReset()方法,只是簡單地執行任務,而不會改變任務狀態。
- 設定週期任務下一次執行的開始時間time,並重新執行任務。
schedule(Runnable command, long delay,TimeUnit unit)
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException(); //裝飾任務,主要實現public long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//新增任務到延遲佇列
delayedExecute(t);
return t;
}
獲取延時執行時間
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
} /**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
//當前時間加上延時時間
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
上述的decorateTask方法把Runnable任務包裝成ScheduledFutureTask,使用者可以根據自己的需要覆寫該方法:
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
schedule的核心是其中的delayedExecute方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) // 執行緒池已關閉
reject(task); // 任務拒絕策略
else {
//將任務新增到任務佇列,會根據任務延時時間進行排序
super.getQueue().add(task);
// 如果執行緒池狀態改變了,當前狀態不能執行任務,那麼就嘗試移除任務,
// 移除成功,就取消任務。
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false); // 取消任務
else
// 預先啟動工作執行緒,確保執行緒池中有工作執行緒。
ensurePrestart();
}
}
這個方法的主要作用就是將任務新增到任務佇列中,因為這裡任務佇列是優先順序佇列DelayedWorkQueue,它會根據任務的延時時間進行排序。
如果執行緒池不是RUNNING狀態,不能執行延時任務task,那麼呼叫reject(task)方法,拒絕執行任務task。
將任務新增到任務佇列中,會根據任務的延時時間進行排序。
因為是多執行緒併發環境,就必須判斷在新增任務的過程中,執行緒池狀態是否被別的執行緒更改了,那麼就可能要取消任務了。
將任務新增到任務佇列後,還要確保執行緒池中有工作執行緒,不然任務也不為執行。所以ensurePrestart()方法預先啟動工作執行緒,確保執行緒池中有工作執行緒。
void ensurePrestart() {
// 執行緒池中的執行緒數量
int wc = workerCountOf(ctl.get());
// 如果小於核心池數量,就建立新的工作執行緒
if (wc < corePoolSize)
addWorker(null, true);
// 說明corePoolSize數量是0,必須建立一個工作執行緒來執行任務
else if (wc == 0)
addWorker(null, false);
}
通過ensurePrestart可以看到,如果核心執行緒池未滿,則新建的工作執行緒會被放到核心執行緒池中。如果核心執行緒池已經滿了,ScheduledThreadPoolExecutor不會像ThreadPoolExecutor那樣再去建立歸屬於非核心執行緒池的工作執行緒,加入到佇列就完了,等待核心執行緒執行完任務再拉取佇列裡的任務。也就是說,在ScheduledThreadPoolExecutor中,一旦核心執行緒池滿了,就不會再去建立工作執行緒。
這裡思考一點,什麼時候會執行else if (wc == 0)建立一個歸屬於非核心執行緒池的工作執行緒?
答案是,當通過setCorePoolSize方法設定核心執行緒池大小為0時,這裡必須要保證任務能夠被執行,所以會建立一個工作執行緒,放到非核心執行緒池中。
看到 addWorker(null, true); 並沒有將任務設定進入,而是設定的null, 則說明執行緒池裡執行緒第一次啟動時, runWorker中取到的 firstTask為null,需要通過 getTask() 從佇列中取任務,這裡可以看看我之前寫的關於執行緒池的文章《併發程式設計(十一)—— Java 執行緒池 實現原理與原始碼深度解析(一)》。
getTask()中 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();如果是存在核心執行緒則呼叫take(),如果傳入的核心執行緒為0,則存在一個臨時執行緒,呼叫poll(),這兩個方法都會先獲取時間,看看有沒有達到執行時間,沒有達到執行時間則阻塞,可以看看我上一篇文章,達到執行時間,則取到任務,就會執行下面的run方法。
public void run() {
// 是否是週期任務
boolean periodic = isPeriodic();
// 如果不能在當前狀態下執行,那麼就要取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果只是延時任務,那麼就呼叫run方法,執行任務。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是週期定時任務,呼叫runAndReset方法,執行任務。
// 這個方法不會改變任務的狀態,所以可以反覆執行。
else if (ScheduledFutureTask.super.runAndReset()) {
// 設定週期任務下一次執行的開始時間time
setNextRunTime();
// 重新執行任務outerTask
reExecutePeriodic(outerTask);
}
} public boolean isPeriodic() {
return period != 0;
}
schedule不是週期任務,那麼呼叫父類的run()執行任務,改變任務的狀態,表示任務已經執行完成了。
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//裝飾任務類,注意period=period>0,不是負的
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//新增任務到佇列
delayedExecute(t);
return t;
}
如果是週期任務則執行上面run()方法中的第12行,呼叫父類中的runAndReset(),這個方法同run方法比較的區別是call方法執行後不設定結果,因為週期型任務會多次執行,所以為了讓FutureTask支援這個特性除了發生異常不設定結果。
執行完任務後通過setNextRunTime方法計算下一次啟動時間:
private void setNextRunTime() {
long p = period;
//period=delay;
if (p > )
time += p;//由於period>0所以執行這裡,設定time=time+delay
else
time = triggerTime(-p);
} long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> )) ? delay : overflowFree(delay));
}
scheduleAtFixedRate會執行到情況一,下一次任務的啟動時間最早為上一次任務的啟動時間加period。
scheduleWithFixedDelay會執行到情況二,這裡很巧妙的將period引數設定為負數到達這段程式碼塊,在此又將負的period轉為正數。情況二將下一次任務的啟動時間設定為當前時間加period。
然後將任務再次新增到任務佇列:
/**
* 重新執行任務task
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 判斷當前執行緒池狀態能不能執行任務
if (canRunInCurrentRunState(true)) {
// 將任務新增到任務佇列,會根據任務延時時間進行排序
super.getQueue().add(task);
// 如果執行緒池狀態改變了,當前狀態不能執行任務,那麼就嘗試移除任務,
// 移除成功,就取消任務。
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 預先啟動工作執行緒,確保執行緒池中有工作執行緒。
ensurePrestart();
}
}
這個方法與delayedExecute方法很像,都是將任務新增到任務佇列中。
- 如果當前執行緒池狀態能夠執行任務,那麼任務新增到任務佇列。
- 如果在在新增任務的過程中,執行緒池狀態是否被別的執行緒更改了,那麼就要進行判斷,是否需要取消任務。
- 呼叫ensurePrestart()方法,預先啟動工作執行緒,確保執行緒池中有工作執行緒。
ScheduledFuture的get方法
既然ScheduledFuture的實現是ScheduledFutureTask,而ScheduledFutureTask繼承自FutureTask,所以ScheduledFuture的get方法的實現就是FutureTask的get方法的實現,FutureTask的get方法的實現分析在ThreadPoolExecutor篇已經寫過,這裡不再敘述。要注意的是ScheduledFuture的get方法對於非週期任務才是有效的。
ScheduledThreadPoolExecutor總結
ScheduledThreadPoolExecutor和ThreadPoolExecutor的區別:
ThreadPoolExecutor每次addwoker就會將自己的Task傳進新建立的woker中的執行緒執行,因此woker會第一時間執行當前Task,只有執行緒數超過了核心執行緒才會將任務放進佇列裡
ScheduledThreadPoolExecutor是直接入佇列,並且建立woker時傳到woker的是null,說明woker中的執行緒剛啟動時並沒有任務執行,只能通過getTask去佇列裡取任務,取任務時會判斷是否到了執行時間,因此具有了延時執行的特性,並且task執行完了,會將當前任務重新放進堆裡,並設定下次執行的時間。
ScheduledThreadPoolExecutor是實現自ThreadPoolExecutor的執行緒池,構造方法中傳入引數n,則最多會有n個核心執行緒工作,空閒的核心執行緒不會被自動終止,而是一直阻塞在DelayedWorkQueue的take方法嘗試獲取任務。構造方法傳入的引數為0,ScheduledThreadPoolExecutor將以非核心執行緒工作,並且最多隻會建立一個非核心執行緒,參考上文中ensurePrestart方法的執行過程。而這個非核心執行緒以poll方法獲取定時任務之所以不會因為超時就被回收,是因為任務佇列並不為空,只有在任務佇列為空時才會將空閒執行緒回收,詳見ThreadPoolExecutor篇的runWorker方法,之前我以為空閒的非核心執行緒超時就會被回收是不正確的,還要具備任務佇列為空這個條件。
ScheduledThreadPoolExecutor的定時執行任務依賴於DelayedWorkQueue,其內部用可擴容的陣列實現以啟動時間升序的二叉樹。
工作執行緒嘗試獲取DelayedWorkQueue的任務只有在任務到達指定時間才會成功,否則非核心執行緒會超時返回null,核心執行緒一直阻塞。
對於非週期型任務只會執行一次並且可以通過ScheduledFuture的get方法阻塞得到結果,其內部實現依賴於FutureTask的get方法。
週期型任務通過get方法無法獲取有效結果,因為FutureTask對於週期型任務執行的是runAndReset方法,並不會設定結果。週期型任務執行完畢後會重新計算下一次啟動時間並且再次新增到DelayedWorkQueue中,所有的Task會公用一個佇列,如果一個定時器裡新增多個任務,此時就會形成堆,如果只是一個定時任務,則每次只有堆頂一個數據,並且也只需要一個核心執行緒就夠用了,因為只有當前任務執行完才會再將該任務新增到堆裡。