1. 程式人生 > >Java併發包原始碼學習系列:阻塞佇列實現之DelayQueue原始碼解析

Java併發包原始碼學習系列:阻塞佇列實現之DelayQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112301359) - [Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838) - [Java併發包原始碼學習系列:ReentrantLock可重入獨佔鎖詳解](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112454874) - [Java併發包原始碼學習系列:ReentrantReadWriteLock讀寫鎖解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112689635) - [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669) - [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098) - [Java併發包原始碼學習系列:JDK1.8的ConcurrentHashMap原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113059783) - [Java併發包原始碼學習系列:阻塞佇列BlockingQueue及實現原理分析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113186979) - [Java併發包原始碼學習系列:阻塞佇列實現之ArrayBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113252384) - [Java併發包原始碼學習系列:阻塞佇列實現之LinkedBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113329416) - [Java併發包原始碼學習系列:阻塞佇列實現之PriorityBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113358710) ## DelayQueue概述 DelayQueue是一個**支援延時獲取元素**的無界阻塞佇列,使用PriorityQueue來儲存元素。 隊中的元素必須實現`Delayed`介面【Delay介面又繼承了Comparable,需要實現compareTo方法】,每個元素都需要指明過期時間,通過`getDelay(unit)`獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】,每次向優先佇列中新增元素時根據compareTo方法作為排序規則。 當從佇列獲取元素時,只有過期的元素才會出佇列。 使用場景: 快取系統設計、定時任務排程等。 ## 類圖及重要欄位 ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210130164050576-720957474.png) ```java public class DelayQueue extends AbstractQueue implements BlockingQueue { // 獨佔鎖實現同步 private final transient ReentrantLock lock = new ReentrantLock(); // 優先佇列存放資料 private final PriorityQueue q = new PriorityQueue(); /** * 基於Leader-Follower模式的變體,用於儘量減少不必要的執行緒等待 */ private Thread leader = null; /** * 與lock對應的條件變數 */ private final Condition available = lock.newCondition(); } ``` 1. 使用ReentrantLock獨佔鎖實現執行緒同步,使用Condition實現等待通知機制。 2. 基於Leader-Follower模式的變體,減少不必要的執行緒等待。 3. 內部使用PriorityQueue優先順序佇列儲存元素,且佇列中元素必須實現Delayed介面。 ## Delayed介面 隊中的元素必須實現`Delayed`介面【Delay介面又繼承了Comparable,需要實現compareTo方法】,每個元素都需要指明過期時間,通過`getDelay(unit)`獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】。 每次向優先佇列中新增元素時根據compareTo方法作為排序規則,當然我們約定一下,預設q.peek()出來的就是最先過期的元素。 ```java public interface Delayed extends Comparable { // 返回剩餘時間 long getDelay(TimeUnit unit); } public interface Comparable { // 定義比較方法 public int compareTo(T o); } ``` ## Delayed元素案例 學習了Delayed介面之後,我們看一個實際的案例,加深印象,源於:《Java併發程式設計之美》。 ```java static class DelayedElement implements Delayed { private final long delayTime; // 延遲時間 private final long expire; // 到期時間 private final String taskName; // 任務名稱 public DelayedElement (long delayTime, String taskName) { this.delayTime = delayTime; this.taskName = taskName; expire = now() + delayTime; } final long now () { return System.currentTimeMillis(); } // 剩餘時間 = 到期時間 - 當前時間 @Override public long getDelay (TimeUnit unit) { return unit.convert(expire - now(), TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed o) { return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString () { final StringBuilder res = new StringBuilder("DelayedElement [ "); res.append("delay = ").append(delayTime); res.append(", expire = ").append(expire); res.append(", taskName = '").append(taskName).append('\''); res.append(" ] "); return res.toString(); } } public static void main (String[] args) { // 建立delayQueue佇列 DelayQueue delayQueue = new DelayQueue<>(); // 建立延遲任務 Random random = new Random(); for (int i = 0; i < 10; i++) { DelayedElement element = new DelayedElement(random.nextInt(500), "task: " + i); delayQueue.offer(element); } // 依次取出任務並列印 DelayedElement ele = null; try { for (; ; ) { while ((ele = delayQueue.take()) != null) { System.out.println(ele); } } } catch (InterruptedException ex) { ex.printStackTrace(); } } // 列印結果 DelayedElement [ delay = 2, expire = 1611995426061, taskName = 'task: 4' ] DelayedElement [ delay = 52, expire = 1611995426111, taskName = 'task: 2' ] DelayedElement [ delay = 80, expire = 1611995426139, taskName = 'task: 5' ] DelayedElement [ delay = 132, expire = 1611995426191, taskName = 'task: 0' ] DelayedElement [ delay = 174, expire = 1611995426233, taskName = 'task: 9' ] DelayedElement [ delay = 175, expire = 1611995426234, taskName = 'task: 7' ] DelayedElement [ delay = 326, expire = 1611995426385, taskName = 'task: 3' ] DelayedElement [ delay = 447, expire = 1611995426506, taskName = 'task: 8' ] DelayedElement [ delay = 452, expire = 1611995426511, taskName = 'task: 1' ] DelayedElement [ delay = 486, expire = 1611995426545, taskName = 'task: 6' ] ``` - 實現了compareTo方法,定義比較規則為越早過期的排在隊頭。 - 實現了getDelay方法,計算公式為:剩餘時間 = 到期時間 - 當前時間。 ## 構造器 DelayQueue構造器相比於前幾個,就顯得非常easy了。 ```java public DelayQueue() {} public DelayQueue(Collection c) { this.addAll(c); } ``` ## void put(E e) 因為DelayQueue是無界佇列,不會因為邊界問題產生阻塞,因此put操作和offer操作是一樣的。 ```java public void put(E e) { offer(e); } public boolean offer(E e) { // 獲取獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { // 加入優先佇列裡 q.offer(e); // 判斷堆頂元素是不是剛剛插入的元素 // 如果判斷為true,說明當前這個元素是將最先過期 if (q.peek() == e) { // 重置leader執行緒為null leader = null; // 啟用available變數條件佇列中的一個執行緒 available.signal(); } return true; } finally { lock.unlock(); } } ``` ## E take() take方法將會**獲取並移除佇列裡面延遲時間過期的元素** ,如果佇列裡面沒有過期元素則陷入等待。 ```java public E take() throws InterruptedException { // 獲取獨佔鎖 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 瞅一瞅誰最快過期 E first = q.peek(); // 佇列為空,則將當前執行緒置入available的條件佇列中,直到裡面有元素 if (first == null) available.await(); else { // 看下還有多久過期 long delay = first.getDelay(NANOSECONDS); // 哇,已經過期了,就移除它並返回 if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // leader不為null表示其他執行緒也在執行take // 則將當前執行緒置入available的條件佇列中 if (leader != null) available.await(); else { // 如果leader為null,則選擇當前執行緒作為leader執行緒 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待delay時間,時間到之後,會出條件佇列,繼續競爭鎖 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } ``` ### first = null 有什麼用 如果不設定`first = null`,將會引起記憶體洩露。 >
- 執行緒A到達,隊首元素沒有到期,設定leader = 執行緒A,並且執行`available.awaitNanos(delay);`等待元素過期。 > - 這時執行緒B來了,因為leader != null,則會`available.await();`阻塞,執行緒C、D、E同理。 > - 執行緒A阻塞完畢了,再次迴圈,獲取列首元素成功,出列。 > > 這個時候列首元素應該會被回收掉,但是問題是它還被執行緒B、執行緒C持有著,所以不會回收,如果執行緒增多,且隊首元素無限期的不能回收,就會造成記憶體洩漏。 ## 總結 DelayQueue是一個**支援延時獲取元素**的**無界阻塞**佇列,使用PriorityQueue來儲存元素。 隊中的元素必須實現`Delayed`介面【Delay介面又繼承了Comparable,需要實現compareTo方法】,每個元素都需要指明過期時間,通過`getDelay(unit)`獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】,每次向優先佇列中新增元素時根據compareTo方法作為排序規則。 基於Leader-Follower模式使用leader變數,減少不必要的執行緒等待。 DelayQueue是無界佇列,因此插入操作是非阻塞的。但是take操作從佇列獲取元素時,是阻塞的,阻塞規則為: - 當一個執行緒呼叫佇列的take方法,如果佇列為空,則將會呼叫` available.await()`陷入阻塞。 - 如果佇列不為空,則檢視佇列的隊首元素是否過期,根據getDelay的返回值是否小於0判斷,如果過期則返回該元素。 - 如果隊首元素未過期,則**判斷當前執行緒是否為leader執行緒**,如果不是,表明有其他執行緒在執行take操作,就呼叫`available.await()`陷入阻塞。 - 如果沒有其他執行緒在執行take,就將當前執行緒設定為leader,並等待隊首元素過期,`available.awaitNanos(delay)`。 - leader執行緒退出take之後,將會呼叫`available.signal()`喚醒一個follower執行緒,接著回到開始那步。 ## 參考閱讀 - 《Java併發程式設計的藝術》 - 《Java併發程式設計之美》 - [【死磕Java併發】—–J.U.C之阻塞佇列:DelayQueue](http://cmsblogs.com/