JUC原始碼分析-集合篇(六):LinkedTransferQueue
LinkedTransferQueue 是單向連結串列結構的無界阻塞佇列, 從JDK1.7開始加入到J.U.C的行列中。通過 CAS 和 LockSupport 實現執行緒安全,元素操作按照 FIFO (first-in-first-out 先入先出) 的順序。記憶體一致性遵循對LinkedTransferQueue的插入操作先行發生於(happen-before)訪問或移除操作。相對於其他傳統 Queue,LinkedTransferQueue 有它獨特的性質,本章將對其進行詳細的講解。
概述
LinkedTransferQueue(後稱LTQ) 採用一種預佔模式。意思就是消費者執行緒取元素時,如果佇列為空,那就生成一個節點(節點元素為null)入隊,然後消費者執行緒被等待在這個節點上,後面生產者執行緒入隊時發現有一個元素為null的節點,生產者執行緒就不入隊了,直接就將元素填充到該節點,並喚醒該節點等待的執行緒,被喚醒的消費者執行緒取走元素,從呼叫的方法返回。我們稱這種節點操作為“匹配”方式。
LTQ的演算法實現可以總結為以下幾點:
-
雙重佇列: 和典型的單向連結串列結構不同,LTQ 的 Node 儲存了一個
isData
的 boolean 型欄位,也就是說它的節點可以代表一個數據或者是一個請求,稱為雙重佇列(Dual Queue)。上面說過,在消費者獲取元素時,如果佇列為空,當前消費者就會作為一個“元素為null”的節點被放入佇列中等待,所以 LTQ中 的節點儲存了生產者節點(item不為null)和消費者節點(item為null),這兩種節點就是通過isData
來區分的。 -
鬆弛度: 為了節省 CAS 操作的開銷,LTQ 引入了“鬆弛度”的概念:在節點被匹配(被刪除)之後,不會立即更新head/tail,而是當 head/tail 節點和最近一個未匹配的節點之間的距離超過一個“鬆弛閥值
-
節點自連結: 已匹配節點的 next 引用會指向自身。 如果GC延遲迴收,已刪除節點鏈會積累的很長,此時垃圾收集會耗費高昂的代價,並且所有剛匹配的節點也不會被回收。為了避免這種情況,我們在 CAS 向後推進 head 時,會把已匹配的 head 的"next"引用指向自身(即“自連結節點”),這樣就限制了連線已刪除節點的長度(我們也採取類似的方法,清除在其他節點欄位中可能的垃圾保留值)。如果在遍歷時遇到一個自連結節點,那就表明當前執行緒已經滯後於另外一個更新 head 的執行緒,此時就需要重新獲取 head 來遍歷。
所以,在 LTQ 中,資料在某個執行緒的“某一時刻”可能存在下面這種形式:
LinkedTransferQueue 資料形式
unmatched node:未被匹配的節點。可能是一個生產者節點(item不為null),也可能是一個消費者節點(item為null)。matched node:已經被匹配的節點。可能是一個生產者節點(item不為null)的資料已經被一個消費者拿走;也可能是一個消費者節點(item為null)已經被一個生產者填充上資料。
資料結構
LinkedTransferQueue 繼承關係
LTQ 繼承自AbstractQueue,支援傳統Queue的所有操作;實現了 TransferQueue 介面,並且是 TransferQueue 的唯一實現,TransferQueue 定義了一種“預佔模式”,允許消費者在節點上等待,直到生產者把元素放入節點。
核心引數
//佇列頭節點,第一次入列之前為空
transient volatile Node head;
//佇列尾節點,第一次新增節點之前為空
private transient volatile Node tail;
//累計到一定次數再清除無效node
private transient volatile int sweepVotes;
//當一個節點是佇列中的第一個waiter時,在多處理器上進行自旋的次數(隨機穿插呼叫thread.yield)
private static final int FRONT_SPINS = 1 << 7;
// 當前繼節點正在處理,當前節點在阻塞之前的自旋次數,也為FRONT_SPINS
// 的位變化充當增量,也可在自旋時作為yield的平均頻率
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
//sweepVotes的閥值
static final int SWEEP_THRESHOLD = 32;
/*
* Possible values for "how" argument in xfer method.
* xfer方法型別
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
這裡我們重點說一下sweepVotes這個屬性,其他的都很簡單,就不一一介紹了。
上面我們提到,head/tail 節點並不是及時更新的,在併發操作時連結串列內部可能存在已匹配節點,此時就需要一個閥值來決定何時清除已匹配的內部節點鏈,這就是sweepVotes
和SWEEP_THRESHOLD
的作用。
我們通過節點自連結的方式來減少垃圾滯留,同樣也會解除內部已移除節點的連結。在匹配超時、執行緒中斷或呼叫remove
時,這也些節點也會被清除(解除連結)。例如,在某一時刻有一個節點 s 已經被移除,我們可以通過 CAS 修改 s 的前繼節點的 next 引用的方式來解除 s 的連結。 但是有兩種情況並不能保證節點 s 被解除連結:
1. 如果 s 節點是一個 next 為 null 的節點(trailing node),但是它被作為入列時的目標節點,所以只有在其他節點入列之後才能移除它
2. 通過給定 s 的前繼節點,不一定會移除 s 節點:因為前繼節點有可能已經被解除連結,這種情況下前繼節點的前繼節點有可能指向了s。
所以,通過這兩點,說明在 s 節點或它的前繼節點已經出列時,並不是必須要移除它們。對於這些情況,我們記錄了一個解除節點連結失敗的值-sweepVotes,並且為其定義了一個閥值-SWEEP_THRESHOLD,當解除連結失敗次數超過這個閥值時就會對佇列進行一次“大掃除”(通過sweep()
方法),解除所有已取消的節點連結。
xfer方法型別:
在 LTQ 中,所有的入隊/出隊操作都是通過xfer
方法來控制,並且通過一個型別區分offer, put, poll, take, transfer
,這樣做大大簡化了程式碼。來看一下xfer
的方法型別:NOW
:不等待,直接返回匹配結果。用在poll, tryTransfer
中。ASYNC
:非同步操作,直接把元素新增到佇列尾,不等待匹配。用在offer, put, add
中。SYNC
:等待元素被消費者接收。用在transfer, take
中。TIMED
:附帶超時時間的NOW
,等待指定時間後返回匹配結果。用在附帶超時時間的poll, tryTransfer
中。
原始碼解析
由於 LTQ 的入列/出列方法都是由xfer
來實現,所以我們這裡只對xfer
進行解析。
xfer(E e, boolean haveData, int how, long nanos)
/**
* Implements all queuing methods. See above for explanation.
*
* @param e the item or null for take
* @param haveData true if this is a put, else a take
* @param how NOW, ASYNC, SYNC, or TIMED
* @param nanos timeout in nanosecs, used only if mode is TIMED
* @return an item if matched, else e
* @throws NullPointerException if haveData mode but e is null
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//從head開始向後匹配
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
if (item != p && (item != null) == isData) { // 找到有效節點,進入匹配
if (isData == haveData) //節點與此次操作模式一致,無法匹配 can't match
break;
if (p.casItem(item, e)) { // 匹配成功,cas修改為指定元素 match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {//更新head為匹配節點的next節點
h.forgetNext();//舊head節點指向自身等待回收
break;
} // cas失敗,重新獲取head advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())//如果head的next節點未被匹配,跳出迴圈,不更新head,也就是鬆弛度<2
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);//喚醒在節點上等待的執行緒
return LinkedTransferQueue.<E>cast(item);
}
}
//匹配失敗,繼續向後查詢節點
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//未找到匹配節點,把當前節點加入到佇列尾
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
//將新節點s新增到佇列尾並返回s的前繼節點
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; //與其他不同模式執行緒競爭失敗重新迴圈 lost race vs opposite mode
if (how != ASYNC)//同步操作,等待匹配
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
說明:xfer
的基本流程如下:
- 從head開始向後匹配,找到一個節點模式跟本次操作的模式不同的未匹配的節點(生產或消費)進行匹配;
- 匹配節點成功 CAS 修改匹配節點的 item 為給定元素 e;
- 如果此時所匹配節點向後移動,則 CAS 更新 head 節點為匹配節點的 next 節點,舊 head 節點連結指向自身等待被回收(
forgetNext()
方法);如果CAS 失敗,並且鬆弛度大於等於2,就需要重新獲取 head 重試。 - 匹配成功,喚醒匹配節點 p 的等待執行緒
waiter
,返回匹配的 item。 - 如果在上述操作中沒有找到匹配節點,則根據引數
how
做不同的處理: NOW:立即返回。 SYNC:通過tryAppend
方法插入一個新的節點 s(item=e,isData = haveData
)到佇列尾,然後自旋或阻塞當前執行緒直到節點被匹配或者取消返回。 ASYNC:通過tryAppend
方法插入一個新的節點 s(item=e,isData = haveData
)到佇列尾,非同步直接返回。 TIMED:通過tryAppend
方法插入一個新的節點 s(item=e,isData = haveData
)到佇列尾,然後自旋或阻塞當前執行緒直到節點被匹配或者取消或等待超時返回。
tryAppend(Node s, boolean haveData)
/**
* Tries to append node s as tail.
* 嘗試新增給定節點s作為尾節點
*
* @param s the node to append
* @param haveData true if appending in data mode
* @return null on failure due to losing race with append in
* different mode, else s's predecessor, or s itself if no
* predecessor
*/
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) {//head和tail都為null
if (casHead(null, s))//修改head為新節點s
return s; // initialize
}
else if (p.cannotPrecede(haveData))
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}
說明:新增給定節點 s 到佇列尾並返回 s 的前繼節點,失敗時(與其他不同模式執行緒競爭失敗)返回null,沒有前繼節點返回自身。
awaitMatch(Node s, Node pred, E e, boolean timed, long nanos)
/**
* Spins/yields/blocks until node s is matched or caller gives up.
* 自旋/讓步/阻塞,直到給定節點s匹配到或放棄匹配
*
* @param s the waiting node
* @param pred the predecessor of s, or s itself if it has no
* predecessor, or null if unknown (the null case does not occur
* in any current calls but may in possible future extensions)
* @param e the comparison value for checking match
* @param timed if true, wait only until timeout elapses
* @param nanos timeout in nanosecs, used only if timed is true
* @return matched item, or e if unmatched on interrupt or timeout
*/
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//在首個item和取消檢查後初始
int spins = -1; // initialized after first item and cancel checks
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
if (item != e) { //matched
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { //取消匹配,item指向自身 cancel
unsplice(pred, s);//解除s節點和前繼節點的連結
return e;
}
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); //不定期讓步,給其他執行緒執行機會 occasionally yield
}
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
LockSupport.park(this);
}
}
}
說明:當前操作為同步操作時,會呼叫awaitMatch
方法阻塞等待匹配,成功返回匹配節點 item,失敗返回給定引數e(s.item)。在等待期間如果執行緒被中斷或等待超時,則取消匹配,並呼叫unsplice
方法解除節點s
和其前繼節點的連結。
/**
* Unsplices (now or later) the given deleted/cancelled node with
* the given predecessor.
*
* 解除給定已經被刪除/取消節點和前繼節點的連結(可能延遲解除)
* @param pred a node that was at one time known to be the
* predecessor of s, or null or s itself if s is/was at head
* @param s the node to be unspliced
*/
final void unsplice(Node pred, Node s) {
s.forgetContents(); // forget unneeded fields
if (pred != null && pred != s && pred.next == s) {
Node n = s.next;
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {//解除s節點的連結
for (;;) { // check if at, or could be, head
Node h = head;
if (h == pred || h == s || h == null)
return; // at head or list empty
if (!h.isMatched())
break;
Node hn = h.next;
if (hn == null)
return; // now empty
if (hn != h && casHead(h, hn))//更新head
h.forgetNext(); // advance head
}
if (pred.next != pred && s.next != s) { // recheck if offlist
for (;;) { // sweep now if enough votes
int v = sweepVotes;
if (v < SWEEP_THRESHOLD) {
if (casSweepVotes(v, v + 1))
break;
}
else if (casSweepVotes(v, 0)) {//達到閥值,進行"大掃除",清除佇列中的無效節點
sweep();
break;
}
}
}
}
}
}
說明:首先把給定節點s
的next引用指向自身,如果s
的前繼節點pred
還是指向s
(pred.next == s
),嘗試解除s
的連結,把pred
的 next 引用指向s
的 next 節點。如果s
不能被解除(由於它是尾節點或者pred
可能被解除連結,並且pred
和s
都不是head
節點或已經出列),則新增到sweepVotes
,sweepVotes
累計到閥值SWEEP_THRESHOLD
之後就呼叫sweep()
對佇列進行一次“大掃除”,清除佇列中所有的無效節點。sweep()
原始碼如下:
/**
* Unlinks matched (typically cancelled) nodes encountered in a
* traversal from head.
* 解除(通常是取消)從頭部遍歷時遇到的已經被匹配的節點的連結
*/
private void sweep() {
for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
if (!s.isMatched())
// Unmatched nodes are never self-linked
p = s;
else if ((n = s.next) == null) // trailing node is pinned
break;
else if (s == n) // stale
// No need to also check for p == s, since that implies s == n
p = head;
else
p.casNext(s, n);
}
}
小結
本章重點:理解 LinkedTransferQueue 的特性:雙重佇列、鬆弛度、節點的移除操作。 在 ConcurrentLinkedQueue 、 ConcurrentLinkeDeque 以及 SynchronousQueue 中都用到了 LinkedTransferQueue 的某些特性,如果同學們對它們感興趣,理解本章對之後的原始碼解析會有很大的幫助。
作者:泰迪的bagwell 連結:https://www.jianshu.com/p/42ceaed2afe6 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。