DelayQueue延遲佇列原理剖析
介紹
DelayQueue佇列是一個延遲佇列,DelayQueue中存放的元素必須實現Delayed介面的元素,實現介面後相當於是每個元素都有個過期時間,當佇列進行take獲取元素時,先要判斷元素有沒有過期,只有過期的元素才能出隊操作,沒有過期的佇列需要等待剩餘過期時間才能進行出隊操作。
原始碼分析
DelayQueue佇列內部使用了PriorityQueue優先佇列來進行存放資料,它採用的是二叉堆進行的優先佇列,使用ReentrantLock鎖來控制執行緒同步,由於內部元素是採用的PriorityQueue來進行存放資料,所以Delayed介面實現了Comparable介面,用於比較來控制優先順序,如下程式碼所示:
1public interface Delayed extends Comparable<Delayed> {
2
3 /**
4 * Returns the remaining delay associated with this object, in the
5 * given time unit.
6 *
7 * @param unit the time unit
8 * @return the remaining delay; zero or negative values indicate
9 * that the delay has already elapsed
10 */
11 long getDelay(TimeUnit unit);
12}
DelayQueue的成員變數如下所示:
1// 鎖。
2private final transient ReentrantLock lock = new ReentrantLock();
3// 優先佇列。
4private final PriorityQueue<E> q = new PriorityQueue<E>();
5
6/**
7 * Leader-Follower的變種。
8 * Thread designated to wait for the element at the head of
9 * the queue. This variant of the Leader-Follower pattern
10 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
11 * minimize unnecessary timed waiting. When a thread becomes
12 * the leader, it waits only for the next delay to elapse, but
13 * other threads await indefinitely. The leader thread must
14 * signal some other thread before returning from take() or
15 * poll(...), unless some other thread becomes leader in the
16 * interim. Whenever the head of the queue is replaced with
17 * an element with an earlier expiration time, the leader
18 * field is invalidated by being reset to null, and some
19 * waiting thread, but not necessarily the current leader, is
20 * signalled. So waiting threads must be prepared to acquire
21 * and lose leadership while waiting.
22 */
23private Thread leader = null;
24
25/**
26 * Condition signalled when a newer element becomes available
27 * at the head of the queue or a new thread may need to
28 * become leader.
29 */
30// 條件,代表如果有資料則通知Follower執行緒,喚醒執行緒處理佇列內容。
31private final Condition available = lock.newCondition();
Leader-Follower模式的變種,用於最小化不必要的定時等待,當一個執行緒被選擇為Leader時,它會等待延遲過去執行程式碼邏輯,而其他執行緒則需要無限期等待,在從take或poll返回之前,每當佇列的頭部被替換為具有更早到期時間的元素時,leader欄位將通過重置為空而無效,Leader執行緒必須向其中一個Follower執行緒發出訊號,被喚醒的 follwer 執行緒被設定為新的Leader 執行緒。
offer操作
1public boolean offer(E e) {
2 // 獲取到鎖
3 final ReentrantLock lock = this.lock;
4 lock.lock();
5 try {
6 // 將元素儲存到PriorityQueue優先佇列中
7 q.offer(e);
8 // 如果第一個元素是當前元素,說明之前佇列中為空,則先將Leader設定為空,通知等待執行緒可以爭搶Leader了。
9 if (q.peek() == e) {
10 leader = null;
11 available.signal();
12 }
13 // 返回成功
14 return true;
15 } finally {
16 lock.unlock();
17 }
18}
offer操作前先進行獲取鎖的操作,也就是同一時間內只能有一個執行緒可以入隊操作。
- 獲取到ReentrantLock鎖物件。
- 將元素新增到PriorityQueue優先佇列中
- 如果佇列中最早過期的元素是自己,則說明佇列原先是空的,所以將Leader進行重置,通知Follower執行緒可以成為Leader執行緒。
- 最後進行解鎖操作。
put操作
put操作其實就是呼叫的offer操作來進行新增資料的,以下是原始碼資訊:
1public void put(E e) {
2 offer(e);
3}
take操作
1public E take() throws InterruptedException {
2 final ReentrantLock lock = this.lock;
3 // 獲取可中斷的鎖。
4 lock.lockInterruptibly();
5 try {
6 // 迴圈獲取資料。
7 for (;;) {
8 // 獲取最早過期的元素,但是不彈出物件。
9 E first = q.peek();
10 // 如果最早過期的元素為空,說明佇列為空,則執行緒直接進入無限期等待,並且讓出鎖。
11 if (first == null)
12 // 當前執行緒無限期等待,直到被喚醒,並且讓出鎖物件。
13 available.await();
14 else {
15 // 獲取最早過期的元素剩餘過期時間。
16 long delay = first.getDelay(NANOSECONDS);
17 // 如果剩餘過期時間小於0,則說明已經過期,反之還沒有過期。
18 if (delay <= 0)
19 // 如果已經過期直接獲取最早過期的元素,並返回。
20 return q.poll();
21 // 如果剩餘過期日期大於0,則會進入到這裡。
22 // 將剛才獲取的最早過期的元素設定為空。
23 first = null; // don't retain ref while waiting
24 // 如果有執行緒爭搶的Leader執行緒,則進行無限期等待。
25 if (leader != null)
26 // 無限期等待並讓出鎖。
27 available.await();
28 else {
29 // 獲取當前執行緒。
30 Thread thisThread = Thread.currentThread();
31 // 設定當前執行緒變為Leader執行緒。
32 leader = thisThread;
33 try {
34 // 等待剩餘等待時間。
35 available.awaitNanos(delay);
36 } finally {
37 // 將Leader設定為null。
38 if (leader == thisThread)
39 leader = null;
40 }
41 }
42 }
43 }
44 } finally {
45 // 如果佇列不為空,並且沒有Leader則通知等待執行緒可以成為Leader。
46 if (leader == null && q.peek() != null)
47 // 通知等待執行緒。
48 available.signal();
49 lock.unlock();
50 }
51}
- 當獲取元素時,先獲取到鎖物件。
- 獲取最早過期的元素,但是並不從佇列中彈出元素。
- 最早過期元素是否為空,如果為空則直接讓當前執行緒無限期等待狀態,並且讓出當前鎖物件。
- 如果最早過期的元素不為空
- 獲取最早過期元素的剩餘過期時間,如果已經過期則直接返回當前元素
- 如果沒有過期,也就是說剩餘時間還存在,則先獲取Leader物件,如果Leader已經有執行緒在處理,則當前執行緒進行無限期等待,如果Leader為空,則首先將Leader設定為當前執行緒,並且讓當前執行緒等待剩餘時間。
- 最後將Leader執行緒設定為空
- 如果Leader已經為空,並且佇列有內容則喚醒一個等待的佇列。
poll操作
獲取最早過期的元素,如果佇列頭沒有過期的元素則直接返回null,反之返回過期的元素。
1public E poll() {
2 final ReentrantLock lock = this.lock;
3 lock.lock();
4 try {
5 E first = q.peek();
6 // 如果佇列為空或者佇列最早過期的元素沒有過期,則返回null。
7 if (first == null || first.getDelay(NANOSECONDS) > 0)
8 return null;
9 else
10 // 出佇列操作。
11 return q.poll();
12 } finally {
13 lock.unlock();
14 }
15}
小結
- DelayQueue是一個無界的併發延遲阻塞佇列,佇列中的元素必須實現Delayed介面,相應了需要實現Comparable介面實現比較的方法
- Leader-Follower模式的變種,用於最小化不必要的定時等待,當一個執行緒被選擇為Leader時,它會等待延遲過去執行程式碼邏輯,而其他執行緒則需要無限期等待,在從take或poll返回之前,每當佇列的頭部被替換為具有更早到期時間的元素時,leader欄位將通過重置為空而無效,Leader執行緒必須向其中一個Follower執行緒發出訊號,被喚醒的 follwer 執行緒被設定為新的Leader 執行緒。
喜歡的同學點贊關注下微信公眾號,推送優質文章。