Java多執行緒進階(三八)—— J.U.C之collections框架:LinkedTransferQueue
一、LinkedTransferQueue簡介
LinkedTransferQueue
是在JDK1.7時,J.U.C包新增的一種比較特殊的阻塞佇列,它除了具備阻塞佇列的常用功能外,還有一個比較特殊的transfer
方法。
我們知道,在普通阻塞佇列中,當佇列為空時,消費者執行緒(呼叫take或poll方法的執行緒)一般會阻塞等待生產者執行緒往佇列中存入元素。而LinkedTransferQueue的transfer方法則比較特殊:
- 當有消費者執行緒阻塞等待時,呼叫transfer方法的生產者執行緒不會將元素存入佇列,而是直接將元素傳遞給消費者;
- 如果呼叫transfer方法的生產者執行緒發現沒有正在等待的消費者執行緒,則會將元素入隊,然後會阻塞等待,直到有一個消費者執行緒來獲取該元素。
TransferQueue介面
可以看到,LinkedTransferQueue實現了一個名為TransferQueue
的介面,TransferQueue也是JDK1.7時J.U.C包新增的介面,正是該介面提供了上述的transfer方法:
除了transfer方法外,TransferQueue還提供了兩個變種方法:tryTransfer(E e)
、tryTransfer(E e, long timeout, TimeUnit unit)
。
tryTransfer(E e)當生產者執行緒呼叫tryTransfer方法時,如果沒有消費者等待接收元素,則會立即返回false。該方法和transfer方法的區別就是tryTransfer方法無論消費者是否接收,方法立即返回,而transfer方法必須等到消費者消費後才返回。
tryTransfer(E e, long timeout, TimeUnit unit)tryTransfer(E e,long timeout,TimeUnit unit)方法則是加上了限時等待功能,如果沒有消費者消費該元素,則等待指定的時間再返回;如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。
TransferQueue介面定義:
LinkedTransferQueue的特點簡要概括如下:
- LinkedTransferQueue是一種無界阻塞佇列,底層基於單鏈表實現;
- LinkedTransferQueue中的結點有兩種型別:資料結點、請求結點;
- LinkedTransferQueue基於無鎖演算法實現。
二、LinkedTransferQueue原理
內部結構
LinkedTransferQueue提供了兩種構造器,也沒有引數設定佇列初始容量,所以是一種無界佇列:
/**
* 佇列結點定義.
*/
static final class Node {
final boolean isData; // true: 資料結點; false: 請求結點
volatile Object item; // 結點值
volatile Node next; // 後驅結點指標
volatile Thread waiter; // 等待執行緒
// 設定當前結點的後驅結點為val
final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 設定當前結點的值為val
final boolean casItem(Object cmp, Object val) {
// assert cmp == null || cmp.getClass() != Node.class;
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
// 設定當前結點的後驅結點為自身
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}
/**
* 設定當前結點的值為自身.
* 設定當前結點的等待執行緒為null.
*/
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
/**
* 判斷當前結點是否匹配成功.
* Node.item == this || (Node.isData == true && Node.item == null)
*/
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
/**
* 判斷是否為未匹配的請求結點.
* Node.isData == false && Node.item == null
*/
final boolean isUnmatchedRequest() {
return !isData && item == null;
}
/**
* 當該結點(havaData)是未匹配結點, 且與當前的結點型別不同時, 返回true.
*/
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
/**
* 嘗試匹配資料結點.
*/
final boolean tryMatchData() {
// assert isData; 當前結點必須為資料結點
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter); // 喚醒等待執行緒
return true;
}
return false;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter"));
} catch (Exception e) {
throw new Error(e);
}
}
}
關於Node結點,有以下幾點需要特別注意:
- Node結點有兩種型別:資料結點、請求結點,通過欄位
isData
區分,只有不同型別的結點才能相互匹配; - Node結點的值儲存在
item
欄位,匹配前後值會發生變化;
Node結點的狀態變化如下表:
結點/狀態 | 資料結點 | 請求結點 |
---|---|---|
匹配前 | isData = true; item = 資料結點值 | isData = false; item = null |
匹配後 | isData = true; item = null | isData = false; item = this |
從上表也可以看出,對於一個數據結點,當item == null
表示匹配成功;對於一個請求結點,當item == this
表示匹配成功。歸納起來,匹配成功的結點Node就是滿足(Node.item == this) || ((Node.item == null) == Node.isData)
。
LinkedTransferQueue內部的其餘欄位定義如下,主要就是通過Unsafe類操作欄位值,內部定義了很多常量欄位,比如自旋,這些都是為了非阻塞演算法的鎖優化而定義的:
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
/**
* True如果是多核CPU
*/
private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;
/**
* 執行緒自旋次數(僅多核CPU時用到).
*/
private static final int FRONT_SPINS = 1 << 7;
/**
* 執行緒自旋次數(僅多核CPU時用到).
*/
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
/**
* The maximum number of estimated removal failures (sweepVotes)
* to tolerate before sweeping through the queue unlinking
* cancelled nodes that were not unlinked upon initial
* removal. See above for explanation. The value must be at least
* two to avoid useless sweeps when removing trailing nodes.
*/
static final int SWEEP_THRESHOLD = 32;
/**
* 隊首結點指標.
*/
transient volatile Node head;
/**
* 隊尾結點指標.
*/
private transient volatile Node tail;
/**
* The number of apparent failures to unsplice removed nodes
*/
private transient volatile int sweepVotes;
// CAS設定隊尾tail指標為val
private boolean casTail(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
// CAS設定隊首head指標為val
private boolean casHead(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
private boolean casSweepVotes(int cmp, int val) {
return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
}
/*
* xfer方法的入參, 不同型別的方法內部呼叫xfer方法時入參不同.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long sweepVotesOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = LinkedTransferQueue.class;
headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes"));
} catch (Exception e) {
throw new Error(e);
}
}
//...
}
上述比較重要的就是4個常量值的定義:
/*
* xfer方法的入參, 不同型別的方法內部呼叫xfer方法時入參不同.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
這四個常量值,作為xfer
方法的入參,用於標識不同操作型別。其實從常量的命名也可以看出它們對應的操作含義:
NOW表示即時操作(可能失敗),即不會阻塞呼叫執行緒:poll(獲取並移除隊首元素,如果佇列為空,直接返回null);tryTransfer(嘗試將元素傳遞給消費者,如果沒有等待的消費者,則立即返回false,也不會將元素入隊)
ASYNC表示非同步操作(必然成功):offer(插入指定元素至隊尾,由於是無界佇列,所以會立即返回true);put(插入指定元素至隊尾,由於是無界佇列,所以會立即返回);add(插入指定元素至隊尾,由於是無界佇列,所以會立即返回true)
SYNC表示同步操作(阻塞呼叫執行緒):transfer(阻塞直到出現一個消費者執行緒);take(從隊首移除一個元素,如果佇列為空,則阻塞執行緒)
TIMED表示限時同步操作(限時阻塞呼叫執行緒):poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)
關於xfer
方法,它是LinkedTransferQueued的核心內部方法,我們後面會詳細介紹。
transfer方法
transfer
方法,用於將指定元素e傳遞給消費者執行緒(呼叫take/poll方法)。如果有消費者執行緒正在阻塞等待,則呼叫transfer方法的執行緒會直接將元素傳遞給它;如果沒有消費者執行緒等待獲取元素,則呼叫transfer方法的執行緒會將元素插入到隊尾,然後阻塞等待,直到出現一個消費者執行緒獲取元素:
/**
* 將指定元素e傳遞給消費者執行緒(呼叫take/poll方法).
*/
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
// 進入到此處, 說明呼叫執行緒被中斷了
Thread.interrupted(); // 清除中斷狀態, 然後丟擲中斷異常
throw new InterruptedException();
}
}
transfer方法的內部實際是呼叫了xfer方法,入參為SYNC=2
:
/**
* 入隊/出隊元素的真正實現.
*
* @param e 入隊操作, e非null; 出隊操作, e為null
* @param haveData true表示入隊元素, false表示出隊元素
* @param how NOW, ASYNC, SYNC, TIMED 四種常量定義
* @param nanos 限時模式下使用(納秒)
* @return 匹配成功則返回匹配的元素, 否則返回e本身
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null)) // 入隊操作, 元素e不能為null
throw new NullPointerException();
Node s = null;
retry:
for (; ; ) {
for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點
boolean isData = p.isData; // 結點型別
Object item = p.item; // 結點值
if (item != p && (item != null) == isData) { // 如果結點還未匹配過
if (isData == haveData) // 同種型別結點不能匹配
break;
if (p.casItem(item, e)) { // p指向從隊首開始向後的第一個匹配結點
for (Node q = p; q != h; ) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待執行緒
return LinkedTransferQueue.<E>cast(item); // 返回匹配結點的值
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) {
if (s == null)
s = new Node(e, haveData); // 建立一個入隊結點, 新增到隊尾
Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
if (pred == null)
continue retry; // 入隊失敗,則重試
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊執行緒
}
return e;
}
}
我們通過示例看下xfer方法到底做了哪些事:
①佇列初始狀態
②ThreadA執行緒呼叫transfer入隊元素“9”
注意,此時入隊一個數據結點,且佇列為空,所以會直接進入xfer中的下述程式碼:
if (how != NOW) {
if (s == null)
s = new Node(e, haveData); // 建立一個入隊結點, 新增到隊尾
Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
if (pred == null)
continue retry; // 入隊失敗,則重試
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊執行緒
}
上述程式碼會插入一個結點至隊尾,然後執行緒進入阻塞,等待一個出隊執行緒(消費者)的到來。
隊尾插入結點的方法是tryAppend
,由於此時佇列為空,會進入CASE1分支,設定隊首指標head指向新結點,tryAppend方法的返回值有三種情況:
- 入隊失敗,返回null;
- 入隊成功且佇列只有一個結點,返回該結點自身;
- 入隊成功且佇列不止一個結點,返回該入隊結點的前驅結點。
/**
* 嘗試將結點s新增到隊尾.
*
* @param s 待新增的結點
* @param haveData true: 資料結點
* @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身)
*/
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t; ; ) {
Node n, u;
if (p == null && (p = head) == null) { // CASE1: 佇列為空
if (casHead(null, s)) // 設定隊首指標head
return s;
} else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能連結到結點p
return null;
else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s)) // CASE4: 插入結點s
p = p.next; // re-read on CAS failure
else { // CASE5: 嘗試進行鬆弛操作
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t) ;
}
return p;
}
}
}
等待出隊執行緒方法awaitMatch
,該方法核心作用就是進行結點匹配:
- 匹配成功,返回匹配值;
- 匹配失敗(中斷或限時等待的超時情況),返回原匹配結點的值;
- 阻塞執行緒,等待與之匹配的結點的到來。
從awaitMatch方法其實可以看到一種經典的“鎖優化”思路,就是 自旋 -> yield -> 阻塞,執行緒不會立即進入阻塞,因為執行緒上下文切換的開銷往往比較大,所以會先自旋一定次數,中途可能伴隨隨機的yield操作,讓出cpu時間片,如果自旋次數用完後,還是沒有匹配執行緒出現,再真正阻塞執行緒。
經過上述步驟,ThreadA最終會進入CASE4分支中等待,此時的佇列結構如下:
注意,此時的佇列中tail隊尾指標並不指向結點“9”,這是一種“鬆弛”策略,後面會講到。
③ThreadB執行緒呼叫transfer入隊元素“2”
由於此時隊首head指標不為null,所以會進入transfer方法中的以下迴圈:
for (Node h = head, p = h; p != null; ) {
boolean isData = p.isData; // 結點型別
Object item = p.item; // 結點值
if (item != p && (item != null) == isData) { // 如果結點還未匹配過
if (isData == haveData) // 同種型別結點不能匹配
break;
if (p.casItem(item, e)) { // match
for (Node q = p; q != h; ) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
上述方法會讀取隊首結點,判斷該結點有沒被匹配過(item != p && (item != null) == isData
):
- 如果已經被其它執行緒匹配過了,則繼續判斷下一個結點(
p.next
); - 如果還沒有被匹配,則判斷下當前的入隊結點型別是否和隊首中的一致;如果一致(
isData == haveData
)就匹配失敗,跳出迴圈,否則進行匹配操作。
顯然,目前隊首結點是“資料結點”,ThreadB執行緒的入隊結點也是“資料結點”,結點型別一致,所以匹配失敗,直接跳過迴圈,也進入以下程式碼塊:
if (how != NOW) {
if (s == null)
s = new Node(e, haveData); // 建立一個入隊結點, 新增到隊尾
Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
if (pred == null)
continue retry; // 入隊失敗,則重試
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊執行緒
}
再次呼叫tryAppend方法, 會在CASE4分支中將元素“2”插入隊尾,然後在CASE5分支中重新設定隊尾指標tail
:
/**
* 嘗試將結點s新增到隊尾.
*
* @param s 待新增的結點
* @param haveData true: 資料結點
* @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身)
*/
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t; ; ) {
Node n, u;
if (p == null && (p = head) == null) { // CASE1: 佇列為空
if (casHead(null, s)) // 設定隊首指標head
return s;
} else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能連結到結點p
return null;
else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s)) // CASE4: 插入結點s
p = p.next; // re-read on CAS failure
else { // CASE5: 嘗試進行鬆弛操作
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t) ;
}
return p;
}
}
}
此時佇列結構如下:
最終,ThreadB也會在awaitMatch方法中進入阻塞,最終佇列結構如下:
④ThreadC執行緒呼叫transfer入隊元素“93”
過程和前幾步幾乎相同,不再贅述,最終佇列結構如下:
可以看到,隊尾指標tail
的設定實際是滯後的,這是一種“鬆弛”策略,用以提升無鎖演算法併發修改過程中的效能。
take方法
再來看下消費者執行緒呼叫的take
方法,該方法會從隊首取出一個元素,如果佇列為空,則執行緒會阻塞:
/**
* 從隊首出隊一個元素.
*/
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0); // (e == null && isData=false)表示一個請求結點
if (e != null) // 如果e!=null, 則表示匹配成功, 此時e為與之匹配的資料結點的值
return e;
Thread.interrupted();
throw new InterruptedException();
}
內部依然呼叫了xfer方法,不過此時入參有所不同,由於是消費執行緒呼叫,所以入參e == null && hasData == false
,表示一個“請求結點”:
/**
* 入隊/出隊元素的真正實現.
*
* @param e 入隊操作, e非null; 出隊操作, e為null
* @param haveData true表示入隊元素, false表示出隊元素
* @param how NOW, ASYNC, SYNC, TIMED 四種常量定義
* @param nanos 限時模式下使用(納秒)
* @return 匹配成功則返回匹配的元素, 否則返回e本身
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null)) // 入隊操作, 元素e不能為null
throw new NullPointerException();
Node s = null;
retry:
for (; ; ) {
for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點
boolean isData = p.isData; // 結點型別
Object item = p.item; // 結點值
if (item != p && (item != null) == isData) { // 如果結點還未匹配過
if (isData == haveData) // 同種型別結點不能匹配
break;
if (p.casItem(item, e)) { // p指向從隊首開始向後的第一個匹配結點
for (Node q = p; q != h; ) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待執行緒
return LinkedTransferQueue.<E>cast(item); // 返回匹配結點的值
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) {
if (s == null)
s = new Node(e, haveData); // 建立一個入隊結點, 新增到隊尾
Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
if (pred == null)
continue retry; // 入隊失敗,則重試
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊執行緒
}
return e;
}
}
還是通過示例看:
①佇列初始狀態
②ThreadD呼叫take方法,消費元素
此時,在xfer方法中,會從隊首開始,向後找到第一個匹配結點,並交換元素值,然後喚醒佇列中匹配結點上的等待執行緒:
/**
* 入隊/出隊元素的真正實現.
*
* @param e 入隊操作, e非null; 出隊操作, e為null
* @param haveData true表示入隊元素, false表示出隊元素
* @param how NOW, ASYNC, SYNC, TIMED 四種常量定義
* @param nanos 限時模式下使用(納秒)
* @return 匹配成功則返回匹配的元素, 否則返回e本身
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null)) // 入隊操作, 元素e不能為null
throw new NullPointerException();
Node s = null;
retry:
for (; ; ) {
for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點
boolean isData = p.isData; // 結點型別
Object item = p.item; // 結點值
if (item != p && (item != null) == isData) { // 如果結點還未匹配過
if (isData == haveData) // 同種型別結點不能匹配
break;
if (p.casItem(item, e)) { // p指向從隊首開始向後的第一個匹配結點
for (Node q = p; q != h; ) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待執行緒
return LinkedTransferQueue.<E>cast(item); // 返回匹配結點的值
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) {
if (s == null)
s = new Node(e, haveData); // 建立一個入隊結點, 新增到隊尾
Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
if (pred == null)
continue retry; // 入隊失敗,則重試
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊執行緒
}
return e;
}
}
最終佇列結構如下,匹配結點的值被置換為null,ThreadA被喚醒,ThreadD拿到匹配結點上的元素值“9”並返回:
③ThreadA被喚醒後繼續執行
ThreadA被喚醒後,從原阻塞處——繼續向下執行,然後進入下一次自旋,進入CASE1分支:
/**
* 自旋/yield/阻塞,直到結點s被匹配.
*
* @param s 等待被匹配的結點s
* @param pred s的前驅結點或s自身(佇列中只有一個結點的情況)
* @param e 結點s的值
* @return 匹配值, 或e本身(中斷或超時情況)
*/
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L; // 限時等待情況下使用
Thread w = Thread.currentThread();
int spins = -1; // 自旋次數, 鎖優化操作
ThreadLocalRandom randomYields = null; // bound if needed
for (; ; ) {
Object item = s.item;
if (item != e) { // CASE1: 匹配成功
// assert item != s;
s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item);
}
if ((w.isInterrupted() || (timed && nanos <= 0))
&& s.casItem(e, s)) { // CASE2: 取消(執行緒被中斷或超時)
unsplice(pred, s);
return e;
}
// CASE3: 設定輕量級鎖(自旋 -> yield)
if (spins < 0) { // 初始化自旋次數
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
} else if (spins > 0) { // 自選次數減1
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // 隨機yield執行緒
} else if (s.waiter == null) { // waiter儲存待阻塞執行緒
s.waiter = w;
} else if (timed) { // 限時等待情況, 計算剩餘有效時間
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
} else { // CASE4: 阻塞執行緒
LockSupport.park(this);
}
}
}
在CASE1分支中,由於結點的item項已經被替換成了null,所以呼叫s.forgetContents()
,並返回null
/**
* 設定當前結點的值為自身.
* 設定當前結點的等待執行緒為null.
*/
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
最終佇列結構如下:
④ThreadE呼叫take方法出隊元素
ThreadE呼叫take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false
),由於head指標指向的元素已經匹配過了,所以向後繼續查詢,找到第一個未匹配過的結點“2”,然後置換結點“2”中的元素值為null,喚醒執行緒ThreadB,返回匹配結點的元素值“2”:
for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結點
boolean isData = p.isData; // 結點型別
Object item = p.item; // 結點值
if (item != p && (item != null) == isData) { // 如果結點還未匹配過
if (isData == haveData) // 同種型別結點不能匹配
break;
if (p.casItem(item, e)) { // p指向從隊首開始向後的第一個匹配結點
for (Node q = p; q != h; ) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter); // 喚醒匹配結點上的等待執行緒
return LinkedTransferQueue.<E>cast(item); // 返回匹配結點的值
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
此時佇列狀態如下,可以看到,隊首指標head一次性向後跳了2個位置,原來已經匹配過的元素的next指標指向自身,等待被GC回收,這其實就是LinkedTransferQueue的“鬆弛”策略:
⑤ThreadB被喚醒後繼續執行
過程和步驟③完全相同,在awaitMatch方法中,將結點的item置為this,然後返回匹配結點值——null,最終佇列結構如下:
⑥ThreadF呼叫take方法出隊元素
ThreadF呼叫take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false
),由於head指標指向的元素此時沒有匹配,所以不用像步驟②那樣向後查詢,而是直接置換匹配結點的元素值“93”,然後喚醒ThreadC,返回匹配值“93”。最終佇列結構如下:
⑦ThreadC被喚醒後繼續執行
過程和步驟③完全相同,在awaitMatch方法中,將結點的item置為this,然後返回匹配結點值——null,最終佇列結構如下:
此時的佇列結構,讀者移一定感到非常奇怪,並不嚴格遵守佇列的定義,這其實就是“Dual Queue”演算法的實現,為了對自旋優化,做了很多看似彆扭的操作,不必奇怪。
假設此時再有一個執行緒ThreadH呼叫take方法出隊元素會怎麼樣?其實這是佇列已經空了,ThreadH會被阻塞,但是會建立一個“請求結點”入隊:
/**
* 嘗試將結點s新增到隊尾.
*
* @param s 待新增的結點
* @param haveData true: 資料結點
* @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身)
*/
private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t; ; ) {
Node n, u;
if (p == null && (p = head) == null) { // CASE1: 佇列為空
if (casHead(null, s)) // 設定隊首指標head
return s;
} else if (p.cannotPrecede(haveData)) // CASE2: 結點s不能連結到結點p
return null;
else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結點
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s)) // CASE4: 插入結點s
p = p.next; // re-read on CAS failure
else { // CASE5: 嘗試進行鬆弛操作
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t) ;
}
return p;
}
}
}
呼叫完tryAppend方法後,佇列結構如下,橙色的為“請求結點”—— item==null && isData==false
:
然後ThreadH也會進入在awaitMatch方法後進入阻塞,並等待一個入隊執行緒的到來。最終佇列結構如下:
三、總結
截止本篇為止,我們已經學習完了juc-collection框架中的所有阻塞佇列,如下表所示:
佇列特性 | 有界佇列 | 近似無界佇列 | 無界佇列 | 特殊佇列 |
---|---|---|---|---|
有鎖演算法 | / | LinkedBlockingQueue、LinkedBlockingDeque | / | PriorityBlockingQueue、DelayQueue |
無鎖演算法 | ArrayBlockingQueue | / | LinkedTransferQueue | SynchronousQueue |
可以看到,LinkedTransferQueue其實兼具了SynchronousQueue的特性以及無鎖演算法的效能,並且是一種無界佇列:
- 和SynchronousQueue相比,LinkedTransferQueue可以儲存實際的資料;
- 和其它阻塞佇列相比,LinkedTransferQueue直接用無鎖演算法實現,效能有所提升。
另外,由於LinkedTransferQueue可以存放兩種不同型別的結點,所以稱之為“Dual Queue”:內部Node結點定義了一個 boolean 型欄位——isData
,表示該結點是“資料結點”還是“請求結點”。
為了節省 CAS 操作的開銷,LinkedTransferQueue使用了鬆弛(slack)操作:在結點被匹配(被刪除)之後,不會立即更新佇列的head、tail,而是當 head、tail結點與最近一個未匹配的結點之間的距離超過“鬆弛閥值”後才會更新(預設為 2)。這個“鬆弛閥值”一般為1到3,如果太大會增加沿連結串列查詢未匹配結點的時間,太小會增加 CAS 的開銷。