1. 程式人生 > >java 非阻塞演算法在併發容器中的實現(ConcurrentLinkedQueue原始碼)

java 非阻塞演算法在併發容器中的實現(ConcurrentLinkedQueue原始碼)

簡介

非阻塞演算法在更細粒度的層面協調爭用,它比傳統的鎖有更高的併發性。隨著非阻塞演算法在 Java 中的應用越來越廣泛,java.concurrent 包中用非阻塞演算法實現的併發容器也越來越多,ConcurrentLinkedQueue 就是其中的一個重要成員。鑑於 ConcurrentLinkedQueue 的非阻塞演算法實現在併發容器中具有代表性,本文將結合 JDK Update23 的原始碼來分析它在當前的實現。由於非阻塞演算法本身比較複雜,閱讀本文的讀者需要對 CAS 原子指令和非阻塞同步機制有所瞭解。本文分為兩部分,第一部分主要是對非阻塞演算法相關理論知識的簡介,第二部分結合 ConcurrentLinkedQueue 的原始碼,來探索非阻塞演算法的具體實現。希望通過本文,能夠有助於讀者理解非阻塞演算法在併發容器中的工作原理與具體實現機制。

非阻塞演算法相關技術簡介

為了便於讀者更好的理解本文,首先讓我們來對非阻塞演算法相關的理論知識做個簡單的瞭解,更詳細的論述請參閱文中提及的相關參考文獻。

Java 的多執行緒同步機制

在現代的多處理器系統中,提高程式的並行執行能力是有效利用 CPU 資源的關鍵。為了有效協調多執行緒間的併發訪問,必須採用適當的同步機制來協調競爭。當前常用的多執行緒同步機制可以分為下面三種類型:

  • volatile 變數:輕量級多執行緒同步機制,不會引起上下文切換和執行緒排程。僅提供記憶體可見性保證,不提供原子性。
  • CAS 原子指令:輕量級多執行緒同步機制,不會引起上下文切換和執行緒排程。它同時提供記憶體可見性和原子化更新保證。
  • 內部鎖和顯式鎖:重量級多執行緒同步機制,可能會引起上下文切換和執行緒排程,它同時提供記憶體可見性和原子性。

多處理器系統對併發的支援

現代的多處理器系統大多提供了特殊的指令來管理對共享資料的併發訪問,這些指令能實現原子化的讀 - 改 - 寫操作。現代典型的多處理器系統通常支援兩種同步原語(機器級別的原子指令):CAS 和 LL/SC。Intel,AMD 和 SPARC 的多處理器系統支援“比較並交換”(compare-and-swap,CAS)指令。IBM PowerPC,Alpha AXP,MISP 和 ARM 的多處理器系統支援“載入連結 / 儲存條件”(load-linked/store-conditional,LL/SC)指令。

JDK 為 concurrent.atomic 包中的原子類提供了 compareAndSet() 方法,compareAndSet() 方法使用上面這些機器級別的原子指令來原子化的更新值。java. concurrent 包中的這些原子類,為用非阻塞演算法實現併發容器打下了基礎。

關於 CAS 原子指令,感興趣的讀者可以參閱參考文獻 1 的 15.2 章和參考文獻 3 的附錄 B8。關於 CAS 原子指令在不同多處理器上的有關細節,可以參閱參考文獻 4 的“Multiprocessors”部分。

非阻塞演算法

一個執行緒的失敗和掛起不會引起其他些執行緒的失敗和掛起,這樣的演算法稱為非阻塞演算法。非阻塞演算法通過使用底層機器級別的原子指令來取代鎖,從而保證資料在併發訪問下的一致性。

從 Amdahl 定律我們可以知道,要想提高併發性,就應該儘量使序列部分達到最大程度的並行;也就是說:最小化序列程式碼的粒度是提高併發效能的關鍵。

與鎖相比,非阻塞演算法在更細粒度(機器級別的原子指令)的層面協調多執行緒間的競爭。它使得多個執行緒在競爭相同資源時不會發生阻塞,它的併發性與鎖相比有了質的提高;同時也大大減少了執行緒排程的開銷。同時,由於幾乎所有的同步原語都只能對單個變數進行操作,這個限制導致非阻塞演算法的設計和實現非常複雜。

