1. 程式人生 > >Java多執行緒進階(三八)—— J.U.C之collections框架:LinkedTransferQueue

Java多執行緒進階(三八)—— J.U.C之collections框架:LinkedTransferQueue

一、LinkedTransferQueue簡介

LinkedTransferQueue是在JDK1.7時,J.U.C包新增的一種比較特殊的阻塞佇列,它除了具備阻塞佇列的常用功能外,還有一個比較特殊的transfer方法。

我們知道,在普通阻塞佇列中,當佇列為空時,消費者執行緒(呼叫takepoll方法的執行緒)一般會阻塞等待生產者執行緒往佇列中存入元素。而LinkedTransferQueuetransfer方法則比較特殊:

  1. 當有消費者執行緒阻塞等待時,呼叫transfer方法的生產者執行緒不會將元素存入佇列,而是直接將元素傳遞給消費者;
  2. 如果呼叫transfer方法的生產者執行緒發現沒有正在等待的消費者執行緒,則會將元素入隊,然後會阻塞等待,直到有一個消費者執行緒來獲取該元素。

clipboard.png

TransferQueue介面

可以看到,LinkedTransferQueue實現了一個名為TransferQueue的介面,TransferQueue也是JDK1.7時J.U.C包新增的介面,正是該介面提供了上述的transfer方法:

clipboard.png

除了transfer方法外,TransferQueue還提供了兩個變種方法:tryTransfer(E e)tryTransfer(E e, long timeout, TimeUnit unit)

tryTransfer(E e)當生產者執行緒呼叫tryTransfer方法時,如果沒有消費者等待接收元素,則會立即返回false。該方法和transfer方法的區別就是tryTransfer方法無論消費者是否接收,方法立即返回,而transfer方法必須等到消費者消費後才返回。

tryTransfer(E e, long timeout, TimeUnit unit)tryTransfer(E e,long timeout,TimeUnit unit)方法則是加上了限時等待功能,如果沒有消費者消費該元素,則等待指定的時間再返回;如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。

TransferQueue介面定義:clipboard.png

LinkedTransferQueue的特點簡要概括如下:

  1. LinkedTransferQueue是一種無界阻塞佇列,底層基於單鏈表實現;
  2. LinkedTransferQueue中的結點有兩種型別:資料結點、請求結點;
  3. LinkedTransferQueue基於無鎖演算法實現。

二、LinkedTransferQueue原理

內部結構

LinkedTransferQueue提供了兩種構造器,也沒有引數設定佇列初始容量,所以是一種無界佇列

/**
 * 佇列結點定義.
 */
static final class Node {
    final boolean isData;   // true: 資料結點; false: 請求結點
    volatile Object item;   // 結點值
    volatile Node next;     // 後驅結點指標
    volatile Thread waiter; // 等待執行緒

