解析執行緒池排程器之任務延遲排程實現機制
排程執行緒執行器 ScheduledThreadPoolExecutor 是執行緒執行器 ThreadPoolExecutor 的擴充套件,在 ThreadPoolExecutor 基礎之上添加了在一定時間間隔之後排程任務的核心功能,也包括之後的按既定時間間隔去排程任務的功能。同時 ScheduledThreadPoolExecutor 是基於執行緒池的,因此它和 Timer 相比更容易擴充套件。作者認為任務延遲排程是 STPE 最核心的部分,因此這篇文章主要是通過分析原始碼來理解 ScheduledThreadPoolExecutor 的任務延遲排程實現機制。
構造方法
在探究 ScheduledThreadPoolExecutor 實現機制之前,我們必須要學會使用它,那麼我們首先需要搞清楚兩個問題:第一、如何獲取 ScheduledThreadPoolExecutor 例項?第二、ScheduledThreadPoolExecutor 為我們提供了哪些介面以及他們的功能是什麼?ScheduledThreadPoolExecutor 這個名字有點長,為了簡單起見,後文我用(STPE)縮寫來代表它。
為解答第一個問題我們先來看看 STPE 提供了哪些構造方法:

上圖中我們可以看出 STPE 為我們提供了四個構造方法,用過 ThreadPoolExecutor(簡稱PTE)的同學都知道我們可以通過構造方法的引數來配置我們所需要執行緒池的多個引數,包括核心工作執行緒數、最大工作執行緒數、執行緒空閒回收時間、執行緒工廠、等待佇列及佇列大小等,但是從 STPE 的構造方法看來我們能配置的似乎並不多,這是因為什麼呢?我們可以結合 PTE 的構造方法分析一下,除去核心執行緒數大小、執行緒工廠之外,STPE 和 TPE 構造方法的區別有執行緒數相關(包括核心執行緒數、最大工作執行緒數、執行緒空閒回收)和等待佇列。執行緒數相關似乎和實現任務延遲排程沒有什麼關係,也的確關係不大,STPE 的例項只是控制了工作執行緒數為固定大小。那麼問題就在於等待隊列了。的確,STPE 的四個構造方法都預設使用了一個特殊的佇列作為等待佇列,叫做 DelayedWorkQueue,我們可以叫它延遲工作佇列。
DelayedWorkQueue
DelayedWorkQueue(簡稱DWQ)和其他的佇列有什麼不同呢?DWQ 和實現延遲任務有什麼關係呢?這是接下來要搞清楚的問題。
我們先來看看DWQ類的繼承體系:

從上圖看起來都是很常見的介面似乎沒有什麼特別之處,儘管這樣我們還是簡單過一下各個介面及父類給 DWQ 賦予的功能吧。
-
Iterable 提供了可以迭代的介面,也就是佇列中的元素可以通過迭代的方式獲取到。
-
Collection 提供了集合框架的基本介面,JAVA 集合我相信大家都很熟悉。
-
Queue 提供了佇列相關的介面,包括獲取隊頭元素、往隊尾增加元素。
-
AbstractCollection 提供了 Iterable 以及 Collection 的一些預設實現。
-
BlockingQueue 提供了阻塞佇列的相關介面,即表示當執行緒想從佇列中獲取元素而佇列中沒有元素的時候,執行緒需要被掛起。
剩下的就是 DWQ 的實現了,既然它的主要儲存模型是佇列,那麼我們就看看這個佇列的實現吧。
DWQ 的實現佇列是由 RunnableScheduleFuture 陣列實現的,我們看看存取相關的介面,發現往佇列之中新增元素的實現主要是 offer 介面,看下關鍵實現:
try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } }
上面程式碼片段可以看出:如果當前元素數量到達了佇列的長度,那麼久擴充容量;如果當前佇列沒有元素,那麼直接放在隊頭;否則呼叫 siftUp 方法。其實再接著往下看 siftUp 方法的實現,大概大家就明白了其實 DWQ 佇列的實現是基於堆實現的:
/** * Sift element added at bottom up to its heap-ordered spot. * Call only when holding lock. */ private void siftUp(int k, RunnableScheduledFuture key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
而這個堆的順序,也即大根堆還是小根堆是基於佇列中元素實現的 comparable 介面來決定的,但是上面這段程式碼我們可以確定的是當堆尾元素 compareTo 他的父親節點 >=0 的時候位置是不用改變的。我們也可以確定一件事情,即當這個堆是小根堆的時候我們從基於小根堆的佇列頭部獲取的元素總是最小的那個,而當這個根是大根堆的時候我們從基於大根堆的佇列頭部獲取的元素總是最大的那個。
RunableScheduleFuture
下面我們再看看佇列元素的實現,上文中我們可以知道,DWQ 這個佇列只能儲存 RunableScheduleFuture(簡稱RSF)這個型別的元素,我們看看 RSF 的繼承體系:

從上面這張圖我們可以看出什麼呢?來一起簡單的過一下:
-
Comparable 表示 RSF 是可以用來比較的。
-
Delayed 表示 RSF 是有時間狀態的,因此可以用來獲取 RSF 物件的在與某個時間比較之後的剩餘時間。
-
Future 表示 RSF 是能夠獲取非同步計算結果的物件。
-
Runnable 表示 RSF 是個執行緒物件。
-
RunnableFuture 表示 RSF 是個可以用來執行的並且能夠獲取非同步計算結果的物件。
-
ScheduledFuture 表示 RSF 是個可以在指定時間延遲後獲取非同步計算結果的物件。
-
RunnableScheduledFuture 表示 RSF 是個可以被執行、且可以在指定時間延遲後獲取非同步計算內容的執行緒物件。
最後一步我們通過 RSF 的整合體系總結了 RSF 物件的功能。為了銜接上一部分,我們還需要看看下面的實現:
public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
這裡的實現決定了堆的儲存順序,上面可以看出 RSF 物件的比較其實是按照 time 的自然順序,time 是什麼呢?time 是提交任務的時候設定的延遲時間,也就是我們的任務會在 time 時間之後被排程,那麼我們可以得出結論:上文中的佇列中,任務的順序是按照剩餘時間基於小根堆排列的,每次排程任務的時候從佇列的頭部取到的執行緒物件肯定是最近需要排程的執行緒物件。
佇列內容的獲取-執行緒的延遲排程
上文中我們知道了 DWQ 這個佇列本質,那麼我們再來看看執行緒排程的必經之路,排程器是如何從佇列中獲取內容的。相關介面包括 poll 和 take,poll 和 take 分別是對 Queue 和 BlockingQueue 的實現,他們的主要區別是 poll 在沒有獲取到內容的時候會直接返回 null,而 take 在沒有獲取到內容的時候會阻塞。然而、無論是 poll 還是 take,我們都能夠從實現中得到一條重要的線索,所有能夠獲取到的物件必須要滿足一個條件:RSF 物件的 getDelay 物件必須要 <=0,也就是執行緒物件的時間到了,可以被執行了,那麼我們才可以獲取執行緒物件並進行排程。
文章到這個時候,我們至少應該非常清楚下面兩點:
-
任務的延遲排程是如何實現的。
-
任務的排程順序和任務提交的先後並沒有關係。
啊,我好像忘記了介紹 STPE 提供的主要介面和功能,雖然不瞭解他們並不影響我們隊延遲排程實現機制的理解,為了保證文章的完整性,我們再一起看看吧。
STPE 的主要功能
我們先看看 STPE 提供哪些主要介面:

下面四個不需要糾結,他們並不是用來作為提交延遲任務的,而是立即排程。前兩個呢,第一個和第二個的都是在指定時間之後排程任務,不同的是 Runnable 和 Callable 的區別。Callable 允許使用者獲取執行緒的返回值,而 Runnable 沒有。
最重要的就是第三個和第四個了,他們的實現也是很有意思的,看下原始碼:

兩個方法的實現不同點僅僅在於上圖的紅框處,博主找了半天,o(╯□╰)o。簡單介紹就是上面那個是按照固定時間去排程的,比如設定的初始時間是 2,排程時間間隔是 5,那麼低二次排程時間是 2+5,第三次是 2+5*2,依次類推。下面的方法是按照固定的延遲時間去排程,比如初始時間是 2,任務執行長度是 1,那麼第二次排程時間是 2+1+5。