關於非阻塞演算法,感興趣的讀者可以參閱參考文獻 1 的 15.4 章。關於 Amdahl 定律,感興趣的讀者可以參閱參考文獻 1 的 11.2 章和參考文獻 3 的 1.5 章。

非阻塞演算法實現簡述

基於非阻塞演算法實現的併發容器

在探索 ConcurrentLinkedQueue 非阻塞演算法的具體實現機制之前,首先讓我們來了解一下 JDK 中基於非阻塞演算法實現的併發容器。在 JDKUpdate23 的 util.concurrent 包中,基於非阻塞演算法實現的併發容器包括:ConcurrentLinkedQueue,SynchronousQueue,Exchanger 和 ConcurrentSkipListMap。ConcurrentLinkedQueue 是一個基於連結節點的無界執行緒安全佇列,本文接下來將結合 JDK 原始碼,來探索它的非阻塞演算法的具體實現機制。SynchronousQueue 是一個沒有容量的阻塞佇列,它使用雙重資料結構 來實現非阻塞演算法。Exchanger 是一個能對元素進行配對和交換的交換器。它使用 消除 技術來實現非阻塞演算法 。ConcurrentSkipListMap 是一個可以根據 Key 進行排序的可伸縮的併發 Map。

關於雙重資料結構,感興趣的讀者可以參閱參考文獻 3 的 10.7 章,關於消除技術,感興趣的讀者可以參閱參考文獻 3 的第 11 章。

ConcurrentLinkedQueue 的非阻塞演算法簡述

本文接下來將在分析 ConcurrentLinkedQueue 原始碼實現的過程中,穿插講解非阻塞演算法的具體實現。為了便於讀者理解本文,首先讓我們對它的實現機制做個全域性性的簡述。ConcurrentLinkedQueue 的非阻塞演算法實現可概括為下面 5 點:

  1. 使用 CAS 原子指令來處理對資料的併發訪問,這是非阻塞演算法得以實現的基礎。
  2. head/tail 並非總是指向佇列的頭 / 尾節點,也就是說允許佇列處於不一致狀態。 這個特性把入隊 / 出隊時,原本需要一起原子化執行的兩個步驟分離開來,從而縮小了入隊 / 出隊時需要原子化更新值的範圍到唯一變數。這是非阻塞演算法得以實現的關鍵。
  3. 由於佇列有時會處於不一致狀態。為此,ConcurrentLinkedQueue 使用三個不變式來維護非阻塞演算法的正確性。
  4. 以批處理方式來更新 head/tail,從整體上減少入隊 / 出隊操作的開銷。
  5. 為了有利於垃圾收集,佇列使用特有的 head
    更新機制;為了確保從已刪除節點向後遍歷,可到達所有的非刪除節點,佇列使用了特有的向後推進策略。

ConcurrentLinkedQueue 有機整合了上述 5 點來實現非阻塞演算法。由於三個不變式會從全域性來約束非阻塞演算法,所以在開始分析原始碼之前,讓我們首先來了解它。

不變式

在後面的原始碼分析中,我們將會看到佇列有時會處於不一致狀態。為此,ConcurrentLinkedQueue 使用三個不變式 ( 基本不變式,head 的不變式和 tail 的不變式 ),來約束佇列中方法的執行。通過這三個不變式來維護非阻塞演算法的正確性。

不變式:併發物件需要一直保持的特性。不變式是併發物件的各個方法之間必須遵守的“契約”,每個方法在呼叫前和呼叫後都必須保持不變式。採用不變式,就可以隔離的分析每個方法,而不用考慮它們之間所有可能的互動。

基本不變式
在執行方法之前和之後,佇列必須要保持的不變式:

  • 當入隊插入新節點之後,佇列中有一個 next 域為 null 的(最後)節點。
  • 從 head 開始遍歷佇列,可以訪問所有 item 域不為 null 的節點。