    // 設定當前結點的後驅結點為val
    final boolean casNext(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // 設定當前結點的值為val
    final boolean casItem(Object cmp, Object val) {
        // assert cmp == null || cmp.getClass() != Node.class;
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    Node(Object item, boolean isData) {
        UNSAFE.putObject(this, itemOffset, item); // relaxed write
        this.isData = isData;
    }

    // 設定當前結點的後驅結點為自身
    final void forgetNext() {
        UNSAFE.putObject(this, nextOffset, this);
    }

    /**
     * 設定當前結點的值為自身.
     * 設定當前結點的等待執行緒為null.
     */
    final void forgetContents() {
        UNSAFE.putObject(this, itemOffset, this);
        UNSAFE.putObject(this, waiterOffset, null);
    }

    /**
     * 判斷當前結點是否匹配成功.
     * Node.item == this || (Node.isData == true && Node.item == null)
     */
    final boolean isMatched() {
        Object x = item;
        return (x == this) || ((x == null) == isData);
    }

    /**
     * 判斷是否為未匹配的請求結點.
     * Node.isData == false && Node.item == null
     */
    final boolean isUnmatchedRequest() {
        return !isData && item == null;
    }

    /**
     * 當該結點(havaData)是未匹配結點, 且與當前的結點型別不同時, 返回true.
     */
    final boolean cannotPrecede(boolean haveData) {
        boolean d = isData;
        Object x;
        return d != haveData && (x = item) != this && (x != null) == d;
    }

    /**
     * 嘗試匹配資料結點.
     */
    final boolean tryMatchData() {
        // assert isData;   當前結點必須為資料結點
        Object x = item;
        if (x != null && x != this && casItem(x, null)) {
            LockSupport.unpark(waiter);     // 喚醒等待執行緒
            return true;
        }
        return false;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;
    private static final long waiterOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

關於Node結點,有以下幾點需要特別注意:

  1. Node結點有兩種型別:資料結點、請求結點,通過欄位isData區分,只有不同型別的結點才能相互匹配;
  2. Node結點的值儲存在item欄位,匹配前後值會發生變化;

Node結點的狀態變化如下表:

結點/狀態 資料結點 請求結點
匹配前 isData = true; item = 資料結點值 isData = false; item = null
匹配後 isData = true; item = null isData = false; item = this
從上表也可以看出,對於一個數據結點,當item == null表示匹配成功;對於一個請求結點,當item == this表示匹配成功。歸納起來,匹配成功的結點Node就是滿足(Node.item == this) || ((Node.item == null) == Node.isData)

LinkedTransferQueue內部的其餘欄位定義如下,主要就是通過Unsafe類操作欄位值,內部定義了很多常量欄位,比如自旋,這些都是為了非阻塞演算法的鎖優化而定義的:

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {

    /**
     * True如果是多核CPU
     */
    private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;

    /**
     * 執行緒自旋次數(僅多核CPU時用到).
     */
    private static final int FRONT_SPINS = 1 << 7;

    /**
     * 執行緒自旋次數(僅多核CPU時用到).
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     */
    static final int SWEEP_THRESHOLD = 32;

    /**
     * 隊首結點指標.
     */
    transient volatile Node head;

    /**
     * 隊尾結點指標.
     */
    private transient volatile Node tail;

    /**
     * The number of apparent failures to unsplice removed nodes
     */
    private transient volatile int sweepVotes;

    // CAS設定隊尾tail指標為val
    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    // CAS設定隊首head指標為val
    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

    /*
     * xfer方法的入參, 不同型別的方法內部呼叫xfer方法時入參不同.
     */
    private static final int NOW = 0;   // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long sweepVotesOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = LinkedTransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
            sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    //...
}

上述比較重要的就是4個常量值的定義:

/*
 * xfer方法的入參, 不同型別的方法內部呼叫xfer方法時入參不同.
 */
private static final int NOW = 0;   // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer

這四個常量值,作為xfer方法的入參,用於標識不同操作型別。其實從常量的命名也可以看出它們對應的操作含義:

NOW表示即時操作(可能失敗),即不會阻塞呼叫執行緒:poll(獲取並移除隊首元素,如果佇列為空,直接返回null);tryTransfer(嘗試將元素傳遞給消費者,如果沒有等待的消費者,則立即返回false,也不會將元素入隊)

ASYNC表示非同步操作(必然成功):offer(插入指定元素至隊尾,由於是無界佇列,所以會立即返回true);put(插入指定元素至隊尾,由於是無界佇列,所以會立即返回);add(插入指定元素至隊尾,由於是無界佇列,所以會立即返回true)

SYNC表示同步操作(阻塞呼叫執行緒):transfer(阻塞直到出現一個消費者執行緒);take(從隊首移除一個元素,如果佇列為空,則阻塞執行緒)

TIMED表示限時同步操作(限時阻塞呼叫執行緒):poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)

關於xfer方法,它是LinkedTransferQueued的核心內部方法,我們後面會詳細介紹。

transfer方法

transfer方法,用於將指定元素e傳遞給消費者執行緒(呼叫take/poll方法)。如果有消費者執行緒正在阻塞等待,則呼叫transfer方法的執行緒會直接將元素傳遞給它;如果沒有消費者執行緒等待獲取元素,則呼叫transfer方法的執行緒會將元素插入到隊尾,然後阻塞等待,直到出現一個消費者執行緒獲取元素:

/**
 * 將指定元素e傳遞給消費者執行緒(呼叫take/poll方法).
 */
public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        // 進入到此處, 說明呼叫執行緒被中斷了
        Thread.interrupted();       // 清除中斷狀態, 然後丟擲中斷異常
        throw new InterruptedException();
    }
}

transfer方法的內部實際是呼叫了xfer方法,入參為SYNC=2

/**
 * 入隊/出隊元素的真正實現.
 *
 * @param e        入隊操作, e非null; 出隊操作, e為null
 * @param haveData true表示入隊元素, false表示出隊元素
 * @param how      NOW, ASYNC, SYNC, TIMED 四種常量定義
 * @param nanos    限時模式下使用(納秒)
 * @return 匹配成功則返回匹配的元素, 否則返回e本身
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))            // 入隊操作, 元素e不能為null
        throw new NullPointerException();

    Node s = null;

    retry:
    for (; ; ) {
        for (Node h = head, p = h; p != null; ) {               // 嘗試匹配p指向的結點
            boolean isData = p.isData;                          // 結點型別
            Object item = p.item;                               // 結點值
            if (item != p && (item != null) == isData) {        // 如果結點還未匹配過
                if (isData == haveData)                         // 同種型別結點不能匹配
                    break;
                if (p.casItem(item, e)) {                       // p指向從隊首開始向後的第一個匹配結點
                    for (Node q = p; q != h; ) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);               // 喚醒匹配結點上的等待執行緒
                    return LinkedTransferQueue.<E>cast(item);   // 返回匹配結點的值
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head);  // Use head if p offlist
        }

        if (how != NOW) {
            if (s == null)
                s = new Node(e, haveData);      // 建立一個入隊結點, 新增到隊尾
            Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
            if (pred == null)
                continue retry;                 // 入隊失敗,則重試
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊執行緒
        }
        return e;
    }
}

我們通過示例看下xfer方法到底做了哪些事:

①佇列初始狀態

clipboard.png

②ThreadA執行緒呼叫transfer入隊元素“9”

注意,此時入隊一個數據結點,且佇列為空,所以會直接進入xfer中的下述程式碼:

if (how != NOW) {
    if (s == null)
        s = new Node(e, haveData);      // 建立一個入隊結點, 新增到隊尾
    Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
    if (pred == null)
        continue retry;                 // 入隊失敗,則重試
    if (how != ASYNC)
        return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊執行緒
}

上述程式碼會插入一個結點至隊尾,然後執行緒進入阻塞,等待一個出隊執行緒(消費者)的到來。

隊尾插入結點的方法是tryAppend,由於此時佇列為空,會進入CASE1分支,設定隊首指標head指向新結點,tryAppend方法的返回值有三種情況:

  1. 入隊失敗,返回null;
  2. 入隊成功且佇列只有一個結點,返回該結點自身;
  3. 入隊成功且佇列不止一個結點,返回該入隊結點的前驅結點。
/**
 * 嘗試將結點s新增到隊尾.
 *
 * @param s        待新增的結點
 * @param haveData true: 資料結點
 * @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身)
 */
private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {
        Node n, u;
        if (p == null && (p = head) == null) {      // CASE1: 佇列為空
            if (casHead(null, s))   // 設定隊首指標head
                return s;
        } else if (p.cannotPrecede(haveData))       // CASE2: 結點s不能連結到結點p
            return null;
        else if ((n = p.next) != null)              // CASE3: 遍歷至隊尾結點
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))          // CASE4: 插入結點s
            p = p.next;                   // re-read on CAS failure
        else {                                      // CASE5: 嘗試進行鬆弛操作
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                    (t = tail) != null &&
                    (s = t.next) != null && // advance and retry
                    (s = s.next) != null && s != t) ;
            }
            return p;
        }
    }
}

