本文開始介紹併發佇列,為後面介紹執行緒池打下基礎。併發佇列莫非也是出隊入隊操作,還有一個比較重要的點就是如何保證其執行緒安全性,有些併發佇列保證執行緒安全是通過lock,有些是通過CAS

我們從ConcurrentLinkedQueue開始吧。

1. 介紹

ConcurrentLinkedQueue集合框架的一員,是一個無界限且執行緒安全,基於單向連結串列的佇列。該佇列的順序是FIFO。當多執行緒訪問公共集合時,使用這個類是一個不錯的選擇。不允許null元素。是一個非阻塞的佇列。

它的迭代器是弱一致性的,不會丟擲java.util.ConcurrentModificationException,也可能在迭代期間,其他操作也正在進行。size()方法,不能保證是正確的,因為在迭代時,其他執行緒也可以操作該佇列。

1.1 類圖



(顯示的方法都是公有方法)

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>

繼承至AbstractQueue,他提供了佇列操作的一個框架,有基本的方法,addremoveelement等等,這些方法基於offerpollpeek(最主要看這幾個方法)。

2. 原始碼分析

2.1 類的整體結構

佇列中的元素Node

private static class Node<E> {
// 保證兩個欄位的可見性
volatile E item;
volatile Node<E> next; /**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
} boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
} void lazySetNext(Node<E> val) {
// putOrderedXXX是putXXXVolatile的延遲版本,設定某個值不會被其他執行緒立即看到(可見性)
// putOrderedXXX設定的值的修飾應該是volatile,這樣該方法才有用 // 關於為什麼使用這個方法,主要目的肯定是提高效率,但是具體原理,我只能告訴大家跟記憶體屏障有關(我也不太清楚這一塊,待我研究後,再寫一篇文章)
UNSAFE.putOrderedObject(this, nextOffset, val);
} boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
} // Unsafe類中的東西,可以去了解一下 private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset; static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}

構造器1:

    // private transient volatile Node<E> head;
// private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

構造器2:

public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}

下面開始講方法,從offerpollpeek從這幾個方法入手

2.2 offer

新增元素到隊尾。因為佇列是無界的,這個方法永遠不會返回false

分為三種情況進行分析(一定自己跟著程式碼debug,一步步的走)

  1. 單執行緒時(使用IDEA debug一直進入的是 else if把我搞迷茫了,我會寫一個部落格來解釋原因
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("A");
queue.offer("B");

以上面的程式碼,分析每一個步驟。

執行建構函式後:

此時連結串列的head與tail指向哨兵節點

插入"A", 此時沒有設定tail('兩跳機制',這裡的原因後面詳見)

插入"B",

單執行緒情況比較簡單

  1. 多執行緒offer時
 public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
// 只有一個執行緒能夠CAS成功,其餘的都重試
if (p.casNext(null, newNode)) { // 延遲設定tail,第一個node入隊不會設定tail,第二個node入隊才會設定tail
//以此類推, '兩跳機制'
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 這裡是有其他執行緒正在poll操作才會進入,此時只考慮多執行緒offer的情況,暫不分析
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 存在tail被更改前,和更改後的兩種情況
p = (p != t && t != (t = tail)) ? t : q;
}
}

結合上面的程式碼,看圖

  • 步驟一執行緒A執行緒B都執行到
   if (p.casNext(null, newNode))

  • 步驟二,只有一個執行緒執行成功,假設執行緒A成功,執行緒B失敗



    因為p(a) == t(a), 此時不執行casTailtail不變。q = p.next, 所以此時q(b) = Node2 ,那麼 p(b) != q(b), 執行緒B執行p = (p != t && t != (t = tail)) ? t : q;

執行緒B即將執行

   p = (p != t && t != (t = tail)) ? t : q;
  • 步驟三 此時執行緒C進入。

    此時,p(c) != q(c), 執行緒C執行
   p = (p != t && t != (t = tail)) ? t : q;

執行完後,q(c)賦值給p(c). 再次迴圈,此時,q(c) == null, 設定p(c)的next,執行緒C將值入隊

  • 步驟四 p(c) != t(c), 執行緒C執行casTail(t, newNode), 執行緒C設定尾結點

  • 此時執行緒B執行
   p = (p != t && t != (t = tail)) ? t : q;

因為p(b) == t(b),所以 q(b) 賦值給 p(b)。繼續迴圈,最後得到

  1. 多執行緒的另一種情況,回到步驟三,此時執行緒C把值入隊了,但是還沒有設定tail

  • 執行緒B,將值入隊成功

    步驟三的基礎上,執行緒B入隊成功後,目前的狀況如下:

此時,執行緒C執行casTail(t, newNode),但是現在的tail != t(c), CAS失敗, 直接返回。

2.2.1 小結

上面不管是多執行緒還是單執行緒,都是努力的去尋找next為null的節點,若為next節點為null,再判斷是否滿足設定tail的條件。

多執行緒offer的第一種情況存在設定tail滯後的問題,我把它稱之為"兩跳機制",後面講使用這種機制的原因。

我們看到上面的情況一直沒有進入else if (p == q)分支,進入else if分支只會發生在有其他執行緒在poll時,我們先講講poll,再講講何時進入else if分支。

2.3 poll

刪除並返回頭結點的值

簡單提一下單執行緒多執行緒poll,著重分析一下polloffer共存的情況

  1. 單執行緒時



    單執行緒比較簡單,就不畫圖了,按照上面的queue,進行一步一步的debug就行了

  2. 多執行緒,只有poll

 public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item; // casItem這裡只有一個執行緒能夠成功,其餘的繼續下面的程式碼
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
    final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
// 將之前的頭節點,自己指向自己,等待被GC
h.lazySetNext(h);
}

從上面程式碼可以看出,修改itemhead都會使用CAS,這些變數都是被volatile修飾,所以保證了這些變數的執行緒安全性。不管是單執行緒還是多執行緒的poll,它們都是去尋找一個有效的頭節點,刪除並返回該值,若不是有效的就繼續找,若佇列為空了,就返回null

最後分析一下,offerpoll共存的情況

  • 執行緒Aoffer操作,執行緒Bpoll操作,初始的狀態如下:

  • 執行緒A進入。

  • 執行緒A將要執行

Node<E> q = p.next;

執行緒B進入,進行poll操作

此時,執行緒B執行了一次內迴圈,將q(b)賦值給了p(b);

  • 執行緒B再次執行內迴圈,此時將p(b).item置空,將p(b)賦值給head,之前的h(b)next指向自己,執行緒B退出

  • 執行緒A執行

  Node<E> q = p.next;

此時,p(a).next 指向自己(等待被GC), 進入else if (p == q)分支,執行緒A退出,經過一番執行後,最後得到的狀態,如下:

進入else if (p == q)分支的情況,只會發生在polloffer共存的情況下。

2.4 peek

獲取首個有效的節點,並返回

public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

peekpoll的操作類似,這裡就貼一下程式碼就是了。

3. 總結

ConcurrentLinkedQueue是使用非阻塞的方式保證執行緒的安全性,在設定關係到整個Queue結構的變數時(這些變數都被volatile修飾),都使用CAS的方式對它們進行賦值。

  • size方法是執行緒不安全的,返回的結果可能不準確

關於“兩跳機制”(自己取得名字),

Both head and tail are permitted to lag. In fact, failing to update them every time one could is a significant optimization (fewer CASes). As with LinkedTransferQueue (see the internal documentation for that class), we use a slack threshold of two; that is, we update head/tail when the current pointer appears to be two or more steps away from the first/last node.

Since head and tail are updated concurrently and independently, it is possible for tail to lag behind head (why not)? -- ConcurrentLinkedQueue

大致意思,headtail允許被延遲設定。不是每次更新它們是一個重大的優化,這樣做就可以更少的CAS(這樣在很多執行緒使用時,積少成多,效率更高)。它的延遲閾值是2,設定head/tail時,當前的結點離first/last有兩步或更多的距離。 這就是“兩跳機制

我們想不通的地方,可能是這個類或方法的一個優化的地方。向著大佬看齊~

4. 引用

Java多執行緒 39 - ConcurrentLinkedQueue詳解,講的非常好,上面的思路是跟著他來的