Java中排程執行緒池ScheduledThreadPoolExecutor原理探究
一、 前言
前面講解過Java中執行緒池ThreadPoolExecutor原理探究,ThreadPoolExecutor是Executors中一部分功能,下面來介紹另外一部分功能也就是ScheduledThreadPoolExecutor的實現,後者是一個可以在一定延遲時候或者定時進行任務排程的執行緒池。
二、 類圖結構
Executors其實是個工具類,裡面提供了好多靜態方法,根據使用者選擇返回不同的執行緒池例項。
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor並實現ScheduledExecutorService介面,關於ThreadPoolExecutor的介紹可以參考:
執行緒池佇列是DelayedWorkQueue,它是對delayqueue的優化,關於delayqueue參考:http://www.jianshu.com/p/2659eb72134b
ScheduledFutureTask是阻塞佇列元素是對任務修飾。
建構函式:
//使用改造後的delayqueue.
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
三、一個例子
// 任務間以固定時間間隔執行,延遲1s後開始執行任務,任務執行完畢後間隔2s再次執行,任務執行完畢後間隔2s再次執行,依次往復
static void scheduleWithFixedDelay() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
// 由於是定時任務,一直不會返回
result.get();
System.out.println("over");
}
// 相對開始加入任務的時間點固定頻率執行:從加入任務開始算1s後開始執行任務,1+2s開始執行,1+2*2s執行,1+n*2s開始執行;
// 但是如果執行任務時間大約2s則不會併發執行後續任務將會延遲。
static void scheduleAtFixedRate() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
// 由於是定時任務,一直不會返回
result.get();
System.out.println("over");
}
// 延遲1s後開始執行,只執行一次,沒有返回值
static void scheduleRunable() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println("gh");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, 1000, TimeUnit.MILLISECONDS);
System.out.println(result.get());
}
// 延遲1s後開始執行,只執行一次,有返回值
static void scheduleCaller() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<String> result = executorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "gh";
}
}, 1000, TimeUnit.MILLISECONDS);
// 阻塞,直到任務執行完成
System.out.print(result.get());
}
三、 原始碼分析
3.1 schedule(Runnable command, long delay,TimeUnit unit)方法
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//裝飾任務,主要實現public long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//新增任務到延遲佇列
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果執行緒池關閉了,則拒絕任務
if (isShutdown())
reject(task);
else {
//新增任務到佇列
super.getQueue().add(task);
//再次檢查執行緒池關閉
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//確保至少一個執行緒在處理任務,即使核心執行緒數corePoolSize為0
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//增加核心執行緒數
if (wc < corePoolSize)
addWorker(null, true);
//如果初始化corePoolSize==0,則也新增一個執行緒。
else if (wc == 0)
addWorker(null, false);
}
上面做的首先吧runnable裝飾為delay佇列所需要的格式的元素,然後把元素加入到阻塞佇列,然後執行緒池執行緒會從阻塞佇列獲取超時的元素任務進行處理,下面看下佇列元素如何實現的。
//r為被修飾任務,result=null,ns為當前時間加上delay時間後的
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//通過介面卡把runnable轉換為callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
- 過期時間計算
//元素過期演算法,裝飾後時間-當前時間,就是即將過期剩餘時間 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); }
- 元素比較
public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
schedule(Callable<V> callable,
long delay,
TimeUnit unit)和schedule(Runnable command, long delay,TimeUnit unit)類似。
compareTo作用是在加入元素到dealy佇列時候進行比較,需要調整堆讓最快要過期的元素放到隊首。所以無論什麼時候向佇列裡面新增元素,隊首的都是最即將過期的元素。
3.2 scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
定時排程:相鄰任務間時間固定
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//修飾包裝,注意這裡是period=-delay<0
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//新增任務到佇列
delayedExecute(t);
return t;
}
//period為 delay時間
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
我們知道任務新增到佇列後,工作執行緒會從佇列獲取並移除到期的元素,然後執行run方法,所以下面看看ScheduledFutureTask的run方法如何實現定時排程的
public void run() {
//是否只執行一次
boolean periodic = isPeriodic();
//取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
//只執行一次,呼叫schdule時候
else if (!periodic)
ScheduledFutureTask.super.run();
//定時執行
else if (ScheduledFutureTask.super.runAndReset()) {
//設定time=time+period
setNextRunTime();
//重新加入該任務到delay佇列
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else//由於period=-delay所以執行這裡,設定time=now()+delay
time = triggerTime(-p);
}
總結:定時排程是先從佇列獲取任務然後執行,然後在重新設定任務時間,在把任務放入佇列實現的。
如果任務執行時間大於delay時間則等任務執行完畢後的delay時間後在次呼叫任務,不會同一個任務併發執行。
3.3 scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
定時排程:相對起始時間點固定頻率呼叫
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//裝飾任務類,注意period=period>0,不是負的
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//新增任務到佇列
delayedExecute(t);
return t;
}
private void setNextRunTime() {
long p = period;
//period=delay;
if (p > 0)
time += p;//由於period>0所以執行這裡,設定time=time+delay
else
time = triggerTime(-p);
}
總結:相對於上面delay,rate方式執行規則為時間為initdelday + n*period;時候啟動任務,但是如果當前任務還沒有執行完,要等到當前任務執行完畢後在執行一個任務。
四、 總結
排程執行緒池主要用於定時器或者延遲一定時間在執行任務時候使用。內部使用優化的DelayQueue來實現,由於使用佇列來實現定時器,有出入隊調整堆等操作,所以定時並不是非常非常精確。