等待出隊執行緒方法awaitMatch,該方法核心作用就是進行結點匹配:

  1. 匹配成功,返回匹配值;
  2. 匹配失敗(中斷或限時等待的超時情況),返回原匹配結點的值;
  3. 阻塞執行緒,等待與之匹配的結點的到來。
從awaitMatch方法其實可以看到一種經典的“鎖優化”思路,就是 自旋 -> yield -> 阻塞,執行緒不會立即進入阻塞,因為執行緒上下文切換的開銷往往比較大,所以會先自旋一定次數,中途可能伴隨隨機的yield操作,讓出cpu時間片,如果自旋次數用完後,還是沒有匹配執行緒出現,再真正阻塞執行緒。

經過上述步驟,ThreadA最終會進入CASE4分支中等待,此時的佇列結構如下:clipboard.png

注意,此時的佇列中tail隊尾指標並不指向結點“9”,這是一種“鬆弛”策略,後面會講到。

③ThreadB執行緒呼叫transfer入隊元素“2”

由於此時隊首head指標不為null,所以會進入transfer方法中的以下迴圈:

for (Node h = head, p = h; p != null; ) {
    boolean isData = p.isData;                          // 結點型別
    Object item = p.item;                               // 結點值
    if (item != p && (item != null) == isData) {        // 如果結點還未匹配過
        if (isData == haveData)                         // 同種型別結點不能匹配
            break;
        if (p.casItem(item, e)) { // match
            for (Node q = p; q != h; ) {
                Node n = q.next;  // update by 2 unless singleton
                if (head == h && casHead(h, n == null ? q : n)) {
                    h.forgetNext();
                    break;
                }                 // advance and retry
                if ((h = head) == null ||
                    (q = h.next) == null || !q.isMatched())
                    break;        // unless slack < 2
            }
            LockSupport.unpark(p.waiter);
            return LinkedTransferQueue.<E>cast(item);
        }
    }
    Node n = p.next;
    p = (p != n) ? n : (h = head);  // Use head if p offlist
}

