1. 程式人生 > >Java併發包原始碼學習系列:基於CAS非阻塞併發佇列ConcurrentLinkedQueue原始碼解析

Java併發包原始碼學習系列:基於CAS非阻塞併發佇列ConcurrentLinkedQueue原始碼解析

[toc] ## 非阻塞併發佇列ConcurrentLinkedQueue概述 我們之前花了很多時間瞭解學習BlockingQueue阻塞佇列介面下的各種實現,也大概對阻塞佇列的實現機制有了一定的瞭解:阻塞 + 佇列嘛。 而且其中絕大部分是完全基於獨佔鎖ReentrantLock和條件機制condition實現的併發同步,但基於獨佔鎖的實現較重量級,可能會引起上下文切換和執行緒排程,效能上有一定欠缺。比如:`ArrayBlockingQueue`、`LinkedBlockingQueue`等等。 - [Java併發包原始碼學習系列:阻塞佇列實現之ArrayBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113252384) - [Java併發包原始碼學習系列:阻塞佇列實現之LinkedBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113329416) - [Java併發包原始碼學習系列:阻塞佇列實現之PriorityBlockingQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113358710) - [Java併發包原始碼學習系列:阻塞佇列實現之DelayQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113440013) 在我們印象中,有幾個具有transfer特性的佇列為了效能,會優先考慮自旋,採用CAS非阻塞演算法,自旋到一定程度呢,才採取阻塞,比如:`SynchronousQueue`、`LinkedTransferQueue`等等,原理上是基於CAS原子指令提供的輕量級多執行緒同步機制。 - [Java併發包原始碼學習系列:阻塞佇列實現之SynchronousQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113528143) - [Java併發包原始碼學習系列:阻塞佇列實現之LinkedTransferQueue原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113576816) 而我們今天要學習的這個`ConcurrentLinkedQueue`並沒有實現BlockingQueue介面,是一個完完全全使用CAS操作實現執行緒安全的、無界的非阻塞佇列。 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210206172049632-1137936677.png) ## 結構組成 ```java public class ConcurrentLinkedQueue extends AbstractQueue implements Queue, java.io.Serializable { private static final long serialVersionUID = 196745693267521676L; /** * The fundamental invariants are: * - There is exactly one (last) Node with a null next reference, * which is CASed when enqueueing. This last Node can be * reached in O(1) time from tail, but tail is merely an * optimization - it can always be reached in O(N) time from * head as well. * - The elements contained in the queue are the non-null items in * Nodes that are reachable from head. CASing the item * reference of a Node to null atomically removes it from the * queue. Reachability of all elements from head must remain * true even in the case of concurrent modifications that cause * head to advance. A dequeued Node may remain in use * indefinitely due to creation of an Iterator or simply a * poll() that has lost its time slice. */ private static class Node { volatile E item; // 值 volatile Node next; // next域 Node(E item) { // 構造節點,保證執行緒安全 UNSAFE.putObject(this, itemOffset, item); } /* ----- 內部使用UNSafe工具類提供的CAS演算法 ----- */ // 如果item為cmp, 改為val boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // 將next設定為val void lazySetNext(Node val) { UNSAFE.putOrderedObject(this, nextOffset, val); } // 如果next為cmp, 將next改為val boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } /** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ private transient volatile Node head; /** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ private transient volatile Node tail; // 無參構造,初始化將head和tail指向item為null的哨兵節點 public ConcurrentLinkedQueue() { head = tail = new Node(null); } // 指定初始容量 public ConcurrentLinkedQueue(Collection c) { Node h = null, t = null; for (E e : c) { checkNotNull(e); Node newNode = new Node(e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node(null); head = h; tail = t; } } ``` 在ConcurrentLinkedQueue非阻塞演算法實現中,head/tail並不是總是指向頭/尾節點,也就是說允許佇列處於不一致狀態,優點是:把入隊/出隊原本需要一起原子化執行的兩個步驟分離,從而縮小入隊/出隊時需要原子化更新值的範圍到唯一變數,這是非阻塞演算法得以實現的關鍵。 由於佇列有時會處於不一致的狀態,為此ConcurrentLinkedQueue 提供了3個不變式來維護非阻塞演算法的正確性,分別是:基本不變式、head的不變式和tail的不變式。 > 不變式是指: 併發物件的各個方法之間必須遵守的”契約”,**每個方法在呼叫前和呼叫後都必須保持不變式**。採用不變式,就可以隔離的分析每個方法,而不用考慮它們之間所有可能的互動。 ### 基本不變式 1. 當入隊插入新節點之後,佇列中有一個next域為null的(最後)節點。 2. 從head開始遍歷佇列,可以訪問所有item域不為null的節點。 ### head的不變式與可變式 **不變式** 1. 所有存活的節點,都能從head通過呼叫succ()方法遍歷可達。 2. head不能為null。 3. head節點的next域不能引用到自身。 **可變式** 1. head節點的item值可能為null,也可能不為null。 2. 允許tail之後與head,也就是說:從head開始遍歷佇列,不一定能達到tail。 ### tail的不變式與可變式 **不變式** 1. 通過tail呼叫succ()方法,最後節點總是可達的。 2. tail不能為null。 **可變式** 1. tail節點的item域可能為null,也可能不為null。 2. 允許tail滯後於head,也就是說:從head開始遍歷佇列,不一定能到達tail。 3. tail節點的next域可以引用到自身。 ## offer操作 ### 原始碼解析 offer操作將會將元素e【非null】加入到隊尾,由於無界佇列的特性,這個操作將永遠不會返回false。 ```java public boolean offer(E e) { // 檢查元素是否為null,為null就拋空指標 checkNotNull(e); // 構造新節點 final Node newNode = new Node(e); // 【1】for迴圈從tail開始迭代 for (Node t = tail, p = t;;) { Node q = p.next; // 【2】q == null 說明是p是尾節點 if (q == null) { // 【3】 // cas將p的next設定為newNode,返回true // 如果設定失敗,說明有其他執行緒修改了p.next // 那就再次進入迴圈 if (p.casNext(null, newNode)) { // 【4】 // 這裡tail指標並不是每次插入節點都要更改的,從head開始第奇數個節點會是tail if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } //【5】 else if (p == q) // 併發情況下,移除head的時候【比如poll】,將會head.next = head // 也就滿足p == q 的分支條件, 需要重新找到新的head p = (t != (t = tail)) ? t : head; //【6】 else // 表明tail指向的已經不是最後一個節點了,更新p的位置 // 這裡其實就是找到最後一個節點的位置 p = (p != t && t != (t = tail)) ? t : q; } } ``` ### 圖解offer操作 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210206172120399-1467997385.png) 上面是模擬的單執行緒情況下的offer一個元素的操作,可以看到: 1. 初始化head、tail都指向了item為null的哨兵節點,他們的next指向null。 2. 單執行緒情況下,我們暫時認為CAS操作都是執行成功的,此時q為null,將會走第一個分支【2】,將p的next指向newNode,此時p==t,因此不會執行【4】casTail操作,直接返回true。 多執行緒情況下,事情就不是這麼簡單了: ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210206172125719-286851832.png) 1. 加入執行緒A希望在隊尾插入資料A,執行緒B希望在隊尾插入資料B,他們同時到了【3】`p.casNext(null, newNode)`這一步,由於`casNext`是原子性的,假設A此時設定成功,且`p == t`,如圖1。 2. A成功,自然B執行緒cas設定next失敗,那麼將會再次進行for迴圈,此時`q != null && p != q`,走到【6】,將p移動到q的位置,也就是A的位置,如圖2。 3. 再次迴圈,此時`q==null`,再次進行【3】的設定next操作,此時假設B成功了,如圖3。 4. 此時你會發現,tail需要重新設定了,因為`p != t`條件滿足【4】,將會執行`casTail(t, newNode)`,將tail指標指向插入的B。 相信一通圖解 + 原始碼分析下來,你會慢慢對整個流程熟悉起來,稍微總結一下: > offer操作其實就是通過原子CAS操作控制某一時刻只有一個執行緒能成功在隊尾追加元素,CAS失敗的執行緒將會通過迴圈再次嘗試CAS操作,直到成功。 > > 非阻塞演算法就是這樣,通過迴圈CAS的方式利用CPU資源來替代阻塞執行緒的資源消耗。 > > 並且,tail指標並不是每次都是指向最後一個節點,由於自身的機制,最後一個節點要麼是tail指向的位置,要麼就是它的next。因此定位的時候,這裡使用p指標定位最後一個節點的位置。 對了,你會發現,在整個過程中,【5】操作一直沒有涉及到,其實【5】的情況會在poll操作的時候可能會發生,這裡先舉個例子吧: ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210206172133393-620592547.png) 圖一是poll操作可能會導致的情況的一種,以他為例子:此時tail節點指向棄用的節點,此時向佇列中offer一個元素。 1. 此時,執行到【2】處,各個指標的指向如圖1。 2. 接著由於q不為null,且p == q,順利進入【5】,這時p被賦值為head,如圖2。 3. 再次迴圈,q指向p.next,此時為null,如圖3。 4. q為null,進入【2】,和之前一樣,【3】設定next,此時【4】`p != t`,設定新節點為新的tail,如圖4。 ### JDK1.6 hops設計意圖 在看原始碼註釋的時候,我發現很多處都對hop這個玩意進行了註釋,原來JDK1.6的原始碼中確實有它的存在:[聊聊併發(六)ConcurrentLinkedQueue的實現原理分析](http://ifeve.com/concurrentlinkedqueue/),並且設計的理念還是一樣的,用hops控制tail節點的更新頻率,提高入隊的效率。 > 引用《Java併發程式設計的藝術》方騰飛 : > > **減少CAS更新tail節點的次數,就能提高入隊的效率**,所以doug lea使用hops變數來控制並減少tail節點的更新頻率,並不是每次節點入隊後都將 tail節點更新成尾節點,而是當 tail節點和尾節點的距離大於等於常量HOPS的值(預設等於1)時才更新tail節點,tail和尾節點的距離越長使用CAS更新tail節點的次數就會越少,但是距離越長帶來的負面效果就是每次入隊時定位尾節點的時間就越長,因為迴圈體需要多迴圈一次來定位出尾節點,但是這樣仍然能提高入隊的效率,因為從本質上來看它通過**增加對volatile變數的讀操作來減少了對volatile變數的寫操作,而對volatile變數的寫操作開銷要遠遠大於讀操作**,所以入隊效率會有所提升。 ```java private static final int HOPS = 1; public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node n = new Node(e); retry: for (;;) { Node t = tail; Node p = t; for (int hops = 0; ; hops++) { Node next = succ(p); // 1.獲取p的後繼節點。(如果p的next指向自身,返回head節點) if (next != null) { // 2.如果next不為null if (hops > HOPS && t != tail) continue retry; // 3.如果自旋次數大於HOPS,且t不是尾節點,跳出2層迴圈重試。 p = next; // 4.如果自旋字數小於HOPS或者t是尾節點,將p指向next。 } else if (p.casNext(null, n)) { // 5.如果next為null,嘗試將p的next節點設定為n,然後自旋。 if (hops >= HOPS) casTail(t, n); // 6.如果設定成功且自旋次數大於HOPS,嘗試將n設定為尾節點,失敗也沒關係。 return true; // 7.新增成功。 } else { p = succ(p); // 8。如果第5步嘗試將p的next節點設定為n失敗,那麼將p指向p的後繼節點,然後自旋。 } } } final Node succ(Node p) { Node next = p.getNext(); //如果p節點的next節點指向自身,那麼返回head節點;否則返回p的next節點。 return (p == next) ? head : next; ``` ## poll操作 poll操作將在隊頭出隊一個元素,並返回,如果佇列為空,則返回null。 ### 原始碼解析 ```java public E poll() { // 【1】continue xxx;會回到這 restartFromHead: // 【2】死迴圈 for (;;) { for (Node h = head, p = h, q;;) { E item = p.item; // 【3】如果當前 有值, 就cas操作置null if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. // 【4】 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 【item == null】 或 【item != null 但是 cas失敗了】 // 【5】佇列為空, 返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } // 【6】 else if (p == q) continue restartFromHead; // 【7】 else p = q; } } } final void updateHead(Node h, Node p) { // h == p 其實就不需要更新了,否則更新head為p,更新成功了,將h.next指向h本身 if (h != p && casHead(h, p)) h.lazySetNext(h); } ``` ### 圖解poll操作 先來看看最簡單的情況: ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210206172143376-767912611.png) 初始情況下,head和tail指向item為null的哨兵節點,此時假設某個執行緒執行poll操作,從head開始迭代:此時,`p.item == null && p.next == null`,將走到【5】這一分支,進行`updateHead`,此時p!=h,也就是直接返回null了。 如果此時走到【5】分支時,正好有另一個執行緒向佇列中添加了元素,這時情況如下: ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210206172148047-283838219.png) 1. 指標q將指向新插入元素的位置,此時【5】位置`q != null`,接著走【6】發現`p != q`,【6】也走不進去。 2. 最後走到【7】,將p指向q節點位置。 3. 再次進入迴圈,走到分支【3】,此時item不為null,嘗試cas設定item為null,假設設定成功後,此時條件【4】成立,`p != h`,設定p為head,使h指向自身,最後返回p的值。 你會發現,最終得結果,就是我們之前在分析offer操作時出現的一種情況,也就是offer的時候,發現tail.next = tail。 接著,我們可以看到,在poll中,也同樣存在類似的判斷,也就是【6】的程式碼,判斷`p == q`,同理也是類似的,下面有紫色表示執行緒A,藍色表示執行緒B。 ![](https://img2020.cnblogs.com/blog/1771072/202102/1771072-20210206172157267-732978096.png) 1. 假設執行緒A執行poll操作時,當前佇列狀態如圖1。 2. 如圖2,此時p通過cas操作將A設定為null。 3. 此時p != h,將會執行updateHead操作,在此之前,如果正好執行緒B開始poll,如圖3。 4. B執行緒就會進走到【6】,跳到restartFromHead,尋找當前佇列的head,如圖4。 > poll一個元素的時候,將會使用CAS操作將當前節點的item值設定為null,並CAS設定head,將移除的節點指向自己,使得被垃圾回收。 > > 整個迴圈過程中,不斷檢測併發情況,如果發現頭節點被修改,將會跳出迴圈,重新獲取新的head。 ## 總結 ConcurrentLinkedQueue是一個使用**CAS操作實現執行緒安全的、無界的非阻塞佇列,基於連結串列**。 連結串列的頭尾節點為volatile修飾,保證在多執行緒環境下的出隊入隊操作的安全性,volatile自身保證可見性,原子性由CAS操作保證。 設計上,**非阻塞演算法**允許佇列處於不一致狀態,比如tail指標並不是每次都指向最後一個節點,最後一個節點可能是tail,也可能是tail.next,這個特性分離了入隊/出隊操作中包含的兩個需要一起原子執行的步驟,從而有效地縮小了入隊/出隊時的原子化範圍的唯一變數。針對不一致,使用三個不變式來維護非阻塞演算法的正確性。 對volatile變數的寫操作開銷要遠遠大於讀操作,因此,額外增加了遍歷佇列、尋找頭/尾節點的開銷【增加volatile讀的開銷】,但是因為不需要每次操作都CAS更新head/tail【減少volatile寫的開銷】,提升入隊效率。 ## 參考閱讀 - 《Java併發程式設計之美》 - 《Java併發程式設計的藝術》 - [非阻塞演算法在併發容器中的實現](https://developer.ibm.com/zh/articles/j-lo-concurrent/) - [聊聊併發(六)ConcurrentLinkedQueue的實現原理分析](http://ifeve.com/concurrentlinkedqueue/) - [Jdk1.6 JUC原始碼解析(24)-ConcurrentLinkedQueue](https://blog.csdn.net/iteye_11160/article/details/8