head 的不變式和可變式
在執行方法之前和之後,head 必須保持的不變式:

  • 所有“活著”的節點(指未刪除節點),都能從 head 通過呼叫 succ() 方法遍歷可達。
  • head 不能為 null。
  • head 節點的 next 域不能引用到自身。

在執行方法之前和之後,head 的可變式:

  • head 節點的 item 域可能為 null,也可能不為 null。
  • 允許 tail 滯後(lag behind)於 head,也就是說:從 head 開始遍歷佇列,不一定能到達 tail。

tail 的不變式和可變式
在執行方法之前和之後,tail 必須保持的不變式:

  • 通過 tail 呼叫 succ() 方法,最後節點總是可達的。
  • tail 不能為 null。

在執行方法之前和之後,tail 的可變式:

  • tail 節點的 item 域可能為 null,也可能不為 null。
  • 允許 tail 滯後於 head,也就是說:從 head 開始遍歷佇列,不一定能到達 tail。
  • tail 節點的 next 域可以引用到自身。

在接下來的原始碼分析中,在初始化 ConcurrentLinkedQueue 之後及呼叫入隊 / 出隊方法之前和之後,我們都會參照上面三個不變式來分析它們的正確性。

節點類實現及佇列初始化

節點類定義
ConcurrentLinkedQueue 是用節點連結成的連結串列來實現的。首先,讓我們來看看節點類的原始碼:

清單 1. 節點類

 private static class Node<E> { 
        private volatile  E item;           // 宣告為 volatile 型
        private volatile  Node<E> next;    // 宣告為 volatile 型

        Node(E item) {                       // 建立新節點
            lazySetItem(item);              // 惰性設定 item 域的值
         } 

        E getItem() { 
            return item; 
        } 

        boolean casItem(E cmp, E val) {   // 使用 CAS 指令設定 item 域的值
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); 
        } 

        void setItem(E val) {  // 使用“volatile 寫”的方式,設定 item 域的值
             item = val; 
        } 
        voidlazySetItem(E val) { //惰性設定 item 域的值
                 UNSAFE.putOrderedObject(this, itemOffset, val); 
        } 

        void lazySetNext(Node<E> val) {    // 惰性設定 next 域的值 
            UNSAFE.putOrderedObject(this, nextOffset, val); 
        } 

        Node<E> getNext() { 
            return next; 
        } 

                                                      //CAS 設定 next 域的值
        boolean casNext(Node<E> cmp, Node<E> val) { 
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); 
        } 

        private static final sun.misc.Unsafe UNSAFE=     // 域更新器
        sun.misc.Unsafe.getUnsafe(); 
        private static final long nextOffset=              //next 域的偏移量
        objectFieldOffset(UNSAFE, "next", Node.class); 
        private static final long itemOffset=              //item 域的偏移量
        objectFieldOffset(UNSAFE, "item", Node.class); 

    }

在 ConcurrentLinkedQueue 的實際應用中,會頻繁分配大量生命週期短暫的節點物件。為了降低開銷,Node 類的 item 域和 next 域被宣告為普通的 volatile 型別。它們通過原子引用域更新器(AtomicReferenceFieldUpdater),使用反射來更新。關於原子引用域更新器,感興趣的讀者可以參閱參考文獻 1 的 15.4.3。

節點型別說明

為了便於讀者理解本文,下面對文中涉及的不同型別的節點集中做個定義:

  • 有效節點:從 head 向後遍歷可達的節點當中,item 域不為 null 的節點。
  • 無效節點:從 head 向後遍歷可達的節點當中,item 域為 null 的節點。
  • 以刪除節點:從 head 向後遍歷不可達的節點。
  • 哨兵節點:連結到自身的節點(哨兵節點同時也是以刪除節點)。
  • 頭節點:佇列中的第一個有效節點(如果有的話)。
  • 尾節點:佇列中 next 域為 null 的節點(可以是無效節點)。

下面是不同型別節點的示意圖:
圖 1. 不同型別節點示意圖

圖 1. 不同型別節點示意圖

對比 head 的不變式和 tail 的不變式可以看出,head 只能指向有效節點和無效節點,而 tail 可以指向任意節點,包括以刪除節點和哨兵節點。在 ConcurrentLinkedQueue 中,入隊時只能把新節點連結到尾節點的後面,出隊時只能刪除頭節點。

