1. 程式人生 > >Java併發包原始碼學習系列:執行緒池ScheduledThreadPoolExecutor原始碼解析

Java併發包原始碼學習系列:執行緒池ScheduledThreadPoolExecutor原始碼解析

[toc] ## ScheduledThreadPoolExecutor概述 我們在上一篇學習了ThreadPoolExecutor的實現原理:[Java併發包原始碼學習系列:執行緒池ThreadPoolExecutor原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113786358) 本篇我們來學習一下在它基礎之上的擴充套件:ScheduledThreadPoolExecutor。它繼承了ThreadPoolExecutor並實現了ScheduledExecutorService介面,是一個可以在**指定一定延遲時間**後或者**定時進行任務排程**執行的執行緒池。 ```java public class TestScheduledThreadPool { private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public static void main (String[] args) throws InterruptedException { scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run () { System.out.println("command .. " + new Date()); } }, 0, 1, TimeUnit.SECONDS); } } ``` 簡單看一個demo吧,這裡使用Executors工具類建立ScheduledExecutorService,起始就是例項化了一個ScheduledThreadPoolExecutor,當然我們自定義也是可以的。 接著呼叫`scheduleAtFixedRate`方法,指定延遲為0,表示立即執行, 指定period為1,以1s為週期定時執行該任務。 **從整體感知ScheduledThreadPoolExecutor的執行** 1. 當呼叫scheduleAtFixedRate時,將會向延遲佇列中新增一個任務ScheduledFutureTask。 2. 執行緒池中的執行緒從延遲佇列中獲取任務,並執行。 ## 類圖結構 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210219200946218-1976645950.png) - 可以通過Executors工具類建立,也可以通過構造方法建立。 ```java //Executors.java public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor.java public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } ``` - ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor並實現了ScheduledExecutorService介面。 - 執行緒池佇列使用DelayedWorkQueue,和DelayedQueue類似,是延遲佇列。 - ScheduledFutureTask是一個具有返回值的任務,繼承自FutureTask。 ## ScheduledExecutorService ScheduledExecutorService代表**可在指定延遲後或週期性地執行執行緒任務執行緒池**,提供瞭如下4個方法: ```java public interface ScheduledExecutorService extends ExecutorService { // 指定command任務將在delay延遲後執行 public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); // 指定callable任務將在delay延遲後執行 public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); // 指定command任務將在delay延遲後執行,而且以設定頻率重複執行 // initialDelay + period 開始, initialDelay + n * period 處執行 public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); // 建立並執行一個在給定初始延遲後首次啟用的定期操作,隨後在每一次執行終止和下一次執行開始之間 // 都存在給定的延遲。如果任務在任一一次執行時遇到異常,就會取消後續執行; // 否則,只能通過程式來顯式取消或終止該任務 public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); } ``` ## ScheduledFutureTask 可以按照DelayQueue中的Delayed的元素理解,是具體放入延遲佇列中的東西,可以看到實現了getDelay和compareTo方法。 - getDelay獲取元素剩餘時間,也就是當前任務還剩多久過期,【剩餘時間 = 到期時間 - 當前時間】。 - compareTo方法作為排序規則,一般規定最快過期的元素放到隊首,q.peek()出來的就是最先過期的元素。 ```java private class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture { /** FIFO佇列中的序列號,time相同,序列號小的排在前面 */ private final long sequenceNumber; /** 任務將要被執行的時間,也就是過期時間 */ private long time; /** * period == 0 當前任務是一次性的, 執行完畢後就退出 * period > 0 當前任務是fixed-delay任務,是固定延遲的定時可重複執行任務 * period < 0 當前任務是fixed-rate任務,是固定頻率的定時可重複執行任務 */ private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture outerTask = this; /** * Index into delay queue, to support faster cancellation. */ int heapIndex; //... 省略建構函式 // 當前任務還剩多久過期 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 佇列中的比較策略 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask x = (ScheduledFutureTask)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; // time相同,序列號小的排在前面 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } //... 省略其他方法 } ``` ## FutureTask FutureTask內部使用一個state變量表示任務狀態。 ```java public class FutureTask implements RunnableFuture { /** * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; // 初始狀態 private static final int COMPLETING = 1; // 執行中 private static final int NORMAL = 2; // 正常執行結束 private static final int EXCEPTIONAL = 3; // 執行中異常 private static final int CANCELLED = 4; // 任務被取消 private static final int INTERRUPTING = 5; // 任務正在被中斷 private static final int INTERRUPTED = 6; // 任務已經被中斷 } ``` ## schedule 提交一個延遲執行的任務,任務從提交時間算起延遲單位為unit的delay時間後開始執行。 如果提交的任務不是週期性的任務,任務只會執行一次。 ```java public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { // 引數校驗 if (command == null || unit == null) throw new NullPointerException(); // 任務轉換: 把command任務轉換為ScheduledFutureTask RunnableScheduledFuture t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit))); // 新增任務到延遲佇列 delayedExecute(t); return t; } // 將延遲時間轉換為絕對時間, private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } // 將當前的那描述加上延遲的nanos後的long型值 long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } private class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture { ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); // 呼叫FutureTask的構造方法 this.time = ns; this.period = 0; // 這裡表示任務是一次性的 this.sequenceNumber = sequencer.getAndIncrement(); } } // FutureTask.java public class FutureTask implements RunnableFuture { public FutureTask(Runnable runnable, V result) { // 將runnable轉化為callable this.callable = Executors.callable(runnable, result); // 設定當前的任務狀態為NEW this.state = NEW; // ensure visibility of callable } } ``` ### void delayedExecute(task) 1. 首先判斷當前執行緒池是否已經關閉,如果已經關閉則執行執行緒池的拒絕策略,否則將任務新增到延遲佇列。 2. 加入佇列後,還要重新檢查執行緒池是否被關閉,如果已經關閉則從延遲佇列裡刪除剛才新增的任務,但此時可能執行緒池中的執行緒已經執行裡面的任務,此時就需要取消該任務。 ```java private void delayedExecute(RunnableScheduledFuture task) { // 如果執行緒池關閉, 則執行拒絕策略 if (isShutdown()) reject(task); else { // 將任務新增到延遲佇列 super.getQueue().add(task); // 檢查執行緒池狀態,如果已經關閉,則從延遲佇列裡面刪除剛才新增的任務 // 但此時可能執行緒池中的執行緒已經從任務佇列裡面移除了該任務 // 此時需要呼叫cancel 取消任務 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 確保至少一個執行緒在處理任務 ensurePrestart(); } } ``` ### boolean canRunInCurrentRunState(periodic) 判斷當前任務是否應該被取消。 ```java boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); } ``` periodic引數通過`isPeriodic()`得到,如果period為0,則為false。 相應的isRunningOrShutdown方法傳入的引數就應該是executeExistingDelayedTasksAfterShutdown,預設為true,表示:其他執行緒呼叫了shutdown命令關閉執行緒池後,當前任務還是要執行 ### void ensurePrestart() 確保至少一個執行緒在處理任務:如果執行緒個數小於核心執行緒池數則新增一個執行緒,否則如果當前執行緒數為0,則新增一個執行緒。 ```java void ensurePrestart() { int wc = workerCountOf(ctl.get()); // 增加核心執行緒數 if (wc < corePoolSize) addWorker(null, true); // 如果corePoolSize==0 也新增一個執行緒 else if (wc == 0) addWorker(null, false); } ``` ### ScheduledFutureTask#run() 具體執行任務的執行緒是Worker執行緒,任務執行是Worker執行緒呼叫任務的潤方法執行,這裡的任務是ScheduledFutureTask,也就是呼叫它的run方法。 ```java public void run() { // 是否只執行一次 period != 0 boolean periodic = isPeriodic(); // 取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); // 任務只執行一次, 呼叫FutureTask的run else if (!periodic) ScheduledFutureTask.super.run(); // 定時執行 else if (ScheduledFutureTask.super.runAndReset()) { // 設定下一次執行時間 setNextRunTime(); // 重新加入延遲佇列 reExecutePeriodic(outerTask); } } ``` ### FutureTask#run() ```java public void run() { // 如果任務不是NEW狀態 直接返回 // 如果是NEW, 但是cas設定當前任務的持有者為當前執行緒失敗 也直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; // 再次判斷任務的狀態,避免兩次判斷狀態之間有其他執行緒對任務狀態進行修改 if (c != null && state == NEW) { V result; boolean ran; try { // 執行任務 result = c.call(); // 執行成功 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } // 如果執行任務成功 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } ``` ### FutureTask#set(V v) ```java protected void set(V v) { // CAS 將當前任務的狀態 從 NEW 轉化 為 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // 走到這裡只有一個執行緒會到這裡,設定任務狀態 為NORMAL 正常結束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } ``` ### FutureTask#setException(Throwable t) ```java protected void setException(Throwable t) { // CAS 將當前任務的狀態 從 NEW 轉化 為 COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // 走到這裡只有一個執行緒會到這裡,設定任務狀態 為EXCEPTIONAL,非正常結束 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } ``` ## scheduleWithFixedDelay 針對任務型別為fixed-delay,當任務執行完畢後,讓其延遲固定時間後再次執行,原理是: 1. 當向延遲佇列中新增一個任務時,將會等待initialDelay時間,時間到了就過期,從佇列中移除,並執行。 2. 執行完畢之後,會重新設定任務的延遲時間,然後再把任務放入延遲佇列,迴圈。 3. 如果一個任務在執行過程中丟擲了一個異常,任務結束,但不會影響其他任務的執行。 ```java // initialDelay : 提交任務後延遲多少時間開始執行任務 // delay : 當任務執行完畢後延長多少時間後再次執行任務 public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { // 引數校驗 if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // 任務轉換 period < 0 ScheduledFutureTask sft = new ScheduledFutureTask(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; // 新增任務到佇列 delayedExecute(t); return t; } ``` 注意這裡構造的ScheduledFutureTask的period<0,會導致`boolean periodic = isPeriodic();`的結果是true,因此在ScheduledFutureTask的run邏輯中,會呼叫FutureTask的runAndReset()方法。 ### ScheduledFutureTask#run() 具體執行任務的執行緒是Worker執行緒,任務執行是Worker執行緒呼叫任務的潤方法執行,這裡的任務是ScheduledFutureTask,也就是呼叫它的run方法。 ```java public void run() { // 是否只執行一次 period != 0 boolean periodic = isPeriodic(); // 取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); // 任務只執行一次, 呼叫FutureTask的run else if (!periodic) ScheduledFutureTask.super.run(); // 定時執行 else if (ScheduledFutureTask.super.runAndReset()) { // 設定下一次執行時間 setNextRunTime(); // 重新加入延遲佇列 reExecutePeriodic(outerTask); } } ``` ### FutureTask#runAndReset() 相比於FutureTask的run方法,該方法邏輯差不多,但缺少了:在任務正常執行完後設置狀態的步驟。原因在於:讓任務成為可重複執行的任務。 ```java protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 如果當前任務正常執行完畢並且任務狀態為NEW 則返回true, 否則返回false return ran && s == NEW; } ``` 如果該方法返回true,將會呼叫setNextRunTime()設定下一次的執行時間,接著呼叫reExecutePeriodic(outerTask)重新加入任務佇列。 ### void setNextRunTime() ```java // 設定下一次執行時間 private void setNextRunTime() { long p = period; if (p > 0) time += p; else // 延遲-p的時間 time = triggerTime(-p); } ``` ## scheduleAtFixedRate 針對任務型別為fixed-rate,相對起始時間點以固定頻率呼叫指定的任務。 ```java // initialDelay : 提交任務後延遲多少時間開始執行任務 // period 固定週期 public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // period >
0 ScheduledFutureTask sft = new ScheduledFutureTask(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } ``` 它和scheduleWithFixedDelay類似,區別在於: 1. period>0, 但仍然滿足period!=0的條件。 2. setNextRunTime() 走進time+=p 的分支,而不是 time=triggerTime(-p)。 最終的執行規則為:**initialDelay + n * period** 的 刻執行任務,如果當前任務執行的時間到了,不會併發執行,下一次執行的任務將會延遲執行。 ## 總結 - ScheduledThreadPoolExecutor內部使用DelayedWorkQueue存放執行的任務ScheduledFutureTask。 - ScheduledFutureTask是一個具有返回值的任務,繼承自FutureTask。根據period的值分為三類: - period == 0 ,當前任務是**一次性**的,執行完畢後就退出。 - period > 0 ,當前任務是fixed-delay任務,是**固定延遲**的定時可重複執行任務。 - period < 0 ,當前任務是fixed-rate任務,是**固定頻率**的定時可重複執行任務。 ## 參考閱讀 - 《Java併發程式設計之美》 - 《瘋狂Java講義》 - 《Java併發程式設計的