上述方法會讀取隊首結點,判斷該結點有沒被匹配過(item != p && (item != null) == isData):

  1. 如果已經被其它執行緒匹配過了,則繼續判斷下一個結點(p.next);
  2. 如果還沒有被匹配,則判斷下當前的入隊結點型別是否和隊首中的一致;如果一致(isData == haveData)就匹配失敗,跳出迴圈,否則進行匹配操作。

顯然,目前隊首結點是“資料結點”,ThreadB執行緒的入隊結點也是“資料結點”,結點型別一致,所以匹配失敗,直接跳過迴圈,也進入以下程式碼塊:

if (how != NOW) {
    if (s == null)
        s = new Node(e, haveData);      // 建立一個入隊結點, 新增到隊尾
    Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
    if (pred == null)
        continue retry;                 // 入隊失敗,則重試
    if (how != ASYNC)
        return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊執行緒
}

再次呼叫tryAppend方法, 會在CASE4分支中將元素“2”插入隊尾,然後在CASE5分支中重新設定隊尾指標tail

/**
 * 嘗試將結點s新增到隊尾.
 *
 * @param s        待新增的結點
 * @param haveData true: 資料結點
 * @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身)
 */
private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {
        Node n, u;
        if (p == null && (p = head) == null) {      // CASE1: 佇列為空
            if (casHead(null, s))   // 設定隊首指標head
                return s;
        } else if (p.cannotPrecede(haveData))       // CASE2: 結點s不能連結到結點p
            return null;
        else if ((n = p.next) != null)              // CASE3: 遍歷至隊尾結點
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))          // CASE4: 插入結點s
            p = p.next;                   // re-read on CAS failure
        else {                                      // CASE5: 嘗試進行鬆弛操作
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                    (t = tail) != null &&
                    (s = t.next) != null && // advance and retry
                    (s = s.next) != null && s != t) ;
            }
            return p;
        }
    }
}

此時佇列結構如下:clipboard.png

最終,ThreadB也會在awaitMatch方法中進入阻塞,最終佇列結構如下:clipboard.png

④ThreadC執行緒呼叫transfer入隊元素“93”

過程和前幾步幾乎相同,不再贅述,最終佇列結構如下:clipboard.png

可以看到,隊尾指標tail的設定實際是滯後的,這是一種“鬆弛”策略,用以提升無鎖演算法併發修改過程中的效能。

take方法

再來看下消費者執行緒呼叫的take方法,該方法會從隊首取出一個元素,如果佇列為空,則執行緒會阻塞:

/**
 * 從隊首出隊一個元素.
 */
public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);   // (e == null && isData=false)表示一個請求結點
    if (e != null)  // 如果e!=null, 則表示匹配成功, 此時e為與之匹配的資料結點的值
        return e;
    
    Thread.interrupted();
    throw new InterruptedException();
}

內部依然呼叫了xfer方法,不過此時入參有所不同,由於是消費執行緒呼叫,所以入參e == null && hasData == false,表示一個“請求結點”:

/**
 * 入隊/出隊元素的真正實現.
 *
 * @param e        入隊操作, e非null; 出隊操作, e為null
 * @param haveData true表示入隊元素, false表示出隊元素
 * @param how      NOW, ASYNC, SYNC, TIMED 四種常量定義
 * @param nanos    限時模式下使用(納秒)
 * @return 匹配成功則返回匹配的元素, 否則返回e本身
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))            // 入隊操作, 元素e不能為null
        throw new NullPointerException();

    Node s = null;

    retry:
    for (; ; ) {
        for (Node h = head, p = h; p != null; ) {               // 嘗試匹配p指向的結點
            boolean isData = p.isData;                          // 結點型別
            Object item = p.item;                               // 結點值
            if (item != p && (item != null) == isData) {        // 如果結點還未匹配過
                if (isData == haveData)                         // 同種型別結點不能匹配
                    break;
                if (p.casItem(item, e)) {                       // p指向從隊首開始向後的第一個匹配結點
                    for (Node q = p; q != h; ) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);               // 喚醒匹配結點上的等待執行緒
                    return LinkedTransferQueue.<E>cast(item);   // 返回匹配結點的值
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head);  // Use head if p offlist
        }

        if (how != NOW) {
            if (s == null)
                s = new Node(e, haveData);      // 建立一個入隊結點, 新增到隊尾
            Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
            if (pred == null)
                continue retry;                 // 入隊失敗,則重試
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊執行緒
        }
        return e;
    }
}

還是通過示例看:

①佇列初始狀態

clipboard.png

②ThreadD呼叫take方法,消費元素

此時,在xfer方法中,會從隊首開始,向後找到第一個匹配結點,並交換元素值,然後喚醒佇列中匹配結點上的等待執行緒:

/**
 * 入隊/出隊元素的真正實現.
 *
 * @param e        入隊操作, e非null; 出隊操作, e為null
 * @param haveData true表示入隊元素, false表示出隊元素
 * @param how      NOW, ASYNC, SYNC, TIMED 四種常量定義
 * @param nanos    限時模式下使用(納秒)
 * @return 匹配成功則返回匹配的元素, 否則返回e本身
 */
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))            // 入隊操作, 元素e不能為null
        throw new NullPointerException();

    Node s = null;

    retry:
    for (; ; ) {
        for (Node h = head, p = h; p != null; ) {               // 嘗試匹配p指向的結點
            boolean isData = p.isData;                          // 結點型別
            Object item = p.item;                               // 結點值
            if (item != p && (item != null) == isData) {        // 如果結點還未匹配過
                if (isData == haveData)                         // 同種型別結點不能匹配
                    break;
                if (p.casItem(item, e)) {                       // p指向從隊首開始向後的第一個匹配結點
                    for (Node q = p; q != h; ) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head) == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);               // 喚醒匹配結點上的等待執行緒
                    return LinkedTransferQueue.<E>cast(item);   // 返回匹配結點的值
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head);  // Use head if p offlist
        }

        if (how != NOW) {
            if (s == null)
                s = new Node(e, haveData);      // 建立一個入隊結點, 新增到隊尾
            Node pred = tryAppend(s, haveData); // pred指向s的前驅結點或s(佇列中只有一個結點)或null(tryAppend失敗)
            if (pred == null)
                continue retry;                 // 入隊失敗,則重試
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);   // 等待出隊執行緒
        }
        return e;
    }
}

最終佇列結構如下,匹配結點的值被置換為null,ThreadA被喚醒,ThreadD拿到匹配結點上的元素值“9”並返回:

clipboard.png