佇列初始化

接下來讓我們來看看 ConcurrentLinkedQueue 初始化過程的原始碼實現:
清單 2. 初始化

   // 建立一個 item 域為 null,next 域為 null 的偽節點
 private transient volatile Node<E> head = new Node<E>(null); 

 private transient volatile Node<E> tail = head; 

 public ConcurrentLinkedQueue() {}

當初始化一個 ConcurrentLinkedQueue 物件時,會建立一個 item 域為 null 和 next 域為 null 的偽節點,並讓 head 和 tail 指向這個偽節點。下面是佇列初始化之後的結構示意圖:
圖 2. 初始化狀態結構圖

圖 2. 初始化狀態結構圖

從上圖我們可以看出,處於初始化狀態的佇列滿足三個不變式。

批處理更新,更新 head 及向後推進

批處理更新 head/tail

為了儘量減少執行 CAS 原子指令的次數,執行入隊 / 出隊操作時 , ConcurrentLinkedQueue 並不總是更新 head/tail。只有從 head/tail 到頭 / 尾節點之間的“距離”達到變數 HOPS 指定的閥值,入隊 / 出隊操作才會更新它們。下面這行原始碼定義了 HOPS 這個變數:

清單 3.HOPS 變數

// 更新 head/tail 的閥值
private static final int HOPS = 1;

以批處理方式更新減少了更新 head/tail 的次數(減少了執行 CAS 原子指令的次數),但額外的增加了遍歷佇列,尋找頭 / 尾節點的開銷(增加了讀 volatile 變數的開銷)。在當前大多數的處理器系統中,volatile 讀操作的開銷非常低,幾乎和非 volatile 變數的讀操作一樣(見參考文獻 2)。而執行一條 CAS 原子指令要消耗比普通載入或儲存指令多得多的時鐘週期。因為 CAS 原子指令的執行包含了記憶體屏障(關於記憶體屏障,感興趣的讀者可以參閱參考文獻 4 的 Memory barriers 一章),防止亂序執行以及對各種編譯器優化的抑制。因此以批處理方式更新 head/tail,從整體上減少了入隊 / 出隊操作的開銷。

head 更新
為了有利於垃圾收集,ConcurrentLinkedQueue 在更新 head 指向新頭結點後,會把舊頭節點設定為哨兵節點。下面是更新 head 的原始碼:

清單 4. 更新 head

final void updateHead(Node<E> h, Node<E> p) { 
         // 如果兩個節點不相同,嘗試用 CAS 指令原子更新 head 指向新頭節點
         if (h != p && casHead(h, p)) 
             // 惰性設定舊頭結點為哨兵節點
             h.lazySetNext (h); 
         }

下面通過一個示意圖來理解已刪除節點在佇列中的狀態:
圖 3. 已刪除節點狀態示意圖

圖 3. 已刪除節點狀態示意圖

在上圖中,假設開始時 head 指向 A 節點,然後連續執行了 4 次出隊操作,刪除 A,B,C,D 4 個節點。在出隊 B 節點時,head 與頭結點之間的距離達到變數 HOPS 指定的閥值。這觸發執行 updateHead()方法:首先設定 head 指向 C 節點,然後設定 B 節點的 next 域指向自身。同樣,在出隊 D 節點時,重複同樣的過程。由於 B 和 D 節點斷開了以刪除節點與佇列的連結,這將有利於虛擬機器回收這些以刪除節點佔用的記憶體空間。

向後推進
由於 tail 可以指向任意節點,所以從 tail 向後遍歷尋找尾節點的過程中,可能會遇到哨兵節點。此時 succ() 方法會直接跳轉到 head 指向的節點繼續遍歷。下面是 succ() 方法的原始碼:

清單 5. 向後推進

final Node<E> succ(Node<E> p) { 
    Node<E> next = p.getNext(); 

 // 如果 p 節點的 next 域連結到自身(p 節點是哨兵節點)
 // 就跳轉到 head,從 head 開始繼續遍歷 
         // 否則向後推進到下一個節點
         return (p == next) ? head : next; 
     }

