Java定時排程機制 - ScheduledExecutorService
前言
通過上一篇文章【 Java定時排程機制 - Timer 】的分析,我們知道,Java的定時排程可以通過 Timer&TimerTask
來實現。由於其實現的方式為單執行緒,因此從JDK1.3釋出之後就一直存在一些問題,大致如下:
- 多個任務之間會相互影響
- 多個任務的執行是序列的,效能較低
ScheduledExecutorService
在設計之初就是為了解決 Timer&TimerTask
的這些問題。因為天生就是基於多執行緒機制,所以任務之間不會相互影響(只要執行緒數足夠。當執行緒數不足時,有些任務會複用同一個執行緒)。
除此之外,因為其內部使用的延遲佇列,本身就是基於 等待/喚醒
機制實現的,所以CPU並不會一直繁忙。同時,多執行緒帶來的CPU資源複用也能極大地提升效能。
如何使用
基本作用
因為 ScheduledExecutorService
繼承於 ExecutorService
,所以本身支援執行緒池的所有功能。額外還提供了4種方法,我們來看看其作用。
/** * 帶延遲時間的排程,只執行一次 * 排程之後可通過Future.get()阻塞直至任務執行完畢 */ 1. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); /** * 帶延遲時間的排程,只執行一次 * 排程之後可通過Future.get()阻塞直至任務執行完畢,並且可以獲取執行結果 */ 2. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 帶延遲時間的排程,迴圈執行,固定頻率 */ 3. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 帶延遲時間的排程,迴圈執行,固定延遲 */ 4. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
1. schedule Runnable
該方法用於帶延遲時間的排程,只執行一次。排程之後可通過 Future.get()
阻塞直至任務執行完畢。我們來看一個例子。
@Test public void test_schedule4Runnable() throws Exception { ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture future = service.schedule(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task finish time: " + format(System.currentTimeMillis())); }, 1000, TimeUnit.MILLISECONDS); System.out.println("schedule finish time: " + format(System.currentTimeMillis())); System.out.println("Runnable future's result is: " + future.get() + ", and time is: " + format(System.currentTimeMillis())); }
上述程式碼達到的效果應該是這樣的:延遲執行時間為1秒,任務執行3秒,任務只執行一次,同時通過 Future.get()
阻塞直至任務執行完畢。
我們執行看到的效果的確和我們猜想的一樣,如下圖所示。

執行結果
2. schedule Callable
在schedule Runnable的基礎上,我們將 Runnable
改為 Callable
來看一下。
@Test public void test_schedule4Callable() throws Exception { ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture<String> future = service.schedule(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task finish time: " + format(System.currentTimeMillis())); return "success"; }, 1000, TimeUnit.MILLISECONDS); System.out.println("schedule finish time: " + format(System.currentTimeMillis())); System.out.println("Callable future's result is: " + future.get() + ", and time is: " + format(System.currentTimeMillis())); }
執行看到的結果和 Runnable
基本相同,唯一的區別在於 future.get()
能拿到 Callable
返回的真實結果。

執行結果
3. scheduleAtFixedRate
該方法用於 固定頻率
地對一個任務迴圈執行,我們通過一個例子來看看效果。
@Test public void test_scheduleAtFixedRate() { ScheduledExecutorService service = Executors.newScheduledThreadPool(5); service.scheduleAtFixedRate(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task finish time: " + format(System.currentTimeMillis())); }, 1000L, 1000L, TimeUnit.MILLISECONDS); System.out.println("schedule finish time: " + format(System.currentTimeMillis())); while (true) { } }
在這個例子中,任務初始延遲1秒,任務執行3秒,任務執行間隔為1秒。我們來看看執行結果:

執行結果
4. scheduleWithFixedDelay
該方法用於 固定延遲
地對一個任務迴圈執行,我們通過一個例子來看看效果。
@Test public void test_scheduleWithFixedDelay() { ScheduledExecutorService service = Executors.newScheduledThreadPool(5); service.scheduleWithFixedDelay(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task finish time: " + format(System.currentTimeMillis())); }, 1000L, 1000L, TimeUnit.MILLISECONDS); System.out.println("schedule finish time: " + format(System.currentTimeMillis())); while (true) { } }
在這個例子中,任務初始延遲1秒,任務執行3秒,任務執行間隔為1秒。我們來看看執行結果:

執行結果
5. scheduleAtFixedRate和scheduleWithFixedDelay的區別
既然這兩個方法都是對任務迴圈執行,那麼他們又有何區別呢?通過jdk文件我們找到了答案。

scheduleAtFixedRate - javadoc

scheduleWithFixedDelay - javadoc
直白地講, scheduleAtFixedRate()
為固定頻率, scheduleWithFixedDelay()
為固定延遲。固定頻率是相對於任務執行的開始時間,而固定延遲是相對於任務執行的結束時間,這就是他們最根本的區別!
另外,從3和4的執行結果也能看出這些差異。
原始碼閱讀初體驗
一般原始碼的入口在於構造方法,我們來看看。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
在構造方法中我們看到以下資訊:
-
ScheduledThreadPoolExecutor
構造方法最終呼叫的是ThreadPoolExecutor
構造方法 - 阻塞佇列使用的是
DelayedWorkQueue
上述資訊的第2點至關重要,但是限於篇幅,本文將不做深入分析。
接下來我們看看 scheduleWithFixedDelay()
方法,主要做了3件事情:
- 入參校驗,包括空指標、數字範圍
- 將
Runnable
包裝成RunnableScheduledFuture
- 延遲執行
RunnableScheduledFuture
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 1. 入參校驗,包括空指標、數字範圍 if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // 2. 將Runnable包裝成`RunnableScheduledFuture` ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 3. 延遲執行`RunnableScheduledFuture` delayedExecute(t); return t; }
delayedExecute()
這個方法從字面描述來看是延遲執行的意思,我們深入到這個方法裡面去看看。
private void delayedExecute(RunnableScheduledFuture<?> task) { // 1. 執行緒池執行狀態判斷 if (isShutdown()) reject(task); else { // 2. 將任務新增到佇列 super.getQueue().add(task); // 3. 如果任務新增到佇列之後,執行緒池狀態變為非執行狀態, // 需要將任務從佇列移除,同時通過任務的`cancel()`方法來取消任務 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); // 4. 如果任務新增到佇列之後,執行緒池狀態是執行狀態,需要提前啟動執行緒 else ensurePrestart(); } }
線上程池狀態正常的情況下,最終會呼叫 ensurePrestart()
方法來完成執行緒的建立。主要邏輯有兩個:
- 當前執行緒數未達到核心執行緒數,則建立核心執行緒
- 當前執行緒數已達到核心執行緒數,則建立非核心執行緒,不會將任務放到阻塞佇列中,這一點是和普通執行緒池是不相同的
/** * Same as prestartCoreThread except arranges that at least one * thread is started even if corePoolSize is 0. */ void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 1. 當前執行緒數未達到核心執行緒數,則建立核心執行緒 if (wc < corePoolSize) addWorker(null, true); // 2. 當前執行緒數已達到核心執行緒數,則建立非核心執行緒, // 2.1 不會將任務放到阻塞佇列中,這一點是和普通執行緒池是不相同的 else if (wc == 0) addWorker(null, false); }
至此,除了 DelayedWorkQueue
延遲佇列的原始碼還未分析,其他的我們都分析完了。
總結
首先,我們瞭解了 ScheduledExecutorService
的基本作用,然後在此基礎上寫了一些demo來做驗證,得到的結果和基本作用是完全相同的。
然後,我們對其內部的實現原理和原始碼做了初步的分析,知道了其和普通執行緒池是不同的地方在於: 阻塞佇列
和 建立執行緒的方式
。