1. 程式人生 > >Java併發程式設計筆記——J.U.C之collections框架:ConcurrentLinkedQueue

Java併發程式設計筆記——J.U.C之collections框架:ConcurrentLinkedQueue

一:ConcurrentLinkedQueue簡介

ConcurrentLinkedQueue是執行緒安全的無界非阻塞佇列,其底層資料結構使用單向連結串列實現,對於入隊和出隊操作使用CAS來實現執行緒安全。

Doug Lea在實現ConcurrentLinkedQueue時,並沒有利用鎖或底層同步原語,而是完全基於自旋+CAS的方式實現了該佇列。回想一下AQS,AQS內部的CLH等待佇列也是利用了這種方式。

由於是完全基於無鎖演算法實現的,所以當出現多個執行緒同時進行修改佇列的操作(比如同時入隊),很可能出現CAS修改失敗的情況,那麼失敗的執行緒會進入下一次自旋,再嘗試入隊操作,直到成功。

所以,在併發量適中的情況下,ConcurrentLinkedQueue一般具有較好的效能。

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

二:下面是ConcurrentLinkedQueue的類圖結構

三:

三:ConcurrentLinkedQueue原理

佇列結構

我們來看下ConcurrentLinkedQueue的內部結構:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
    implements Queue<E>, java.io.Serializable {
 
    /**
     * 佇列頭指標
     */
    private transient volatile Node<E> head;
 
    /**
     * 佇列尾指標.
     */
    private transient volatile Node<E> tail;
 
    // Unsafe mechanics
     
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
     
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
    /**
     * 佇列結點定義
     */
    private static class Node<E> {
        volatile E item;        // 元素值
        volatile Node<E> next;  // 後驅指標
 
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
 
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
 
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
 
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
 
        // Unsafe mechanics
 
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
 
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
 
    //...
}

可以看到,ConcurrentLinkedQueue內部就是一個簡單的單鏈表結構,每入隊一個元素就是插入一個Node型別的結點。

欄位head指向佇列頭,tail指向佇列尾,通過Unsafe來CAS操作欄位值以及node物件的欄位值。

構造器的定義

ConcurrentLinkedQueue包含兩種構造器:

//構建一個空佇列(head,tail均指向一個佔位結點)
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
//根據以有集合構造佇列
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

我們不妨先看下空構造器,通過空構造器建立的ConcurrentLinkedQueue物件,其head和tail指標並非指向null,而是指向一個item值為null的node結點,如下圖:

入隊操作

元素的入隊是在隊尾插入元素,ConcurrentLinkedQueue的入隊程式碼非常簡單,卻也非常精妙:

offer操作是在佇列末尾新增一個元素,如果傳遞的引數是null則丟擲NPE異常,否則由於ConcurrentLinkedQueue是無界佇列,該方法會一直返回true。

另外,由於使用CAS演算法,因此該方法不會阻塞掛起呼叫執行緒。下面具體看下實現原理。

public boolean add(E e) {//入隊一個元素
    return offer(e);
}
//在隊尾入隊元素e,直到成功
public boolean offer(E e) {
    //e為null,則丟擲空指標異常
    checkNotNull(e);    

    //構造node結點,在建構函式內部呼叫unsafe.putObject
    final Node<E> newNode = new Node<E>(e);    

    //從尾結點進行插入
    for (Node<E> t = tail, p = t;;) {   
        Node<E> q = p.next;

         //CASE1:q==null 說明p是尾結點,則指向插入
        if (q == null) {   
            //使用CAS設定p結點的next結點
            if (p.casNext(null, newNode)) {    
                //CAS成功,則說明新增結點已經放入連結串列,然後設定當前尾結點(包含head,第1,3,4....個結點為尾結點)
                if (p != t) // hop two nodes at a time    //CAS競爭失敗的執行緒會在下一次自旋中進入該邏輯
                    casTail(t, newNode);  // Failure is OK.    //重新設定隊尾指標tail
                return true;
            }    //CAS競爭失敗則進入下一次自旋
        }
        else if (p == q)    //CASE2:發生了出隊操作
            //多執行緒操作時,由於poll操作移除元素後,可能會把head變為自引用,也就是head的next變成了head,所以這裡需要重新找新的head
            p = (t != (t = tail)) ? t : head;
        else
            //尋找尾結點
            p = (p != t && t != (t = tail)) ? t : q;    //將p重新指向隊尾結點
    }
}

我們來分析下offer方法的實現。單執行緒的情況下,元素入隊比較好理解,直接線性地在隊首插入元素即可。我們假設有兩個執行緒ThreadA和ThreadB同時進行入隊地操作。

①ThreadA先單獨入隊兩個元素9、2

此時佇列地結構如下:

②ThreadA入隊元素“10”,ThreadB入隊元素“25”

此時ThreadA和ThreadB若併發執行,我們看下會發生什麼:

1、ThreadA和ThreadB同時進入自旋中的以下程式碼塊:

if (q == null) {
    if (p.casNext(null, newNode)) {    //CASE1:正常情況下,新結點直接插入到隊尾
       //CAS競爭插入成功
        if (p != t) // hop two nodes at a time//CAS競爭失敗地執行緒會在下一次自旋中進入該邏輯
            casTail(t, newNode);  // Failure is OK.    //重新設定隊尾指標tail
        return true;
    //CAS競爭插入失敗則進入下一次自旋
    }

2、ThreadA執行cas操作(p.casNext)成功,插入新結點“10”

ThreadA執行完成後,直接返回true,佇列結構如下:

3、ThreadB執行cas操作(p.casNext)失敗

由於CAS操作同時修改隊尾元素,導致ThreadB操作失敗,則ThreadB進入下一次自旋;
在下一次自旋中,進入以下程式碼塊:

else
    // Check for tail updates after two hops.
    p = (p != t && t != (t = tail)) ? t : q;    //將p重新指向隊尾結點

上述分支的作用就是讓p指標重新定位到隊尾結點,此時佇列結構如下:

然後ThreadB會繼續下一次自旋,並再次進入以下程式碼塊:

if (q == null) {
    // p is last node
    if (p.casNext(null, newNode)) {
        // Successful CAS is the linearization point
        // for e to become an element of this queue,
        // and for newNode to become "live".
        if (p != t) // hop two nodes at a time
            casTail(t, newNode);  // Failure is OK.
        return true;
    }
    // Lost CAS race to another thread; re-read next
}

此時,CAS操作成功,佇列結構如下:

由於此時p!=t ,所以會呼叫casTail方法重新設定隊尾指標:

private boolean casTail(Node<E> cmp, Node<E> val) {    //重新設定隊尾指標tail
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

這個分支只有在元素入隊的同時,針對該元素也發生了“出隊”操作才會執行,我們後面會分析元素的“出隊”,理解了“出隊”操作再回頭來看這個分支就容易理解很多了。

出隊操作

佇列中元素的“出隊”是從隊首移除元素,我們來看下ConcurrentLinkedQueue是如何實現出隊的:

//在隊首出隊元素,直到成功
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {//CASE2:隊首是非哨兵結點(item!=null)
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);//CASE1:隊首是一個哨兵結點(item==null)
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

還是通過示例來看,假設初始的佇列結構如下:

①ThreadA先單獨進行出隊操作

由於head所指的是item==null的結點,所以ThreadA會執行以下分支:

else
    p = q;

然後進入下一次自旋,在自旋中執行以下分支,如果CAS操作成功,則移除首個有效元素,並重新設定頭指標:

if (item != null && p.casItem(item, null)) {    //CASE2:隊首是非哨兵結點
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

此時佇列的結構如下:

如果ThreadA的CAS操作失敗呢?

CAS操作失敗則會進入以下分支,並重新開始自旋:

else if (p == q)
    continue restartFromHead;

最終前面兩個null結點會被GC回收,佇列結構如下:

②ThreadA繼續進行出隊操作

ThreadA繼續執行“出隊”操作,還是執行以下分支:

if (item != null && p.casItem(item, null)) {
    // Successful CAS is the linearization point
    // for item to be removed from this queue.
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

但是此時p==h,所以僅將頭結點置null,這其實是一種“懶刪除”的策略。

出隊元素“2”:

出隊元素“10”:

最終佇列結果如下:

③ThreadA進行出隊,其它執行緒進行入隊

這是最特殊的一種情況,當佇列中只剩下一個元素時,如果同時發生出隊和入隊操作,會導致隊列出現下面這種結構:(假設ThreadA進行出隊元素“25”,ThreadB進行入隊元素“11”)

此時tail.next=tail自身,所以ThreadB在執行入隊時,會進入到offer方法的以下分支:

else if (p == q)    //CASE2:發生出隊操作
    // We have fallen off list.  If tail is unchanged, it
    // will also be off-list, in which case we need to
    // jump to head, from which all live nodes are always
    // reachable.  Else the new tail is a better bet.
    p = (t != (t = tail)) ? t : head;

三、總結

ConcurrentLinkedQueue使用了自旋+CAS的非阻塞演算法來保證執行緒併發訪問時的資料一致性。由於佇列本身是一種連結串列結構,所以雖然演算法看起來很簡單,但其實需要考慮各種併發的情況,實現複雜度較高,並且ConcurrentLinkedQueue不具備實時的資料一致性,實際運用中,佇列一般在生產者-消費者的場景下使用得較多,所以ConcurrentLinkedQueue的使用場景並不如阻塞佇列那麼多。

另外,關於ConcurrentLinkedQueue還有以下需要注意的幾點:

  1. ConcurrentLinkedQueue的迭代器是弱一致性的,這在併發容器中是比較普遍的現象,主要是指在一個執行緒在遍歷佇列結點而另一個執行緒嘗試對某個佇列結點進行修改的話不會丟擲ConcurrentModificationException,這也就造成在遍歷某個尚未被修改的結點時,在next方法返回時可以看到該結點的修改,但在遍歷後再對該結點修改時就看不到這種變化。
  2. size方法需要遍歷連結串列,所以在併發情況下,其結果不一定是準確的,只能供參考。

參考書籍

Java併發程式設計之美

參考連結

https://segmentfault.com/a/11900000162481