從上面的原始碼我們可以看出,如果向後推進過程中遇到哨兵節點,就跳轉到 head,從 head 開始繼續遍歷;否則,就推進到下一個節點。
下面通過一個示意圖來理解跳轉動作的的執行過程:

圖 4. 跳轉動作示意圖

圖 4. 跳轉動作示意圖

上圖的隊列當前處於 tail 滯後於 head 狀態。假設現在執行入隊操作,需要從 tail 開始向後遍歷找到佇列的尾節點。tail 開始時指向 A 節點,執行 succ() 方法向後推進到 B 節點。在 B 節點執行 succ() 方法時,由於 B 節點連結到自身,所以跳轉到 head 指向的 E 節點繼續遍歷。下面對滯後與跳轉做個總結:

  • 如上圖所示,如果 head 落在 tail 的後面,佇列就處於 tail 滯後於 head 狀態。
  • 如果 tail 滯後於 head,從 tail 向後遍歷過程中就會發生跳轉動作。
  • 跳轉動作確保從已刪除節點向後遍歷,可以到達所有的未刪除節點。

入隊操作

在 ConcurrentLinkedQueue 中,插入新節點時,不用考慮尾節點是否為有效節點,直接把新節點插入到尾節點的後面即可。由於 tail 可以指向任意節點,所以入隊時必須先通過 tail 找到尾節點,然後才能執行插入操作。如果插入不成功(說明其他執行緒已經搶先插入了一個新的節點)就繼續向後推進。重複上述迭代過程,直到插入成功為止。下面是入隊方法的原始碼:

清單 6. 插入新節點

    public boolean offer(E e) { 
        if (e == null) throw new NullPointerException(); 
        Node<E> n = new Node<E>(e);     // 建立新節點
        retry: 
        for (;;) { 
            Node<E> t = tail; 
            Node<E> p = t; 
            for (int hops = 0; ; hops++) { 
                Node<E> next = succ(p);          //A 
                if (next != null) {                    //B 
                    if (hops > HOPS&& t != tail)    //B1 
                        continue retry;                 //B2 
                    p = next;                           //B3 
                } else if (p.casNext(null, n)) {        //C 
                    if (hops >= HOPS)         //C1 
                        casTail(t, n);                  //C2 
                    return true;                        //C3 
                } else {                                //D 
                    p = succ(p);                        //D1 
                } 
            } 
        } 
 }

插入新節點的原始碼分析

offer() 方法使用非阻塞演算法慣用的“迴圈嘗試”的方式來執行:如果因其他執行緒干擾而 失敗就重新嘗試,直到成功為止。下面是關鍵程式碼的解釋:

A: 找到 tail 的下一個節點 next。
B: 如果 next 不為 null。
B1:如果已經至少越過了兩個節點,且 tail 被修改 (tail 被修改,說明其他執行緒向佇列添加了新的節點,且更新 tail 成功 )。
B2:跳出內外兩層迴圈,重新開始迭代(因為 tail 剛剛被其他執行緒更新了)。
B3:向後推進到下一個節點。
C: 如果當前節點為尾節點,使用 CAS 原子指令設定尾節點的 next 域指向新節點。
C1:如果已經至少越過了一個節點(此時,tail 至少滯後尾節點兩個節點)。
C2:使用 CAS 原子指令更新 tail 指向這個新插入的節點。
C3:新節點以插入佇列,不管更新 tail 是否成功,退出方法。
D: 如果向隊尾插入新節點不成功(其他執行緒已經搶先在隊尾插入了一個新節點)。
D1:向後推進到下一個節點。

佇列的入隊方法包含兩個步驟:新增新節點和更新 tail 指向這個新節點。這兩個步驟分別對應程式碼分析的 C 和 C2。從程式碼中我們可以看到,這兩個步驟都是用 CAS 原子指令來完成的。由於 ConcurrentLinkedQueue 允許佇列處於不一致狀態,所以這裡的 C 和 C2 這兩個步驟不必一起原子的執行。在 C 處新增新節點後,只有當 tail 與新新增節點之間的距離達到了 HOPS 指定的閥值,才會執行 C2 來更新 tail。

