1. 程式人生 > >Java併發包原始碼學習系列:阻塞佇列實現之PriorityBlockingQueue原始碼解析

Java併發包原始碼學習系列:阻塞佇列實現之PriorityBlockingQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112301359) - [Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838) - [Java併發包原始碼學習系列:ReentrantLock可重入獨佔鎖詳解](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112454874) - [Java併發包原始碼學習系列:ReentrantReadWriteLock讀寫鎖解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112689635) - [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669) - [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098) - [Java併發包原始碼學習系列:JDK1.8的ConcurrentHashMap原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113059783) - [Java併發包原始碼學習系列:阻塞佇列BlockingQueue及實現原理分析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113186979) - [Java併發包原始碼學習系列:阻塞佇列實現之ArrayBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113252384) - [Java併發包原始碼學習系列:阻塞佇列實現之LinkedBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113329416) ## PriorityBlockingQueue概述 PriorityBlockingQueue是一個**支援優先順序的無界阻塞佇列**,基於陣列的二叉堆,其實就是執行緒安全的`PriorityQueue`。 指定排序規則有兩種方式: 1. 傳入PriorityBlockingQueue中的元素實現Comparable介面,自定義`compareTo`方法。 2. 初始化PriorityBlockingQueue時,指定構造引數`Comparator`,自定義`compare`方法來對元素進行排序。 需要注意的是如果兩個物件的優先順序相同,此佇列並不保證它們之間的順序。 PriorityBlocking可以傳入一個初始容量,其實也就是底層陣列的最小容量,之後會使用tryGrow擴容。 ## 類圖結構及重要欄位 ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210128210859901-1981064844.png) ```java public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { private static final long serialVersionUID = 5595510919245408276L; /** * 預設的容量為 11 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * 陣列的最大容量 */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * 平衡二叉堆 實現 優先順序佇列, 底層用陣列結構儲存二叉堆 * 假設一個n為陣列中的索引,陣列是從索引0開始儲存元素的,因此 * queue[n]的左兒子存在queue[2*n+1]位置,右兒子存在queue[2*(n+1)]位置 * * 根據比較器排序,如果沒有指定比較器,則按照元素自然順序排序。 * 預設是小根堆,第一個元素是堆中最小元素 * */ private transient Object[] queue; /** * 優先順序佇列中元素個數 */ private transient int size; /** * 比較器,如果按照自然序排序,那麼此屬性可設定為 null */ private transient Comparator comparator; /** * 所有需要保證執行緒安全的操作都要先獲取這把鎖 */ private final ReentrantLock lock; /** * 佇列空的時候,條件佇列存放阻塞執行緒,為什麼沒有佇列滿呢?原因在於它是無界佇列 */ private final Condition notEmpty; /** * 用於CAS操作,後面會看到,這個欄位用於擴容時 */ private transient volatile int allocationSpinLock; /** * 只用於序列化和反序列化 */ private PriorityQueue q; } ``` ## 什麼是二叉堆 > 這邊安利一個數據結構的視覺化網站:[資料結構視覺化網站](https://www.cs.usfca.edu/~galles/visualization/Heap.html) 二叉堆是完全二叉樹,除了最後一層,其他節點都是滿的,且最後一層節點從左到右排列,如下: ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210128210921468-347972586.png) 二叉堆分為大根堆和小根堆,一般來說都是**小根堆**,**任意一個節點都小於它的左右子節點的值,根節點就是堆中的最小的值**。 堆可以使用陣列儲存,陣列的下標可以從0開始,也可以從1開始,各有好處,當然JDK中堆的實現是從0開始的哦。 - 如果從索引為1的位置開始儲存元素,第k個節點的左右子節點的下標:(2k, 2k + 1),父節點的座標可以很容易求:`floor(k / 2)`,floor表示下取整。 - 如果從0開始,第k個節點的左右子節點的下標:(2k + 1, 2k + 2),父節點的座標也可以很容易求:`floor((k - 1) / 2)`,floor表示下取整。 > 我之前手寫堆的時候,都是使用的第一種方式,我就提一嘴第一種的思路,使用第一種思路介紹一下小根堆的幾個基本操作,之後我們會詳細分析JDK中的實現,也就是第二種。 ## 堆的基本操作 堆中最重要核心的兩個操作便是如何將元素**向上調整**or**向下調整**。 ### 向上調整void up(int u) 以插入操作為例,二話不說,直接在陣列末尾插上元素,接著再一一向上層比較,比較的原則的就是:我們只需要**比較當前這個數是不是比它的父節點小**,如果比它小,就進行交換,否則則停止交換。 思路非常簡單,你可以思考一下其合理性:我們想,如果我們每次插入資料的時候,都做一次向上調整的操作,我們一定能夠保證,每次都是在一個符合條件的二叉堆上插入數,對吧。那這樣的話,本身就滿足任何一個父節點必定比其子節點小的條件,如果待調整節點更小,那他必然也小於另一個子節點,由於我們一直迭代做,最後一定會滿足條件。 ```java // 向上調整 u 是當前的索引 private void up (int u) { // 如果發現當前的節點比父節點小 while (u / 2 > 0 && h[u / 2] > h[u]) { // 就和父節點交換一下 heap_swap(u / 2, u); u /= 2; } } ``` 這邊也給出插入一個元素x的虛擬碼: ```java void insert(int x){ size ++; // 最後一個元素指標 heap[size] = x; // 賦值 up(size); // 向上調整 } ``` ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210128210932561-1945230083.png) ### 向下調整void down(int u) 為什麼需要向下調整呢,以刪除操作為例,我們知道,要在陣列頭部刪除一個元素且保證後面元素的順序是比較麻煩的,我們通常在遇到刪除堆頂的時候,直接將陣列的最後一個元素heap[size--]將heap[0]覆蓋,接著執行down(0),自上而下地執行調整操作。 調整的規則也比較簡單,其實就是**判斷當前元素和左右孩子的大小關係,和最小的那個交換**,遞迴地去調整,直到無法交換為止。 ```java // 向下調整 private void down (int u) { int t = u; if (u * 2 <= size && h[u * 2] < h[t]) t = u * 2; // 判斷左兒子是否存在, 且如果左兒子比它小,就更新座標 if (u * 2 + 1 <= size && h[u * 2 + 1] < h[t]) t = u * 2 + 1; // 同理 if (u != t) { // 如果需要交換 heap_swap(u, t);// 交換一下 down(t); // 繼續做這個操作 } } ``` 這邊給出刪除小根堆中的最小值的虛擬碼: ```java int poll(){ int res = heap[1]; // 堆頂是最小值 heap[1] = heap[size--]; // 直接將最後一個元素覆蓋堆頂,並size-1 down(1); // 執行向下調整 return res; } ``` ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210128210944805-261168249.png) 我們希望刪除第k個元素或者更新第k個元素都是比較簡便的: ```java // 刪除位置為k的元素 void removeAt(int k){ heap[k] = heap[size --]; // 分別做一次向下操作和向上操作,其中一個判斷必定只會執行一次 down(k); up(k); } // 更新位置為k的元素為x void updateAt(int k, int x){ heap[k] = x; down(k); up(k); } ``` 到這裡,我就用簡略程式碼簡單地介紹了二叉堆的核心操作,我們待會會看到其實原始碼的思想不變,但是考慮的東西會更多一些,如果到這裡你能夠完全明白,原始碼的實現其實也就不難啦。 ## 構造器 ```java // 使用預設的容量11 public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } // 指定容量大小 public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } // 指定容量和比較器 public PriorityBlockingQueue(int initialCapacity, Comparator comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } // 傳入集合 public PriorityBlockingQueue(Collection c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet) { SortedSet ss = (SortedSet) c; this.comparator = (Comparator) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue) { PriorityBlockingQueue pq = (PriorityBlockingQueue) c; this.comparator = (Comparator) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; // 需要堆化,後面說明該方法 if (heapify) heapify(); } ``` >
接下來我將會把一些核心元件方法都拎出來分析一下,他們很有可能會在後面的操作方法中被頻繁呼叫,所以接下來很重要哦。 ## 擴容方法tryGrow 我們說了,PriorityBlockingQueue是無界的佇列,傳入的capacity也不是最終的容量,它和我們之前學習的許多集合一樣,有**動態擴容**的機制,我們先來瞅一瞅: ```java private void tryGrow(Object[] array, int oldCap) { // 釋放鎖的操作 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; // CAS 操作將allocationSpinLock變為1, 如果已經是1了,就跳到下面 if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 節點個數<64 new = old + old + 2 // 節點個數>
=64 new = old + old / 2 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // 希望節點數較小的時候,增長快一點 (oldCap >> 1)); // 擴容之後越界了 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } //queue != array 的情況 其他執行緒已經為queue分配了其他的空間 if (newCap > oldCap && queue == array) // 分配一個加大容量的陣列 newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } // 可能是其他執行緒在進行擴容操作 if (newArray == null) // back off if another thread is allocating Thread.yield(); // 重新獲取鎖 lock.lock(); // 複製元素 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } } ``` 可以發現的是,在動態擴容之前,將lock釋放,表明這個方法一定是在獲取鎖之後才被呼叫的。 > 為啥在擴容之前先釋放鎖,並使用CAS控制只有一個執行緒可以擴容成功呢? > > 擴容是需要時間的,如果在整個擴容期間一直持有鎖的話,其他執行緒在這時是不能進行出隊和入隊操作的,這大大降低了併發效能。 > > spinlock鎖使用CAS控制只有一個執行緒可以進行擴容,失敗的執行緒執行`Thread.yield()`讓出CPU,目的是讓擴容的執行緒優先呼叫lock.lock()優先獲取鎖,但是這得不到保證,因此需要後面的判斷。 > > 另外自旋鎖變數allocationSpinLock在擴容結束後重置為0,並沒有使用UNSAFE方法的CAS進行設定是因為: > > 1. 同時只可能有一個執行緒獲取到該鎖。 > 2. allocationSpinLock是volatile修飾。 ## 原始碼中向上調整和向下調整實現 準確地說,原始碼中應該是`調整 + 插入`,不斷調整,找到插入的位置,給該位置賦值。但,如果你理解了前面的調整思想,相信你會很快理解原始碼中的實現。 ### siftUpComparable 將x插入到堆中,注意這裡是不斷和父節點比較,最終找到插入位置。 ```java // 將x插入到堆中,注意這裡是不斷和父節點比較,最終找到插入位置 private static void siftUpComparable(int k, T x, Object[] array) { // 如果不傳入Comparable的實現,這裡會強轉失敗,丟擲異常 Comparable key = (Comparable) x; while (k > 0) { //a[k]的父節點位置 int parent = (k - 1) >>> 1; Object e = array[parent]; // 如果比父節點大就不用交換了 if (key.compareTo((T) e) >= 0) break; // 將父元素移下來 array[k] = e; // k向上移 k = parent; } // 退出迴圈後,k的位置就是待插入的位置 array[k] = key; } ``` ### siftDownComparable 移除k位置的元素,並調整二叉堆,具體思想就是,一般通過向下調整找到覆蓋位置,用x覆蓋即可,x一般可以從隊尾獲取。 ```java // 這裡的k就是當前空缺的位置,x就是覆蓋元素比如我們之前說的隊尾元素 private static void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable key = (Comparable)x; // 二叉堆有一個性質,最後一層葉子最多 佔 1 / 2 int half = n >>> 1; // loop while a non-leaf // 迴圈非葉子節點 while (k < half) { // 左孩子 int child = (k << 1) + 1; // assume left child is least Object c = array[child]; // 右孩子 int right = child + 1; // 始終用左孩子c表示最小的數 if (right < n && ((Comparable) c).compareTo((T) array[right]) > 0) // 這裡如果右孩子小,更新child = right c = array[child = right]; // 如果當前的k比左孩子還要小,那就不必交換了,待在那正好! if (key.compareTo((T) c) <= 0) break; // 小的數向上移,k向下更新 array[k] = c; k = child; } // 退出迴圈時,一定找到了x覆蓋的位置,覆蓋即可 array[k] = key; } } ``` 你看看,理解了調整的思想之後,看起程式碼來是不是就相對輕鬆很多啦? ## heapify建堆or堆化 heapify方法可以使節點任意放置的二叉樹,在O(N)的時間複雜度內轉變為二叉堆,具體做法是,**從最後一層非葉子節點自底向上執行down操作**。 ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210128210959840-458419369.png) ```java private void heapify() { Object[] array = queue; int n = size; int half = (n >
>> 1) - 1; // 最後一層非葉子層 // 兩種排序規則下, 自底向上 地執行 siftdown操作 Comparator cmp = comparator; if (cmp == null) { for (int i = half; i >= 0; i--) siftDownComparable(i, (E) array[i], array, n); } else { for (int i = half; i >= 0; i--) siftDownUsingComparator(i, (E) array[i], array, n, cmp); } } ``` ## put非阻塞式插入 put方法是非阻塞的,但是操作時需要獲取獨佔鎖,如果插入元素後超過了當前的容量,會呼叫`tryGrow`進行動態擴容,接著從插入元素位置進行向上調整,插入成功後,喚醒正在阻塞的讀執行緒。 ```java public void put(E e) { offer(e); // 無界佇列,插入操作不需要阻塞哦 } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 當前佇列中的元素個數 >= 陣列的容量 while ((n = size) >= (cap = (array = queue).length)) // 動態擴容 tryGrow(array, cap); try { Comparator cmp = comparator; // 下面這個if else根據是否傳入比較器選擇對應的方法,大差不差 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; // 喚醒正在阻塞的讀執行緒 notEmpty.signal(); } finally { lock.unlock(); } return true; } ``` ## take阻塞式獲取 take方法是阻塞式的,如果佇列為空,則當前執行緒阻塞在notEmpty維護的條件佇列中。 ```java public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 獲取鎖 lock.lockInterruptibly(); E result; try { // 出隊 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } // 出隊邏輯 private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; // 儲存隊頭的值,也就是返回這個值 E result = (E) array[0]; // 準備將隊尾的值 覆蓋第一個 E x = (E) array[n]; array[n] = null; Comparator cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } } ``` ## remove移除指定元素 ```java public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { // 找到匹配元素下標 int i = indexOf(o); if (i == -1) return false; // 移除該下標的元素 removeAt(i); return true; } finally { lock.unlock(); } } // 遍歷底層陣列, 找到匹配元素的下標 private int indexOf(Object o) { if (o != null) { Object[] array = queue; int n = size; for (int i = 0; i < n; i++) if (o.equals(array[i])) return i; } return -1; } // 移除下標為i的元素 private void removeAt(int i) { Object[] array = queue; int n = size - 1; if (n == i) // removed last element array[i] = null; else { // 老套路了,讓隊尾的元素覆蓋這裡 E moved = (E) array[n]; array[n] = null; Comparator cmp = comparator; // 向下調整 if (cmp == null) siftDownComparable(i, moved, array, n); else siftDownUsingComparator(i, moved, array, n, cmp); // 向下調整沒成功,向上調整 if (array[i] == moved) { if (cmp == null) siftUpComparable(i, moved, array); else siftUpUsingComparator(i, moved, array, cmp); } // 這也是慣用做法,上下分別做一次調整 } size = n; } ``` ## 總結 PriorityBlockingQueue是一個**支援優先順序的無界阻塞佇列**,基於陣列的二叉堆,其實就是執行緒安全的`PriorityQueue`。 內部使用一個獨佔鎖來同時控制只有一個執行緒執行入隊和出隊操作,只是用notEmpty條件變數來控制讀執行緒的阻塞,因為無界佇列中入隊操作是不會阻塞的。 指定排序規則有兩種方式: 1. 傳入PriorityBlockingQueue中的元素實現Comparable介面,自定義`compareTo`方法。 2. 初始化PriorityBlockingQueue時,指定構造引數`Comparator`,自定義`compare`方法來對元素進行排序。 底層陣列是可動態擴容的:先釋放鎖,保證擴容操作和讀操作可以同時進行,提高吞吐量,接著通過CAS自旋保證擴容操作的併發安全,如果原容量為old_c,擴容後容量為new_c,滿足: ```java if (old_c < 64) new_c = 2 * old_c + 2 else new_c = 1.5 * old_c ``` heapify方法可以使節點任意放置的二叉樹,在O(N)的時間複雜度內轉變為二叉堆,具體做法是,**從最後一層非葉子節點自底向上執行down操作**。 ## 參考閱讀 - [javadoop : 解讀 java 併發佇列 BlockingQueue](https://javadoop.com/post/java-concurrent-queue#toc_4) - [百度百科: 二叉堆](https://baike.baidu.com/item/%E4%BA%8C%E5%8F%89%E5%A0%86/10978086?fr=aladdin) - 《Java併發程式設計的藝術》 - 《Java併發程式設計