③ThreadA被喚醒後繼續執行

ThreadA被喚醒後,從原阻塞處——繼續向下執行,然後進入下一次自旋,進入CASE1分支:

/**
 * 自旋/yield/阻塞,直到結點s被匹配.
 *
 * @param s    等待被匹配的結點s
 * @param pred s的前驅結點或s自身(佇列中只有一個結點的情況)
 * @param e    結點s的值
 * @return 匹配值, 或e本身(中斷或超時情況)
 */
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;   // 限時等待情況下使用
    Thread w = Thread.currentThread();
    int spins = -1;                                                 // 自旋次數, 鎖優化操作
    ThreadLocalRandom randomYields = null; // bound if needed

    for (; ; ) {
        Object item = s.item;
        if (item != e) {                    // CASE1: 匹配成功
            // assert item != s;
            s.forgetContents();             // avoid garbage
            return LinkedTransferQueue.<E>cast(item);
        }
        if ((w.isInterrupted() || (timed && nanos <= 0))
            && s.casItem(e, s)) {           // CASE2: 取消(執行緒被中斷或超時)
            unsplice(pred, s);
            return e;
        }

        // CASE3: 設定輕量級鎖(自旋 -> yield)

        if (spins < 0) {                    // 初始化自旋次數
            if ((spins = spinsFor(pred, s.isData)) > 0)
                randomYields = ThreadLocalRandom.current();
        } else if (spins > 0) {             // 自選次數減1
            --spins;
            if (randomYields.nextInt(CHAINED_SPINS) == 0)
                Thread.yield();             // 隨機yield執行緒
        } else if (s.waiter == null) {      // waiter儲存待阻塞執行緒
            s.waiter = w;
        } else if (timed) {                 // 限時等待情況, 計算剩餘有效時間
            nanos = deadline - System.nanoTime();
            if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
        } else {                            // CASE4: 阻塞執行緒
            LockSupport.park(this);
        }
    }
}

在CASE1分支中,由於結點的item項已經被替換成了null,所以呼叫s.forgetContents(),並返回null

/**
 * 設定當前結點的值為自身.
 * 設定當前結點的等待執行緒為null.
 */
final void forgetContents() {
    UNSAFE.putObject(this, itemOffset, this);
    UNSAFE.putObject(this, waiterOffset, null);
}

最終佇列結構如下:

clipboard.png

④ThreadE呼叫take方法出隊元素

ThreadE呼叫take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false),由於head指標指向的元素已經匹配過了,所以向後繼續查詢,找到第一個未匹配過的結點“2”,然後置換結點“2”中的元素值為null,喚醒執行緒ThreadB,返回匹配結點的元素值“2”:

for (Node h = head, p = h; p != null; ) {               // 嘗試匹配p指向的結點
    boolean isData = p.isData;                          // 結點型別
    Object item = p.item;                               // 結點值
    if (item != p && (item != null) == isData) {        // 如果結點還未匹配過
        if (isData == haveData)                         // 同種型別結點不能匹配
            break;
        if (p.casItem(item, e)) {                       // p指向從隊首開始向後的第一個匹配結點
            for (Node q = p; q != h; ) {
                Node n = q.next;  // update by 2 unless singleton
                if (head == h && casHead(h, n == null ? q : n)) {
                    h.forgetNext();
                    break;
                }                 // advance and retry
                if ((h = head) == null ||
                    (q = h.next) == null || !q.isMatched())
                    break;        // unless slack < 2
            }
            LockSupport.unpark(p.waiter);               // 喚醒匹配結點上的等待執行緒
            return LinkedTransferQueue.<E>cast(item);   // 返回匹配結點的值
        }
    }
    Node n = p.next;
    p = (p != n) ? n : (h = head);  // Use head if p offlist
}

此時佇列狀態如下,可以看到,隊首指標head一次性向後跳了2個位置,原來已經匹配過的元素的next指標指向自身,等待被GC回收,這其實就是LinkedTransferQueue的“鬆弛”策略:

clipboard.png

⑤ThreadB被喚醒後繼續執行

過程和步驟③完全相同,在awaitMatch方法中,將結點的item置為this,然後返回匹配結點值——null,最終佇列結構如下:

clipboard.png

⑥ThreadF呼叫take方法出隊元素

ThreadF呼叫take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false),由於head指標指向的元素此時沒有匹配,所以不用像步驟②那樣向後查詢,而是直接置換匹配結點的元素值“93”,然後喚醒ThreadC,返回匹配值“93”。最終佇列結構如下:

clipboard.png

⑦ThreadC被喚醒後繼續執行

過程和步驟③完全相同,在awaitMatch方法中,將結點的item置為this,然後返回匹配結點值——null,最終佇列結構如下:

clipboard.png

此時的佇列結構,讀者移一定感到非常奇怪,並不嚴格遵守佇列的定義,這其實就是“Dual Queue”演算法的實現,為了對自旋優化,做了很多看似彆扭的操作,不必奇怪。

假設此時再有一個執行緒ThreadH呼叫take方法出隊元素會怎麼樣?其實這是佇列已經空了,ThreadH會被阻塞,但是會建立一個“請求結點”入隊:

/**
 * 嘗試將結點s新增到隊尾.
 *
 * @param s        待新增的結點
 * @param haveData true: 資料結點
 * @return 返回null表示失敗; 否則返回s的前驅結點(沒有前驅則返回s自身)
 */
private Node tryAppend(Node s, boolean haveData) {
    for (Node t = tail, p = t; ; ) {
        Node n, u;
        if (p == null && (p = head) == null) {      // CASE1: 佇列為空
            if (casHead(null, s))   // 設定隊首指標head
                return s;
        } else if (p.cannotPrecede(haveData))       // CASE2: 結點s不能連結到結點p
            return null;
        else if ((n = p.next) != null)              // CASE3: 遍歷至隊尾結點
            p = p != t && t != (u = tail) ? (t = u) : // stale tail
                (p != n) ? n : null;      // restart if off list
        else if (!p.casNext(null, s))          // CASE4: 插入結點s
            p = p.next;                   // re-read on CAS failure
        else {                                      // CASE5: 嘗試進行鬆弛操作
            if (p != t) {                 // update if slack now >= 2
                while ((tail != t || !casTail(t, s)) &&
                    (t = tail) != null &&
                    (s = t.next) != null && // advance and retry
                    (s = s.next) != null && s != t) ;
            }
            return p;
        }
    }
}

呼叫完tryAppend方法後,佇列結構如下,橙色的為“請求結點”—— item==null && isData==false

clipboard.png

然後ThreadH也會進入在awaitMatch方法後進入阻塞,並等待一個入隊執行緒的到來。最終佇列結構如下:

clipboard.png

三、總結

截止本篇為止,我們已經學習完了juc-collection框架中的所有阻塞佇列,如下表所示:

佇列特性 有界佇列 近似無界佇列 無界佇列 特殊佇列
有鎖演算法 / LinkedBlockingQueue、LinkedBlockingDeque / PriorityBlockingQueue、DelayQueue
無鎖演算法 ArrayBlockingQueue / LinkedTransferQueue SynchronousQueue

可以看到,LinkedTransferQueue其實兼具了SynchronousQueue的特性以及無鎖演算法的效能,並且是一種無界佇列:

  1. 和SynchronousQueue相比,LinkedTransferQueue可以儲存實際的資料;
  2. 和其它阻塞佇列相比,LinkedTransferQueue直接用無鎖演算法實現,效能有所提升。

另外,由於LinkedTransferQueue可以存放兩種不同型別的結點,所以稱之為“Dual Queue”:內部Node結點定義了一個 boolean 型欄位——isData,表示該結點是“資料結點”還是“請求結點”。

為了節省 CAS 操作的開銷,LinkedTransferQueue使用了鬆弛(slack)操作:在結點被匹配(被刪除)之後,不會立即更新佇列的head、tail,而是當 head、tail結點與最近一個未匹配的結點之間的距離超過“鬆弛閥值”後才會更新(預設為 2)。這個“鬆弛閥值”一般為1到3,如果太大會增加沿連結串列查詢未匹配結點的時間,太小會增加 CAS 的開銷。