tail 在佇列中的位置分析
根據 tail 的不變式和可變式,在執行入隊操作前,tail 在佇列中的位置共有三種可能:
1. tail 指向尾節點。
2. tail 節點指向非尾節點。
3. tail 滯後於 head。
下面分別分析這三種情形,首先讓我們看看第一種情形的示意圖:

圖 5.tail 指向尾節點

圖 5.tail 指向尾節點

開始時,tail 指向 D 節點,首先尋找 D 節點的後繼節點。由於 D 的後繼節點為 null,所以插入新節點到 D 節點的後面。如果插入成功就退出方法;如果插入失敗(說明其他執行緒剛剛插入了一個新節點),就向後推進到新插入的節點,然後重新開始迭代。下圖是插入成功後的示意圖:

圖 6.tail 指向尾節點,插入新節點成功

圖 6.tail 指向尾節點,插入新節點成功

在上圖中,由於 tail 滯後於尾節點的節點數還沒有達到 HOPS 指定的閾值,所以 tail 沒有被更新。
下面,讓我們看看第二種情形的結構示意圖:

圖 7.tail 指向非尾節點

圖 7.tail 指向非尾節點

開始時,tail 指向 C 節點。首先找到 C 的後繼節點 D,然後向後推進到節點 D,後面程式碼執行路徑與上面的“tail 指向尾節點 ”的程式碼執行路徑相同。下圖是插入成功後的結構示意圖:

圖 8.tail 指向非尾節點,插入新節點成功

圖 8.tail 指向非尾節點,插入新節點成功

上圖中的 tail 更新了位置。因為在新增 E 節點後,tail 滯後的節點數達到了 HOPS 指定的閾值。這觸發執行更新 tail 的 CAS 操作。
最後,讓我們看看第三種情形的結構示意圖:

圖 9.tail 滯後於 head

圖 9.tail 滯後於 head

開始時,tail 指向 A 節點。首先找到 A 的後繼節點 B,然後向後推進到節點 B。由於 B 是哨兵節點,產生跳轉動作,跳過 C 節點,從 head 指向的 D 節點開始繼續向後遍歷。後面的程式碼執行路徑與“tail 指向非尾節點”相同。下面是成功插入一個新節點後的結構示意圖:

圖 10.tail 滯後於 head,插入新節點成功

圖 10.tail 滯後於 head,插入新節點成功

上圖的 tail 更新了位置,因為 tail 滯後的節點數達到了 HOPS 指定的閾值,這觸發執行更新 tail 的 CAS 操作。
從上面插入新節點後的三個結構示意圖我們可以看出,執行入隊操作後的佇列依然滿足三個不變式。

出隊操作

在 ConcurrentLinkedQueue 中,出隊操作從佇列的頭部刪除第一個有效節點。根據 head 的不變式和可變式,head 可以指向無效節點,所以出隊前必須先檢查 head 是否指向有效節點。如果指向無效節點就要向後推進,直到找到第一個有效節點,然後再執行出隊操作。下面是出隊方法的原始碼:

清單 7. 刪除佇列頭節點

    public E poll() { 
        Node<E> h = head; 
        Node<E> p = h; 
        for (int hops = 0; ; hops++) { 
            E item = p.getItem();                                    //A 
            if (item != null && p.casItem(item, null)) {     //B 
                if (hops >= HOPS) {                        //C 
                    Node<E> q = p.getNext();                   //C1 
                    updateHead(h, (q != null) ? q : p);              //C2 
                } 
                return item;                                         //D 
            } 
            Node<E> next = succ(p);                            //E 
            if (next == null) {                                      //F 
                updateHead(h, p);                                    //G 
                break;                                               //H 
            } 
            p = next;                                                //I 
        } 
        return null; 
    }

刪除頭結點的程式碼分析

