1. 程式人生 > >Java語言中的無鎖化執行緒安全佇列

Java語言中的無鎖化執行緒安全佇列

  在併發程式設計中,有時候需要使用執行緒安全的佇列。對於執行緒安全的佇列,有兩種實現方式:一種是阻塞(加鎖),一種是非阻塞(無鎖)。對於無鎖化執行緒安全佇列,實現要基於兩個方面:原子性操作和記憶體訪問控制。說的淺顯一些,就是在JDK中,需要用到Unsafe類中的CAS操作,結合volatile關鍵字,來實現無鎖化執行緒安全佇列。具有代表性的就是ConcurrentLinkedQueue,當然還有其他的佇列,諸如ConcurrentSkipListSet、ConcurrentSkipListMap。下面就著重說一下ConcurrentLinkedQueue的實現。

  ConcurrentLinkedQueue是一個基於連結串列的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部,當我們獲取一個元素的時候,它會返回頭部的元素。這是採用“wait-free”演算法(CAS)來實現的。

原始碼中的CAS操作和volatile關鍵字:

ConcurrentLinkedQueue的類圖如下:

從類圖中可以看出,ConcurrentLinkedQueue是由head節點和tail節點組成,每個節點(Node)由節點元素和指向下一個節點的引用(next)組成,節點與節點之間就是通過這個next連線在一起,從而形成一張連結串列結構的佇列。

要了解ConcurrentLinkedQueue是無鎖的執行緒安全佇列,要從入隊和出隊來分析。

入隊:

入隊就是將入隊的節點新增到佇列的尾部,ConcurrentLinkedQueue的入隊操作是方法offer(E e),該方法原始碼如下:

public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

從原始碼中我們可以看出,入隊過程主要做兩件事情:第一件事情就是定位出尾節點,第二件事情就是使用CAS演算法將入隊節點設定成尾節點的next節點,如不成功則重試。其中 casTail(t, newNode)方法就是CAS操作:

出隊:

出佇列就是從佇列中返回一個節點元素,並且清空該節點對元素的引用。ConcurrentLinkedQueue的出隊操作是方法poll(),該方法原始碼如下:

public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

從原始碼中可以看出,出隊操作首先要獲取頭節點的元素,判斷該元素是否為空,如果為空,則說明另外一個執行緒已經進行了一次出隊操作並且取走該元素,如果不為空,則使用CAS的方式將頭節點的引用設定為null,如果CAS成功,則返回頭節點的元素,否則表示另外一個執行緒先於本執行緒做了CAS,導致該佇列發生了變化,需要重新獲取頭節點。其中casItem(item, null)方法和updateHead(h, ((q = p.next) != null) ? q : p)都屬於CAS操作。