和 offer() 方法一樣,poll() 方法也使用“迴圈嘗試”的方式來執行。下面是對關鍵程式碼的解釋:
A:獲得當前節點 p 的 item 域的值。
B:如果當前節點是有效節點,且成功設定這個節點為無效節點。
C:如果迭代過程已經越過了不小於 1 個節點。
C1:取得後繼結點 q。
C2:如果 q 不為 null,設定 head 指向後繼節點 q;否則設定 head 指向當前節點 p(此時佇列為空,只有一個偽節點 p)。
D:返回被移除節點 item 域的值。
E:向後推進到下一個節點 next(因為當前節點 p 是一個無效節點)。
F:如果 next 為 null。
G:設定 head 指向 p 節點(此時佇列為空,只有一個偽節點 p)。
H:退出迴圈。
I:推進到下一個節點。

佇列的出隊方法包含兩個步驟:刪除頭節點和更新 head 指向新頭節點。這兩個步驟分別對應程式碼分析的 B 和 C2。這裡對頭節點的刪除使用了一個小技巧:設定頭節點的 item 域為 null,即刪除了它(雖然這個節點還在佇列中,但它以是無效節點)。在程式碼中我們可以看到,這兩個步驟都使用 CAS 原子指令來完成。由於 ConcurrentLinkedQueue 允許佇列處於不一致狀態,所以這裡的 B 和 C2 這兩個步驟不必一起原子的執行。在 B 處刪除頭節點後,只有當 head 與新頭節點之間的距離達到了 HOPS 指定的閥值,才會執行 C2 來更新 head。

head 在佇列中的位置分析
根據 head 的不變式和可變式,在執行出隊操作前,head 在佇列中的位置共有兩種可能:
1. head 指向有效節點。
2. head 指向無效節點。

下面,讓我們首先來看第一種情形的結構示意圖:

圖 11.head 指向有效節點

圖 11.head 指向有效節點

出隊時,首先取得 head 指向的 A 節點的 item 域的值,然後通過 CAS 設定 A 節點 item 域的值為 null。如果成功,由於此時越過的節點數為 0,所以直接返回 A 節點 item 域原有的值。如果不成功,說明其他執行緒已經搶先刪除了該節點,此時向後推進到 B 節點。重複這個過程,直到成功刪除一個節點;如果遍歷完佇列也沒有刪除成功,則返回 null。下面是成功刪除後的結構示意圖:

圖 12. 成功刪除 head 指向的有效節點

圖 12. 成功刪除 head 指向的有效節點

在上圖中,雖然 A 節點被設定成無效節點,但 head 依然指向它,因為刪除操作越過的節點數還沒有達到 HOPS 指定的閥值。
接下來,讓我們來看看第二種情形的結構示意圖:

圖 13.head 指向無效節點

圖 13.head 指向無效節點

首先獲得 head 指向節點的 item 域的值,由於為 null,所以向後推進到 B 節點。獲得 B 節點 item 域的值後,通過 CAS 設定該值為 null。如果成功,由於已經達到 HOPS 指定的閥值,觸發執行 head 更新。如果不成功(說明其他執行緒已經搶先刪除了 B 節點),繼續向後推進到 C 節點。重複這個過程,直到刪除一個有效節點。如果遍歷完佇列也沒有刪除成功,則返回 null。下圖是成功刪除後的結構示意圖:

圖 14.head 指向無效節點,成功刪除

圖 14.head 指向無效節點,成功刪除

從上圖我們可以看到,在執行刪除操作過程中,head 越過的節點數達到閥值,觸發執行 head 的更新,使它指向 C 節點。
從上面刪除頭節點後的兩個結構示意圖可以看出,執行出隊操作後的佇列依然滿足三個不變式。

總結

ConcurrentLinkedQueue 的非阻塞演算法實現非常精巧,也非常複雜。它使用 CAS 原子指令來處理對資料的併發訪問。同時,它允許佇列處於不一致狀態。這個特性分離了入隊 / 出隊操作中包含的兩個需要一起原子執行的步驟,從而有效的縮小了入隊 / 出隊時的原子化(更新值的)範圍為唯一變數。由於佇列可能處於不一致狀態,為此 ConcurrentLinkedQueue 使用三個不變式來維護非阻塞演算法的正確性。

雖然我們不用自己去實現如此複雜的併發資料結構,但知曉它的工作原理與實現機制對於我們更好的使用它將很有幫助。

參考資料