1. 程式人生 > >逐行分析AQS原始碼(4)——Condition介面實現

逐行分析AQS原始碼(4)——Condition介面實現

前言

本篇文章是基於執行緒間的同步與通訊(4)——Lock 和 Condtion這篇文章寫的,在那篇文章中,我們分析了Condition介面所定義的方法,本篇我們就來看看AQS對於Condition介面的這些介面方法的具體實現。

下文中筆者將假設讀者已經閱讀過那篇文章,或者已經瞭解了相關的背景知識。

概述

我們在前面介紹Conditon的時候說過,Condition介面的await/signal機制是設計用來代替監視器鎖的wait/notify機制的,因此,與監視器鎖的wait/notify機制對照著學習有助於我們更好的理解Conditon介面:

Object 方法 Condition 方法 區別
void wait() void await()
void wait(long timeout) long awaitNanos(long nanosTimeout) 時間單位,返回值
void wait(long timeout, int nanos) boolean await(long time, TimeUnit unit) 時間單位,引數型別,返回值
void notify() void signal()
void notifyAll() void signalAll()
  • | void awaitUninterruptibly() | Condition獨有
  • | boolean awaitUntil(Date deadline) | Condition獨有

這裡先做一下說明,本文說wait方法時,是泛指wait()wait(long timeout)wait(long timeout, int nanos) 三個方法,當需要指明某個特定的方法時,會帶上相應的引數。同樣的,說notify方法時,也是泛指notify(),notifyAll()方法,await方法和signal方法以此類推。

首先,我們通過wait/notify機制來類比await/signal機制:

  1. 呼叫wait方法的執行緒首先必須是已經進入了同步程式碼塊,即已經獲取了監視器鎖;與之類似,呼叫await方法的執行緒首先必須獲得lock鎖
  2. 呼叫wait方法的執行緒會釋放已經獲得的監視器鎖,進入當前監視器鎖的等待佇列(wait set)中;與之類似,呼叫await方法的執行緒會釋放已經獲得的lock鎖,進入到當前Condtion對應的條件佇列中。
  3. 呼叫監視器鎖的notify方法會喚醒等待在該監視器鎖上的執行緒,這些執行緒將開始參與鎖競爭,並在獲得鎖後,從wait方法處恢復執行;與之類似,呼叫Condtion的signal方法會喚醒對應的條件佇列中的執行緒,這些執行緒將開始參與鎖競爭,並在獲得鎖後,從await方法處開始恢復執行。

實戰

由於前面我們已經學習過了監視器鎖的wait/notify機制,await/signal的用法基本類似。在正式分析原始碼之前,我們先來看一個使用condition的例項:

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生產者方法,往數組裡面寫資料
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await(); //陣列已滿,沒有空間時,掛起等待,直到陣列“非滿”(notFull)
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            // 因為放入了一個數據,陣列肯定不是空的了
            // 此時喚醒等待這notEmpty條件上的執行緒
            notEmpty.signal(); 
        } finally {
            lock.unlock();
        }
    }

    // 消費者方法,從數組裡面拿資料
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 陣列是空的,沒有資料可拿時,掛起等待,直到陣列非空(notEmpty)
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            // 因為拿出了一個數據,陣列肯定不是滿的了
            // 此時喚醒等待這notFull條件上的執行緒
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

這是java官方文件提供的例子,是一個典型的生產者-消費者模型。這裡在同一個lock鎖上,建立了兩個條件佇列notFull, notEmpty。當陣列已滿,沒有儲存空間時,put方法在notFull條件上等待,直到陣列“not full”;當陣列空了,沒有資料可讀時,take方法在notEmpty條件上等待,直到陣列“not empty”,而notEmpty.signal()notFull.signal()則用來喚醒等待在這個條件上的執行緒。

注意,上面所說的,在notFull notEmpty條件上等待事實上是指執行緒在條件佇列(condition queue)上等待,當該執行緒被相應的signal方法喚醒後,將進入到我們前面三篇介紹的sync queue中去爭鎖,爭到鎖後才能能await方法處返回。這裡接牽涉到兩種隊列了——condition queuesync queue,它們都定義在AQS中。

為了防止大家被AQS中的佇列弄暈,這裡我們先理理清:

同步佇列 vs 條件佇列

sync queue

首先,在逐行分析AQS原始碼(1)——獨佔鎖的獲取這篇中我們說過,所有等待鎖的執行緒都會被包裝成Node扔到一個同步佇列中。該同步佇列如下:dummy head

sync queue是一個雙向連結串列,我們使用prevnext屬性來串聯節點。但是在這個同步佇列中,我們一直沒有用到nextWaiter屬性,即使是在共享鎖模式下,這一屬性也只作為一個標記,指向了一個空節點,因此,在sync queue中,我們不會用它來串聯節點。

condtion queue

每建立一個Condtion物件就會對應一個Condtion佇列,每一個呼叫了Condtion物件的await方法的執行緒都會被包裝成Node扔進一個條件佇列中,就像這樣:Conditon queue可見,每一個Condition物件對應一個Conditon佇列,每個Condtion佇列都是獨立的,互相不影響的。在上圖中,如果我們對當前執行緒呼叫了notFull.await(), 則當前執行緒就會被包裝成Node加到notFull佇列的末尾。

值得注意的是,condition queue是一個單向連結串列,在該連結串列中我們使用nextWaiter屬性來串聯連結串列。但是,就像在sync queue中不會使用nextWaiter屬性來串聯連結串列一樣,在condition queue中,也並不會用到prev, next屬性,它們的值都為null。也就是說,在條件佇列中,Node節點真正用到的屬性只有三個:

  • thread:代表當前正在等待某個條件的執行緒
  • waitStatus:條件的等待狀態
  • nextWaiter:指向條件佇列中的下一個節點

既然這裡又提到了waitStatus,我們這裡再回顧一下它的取值範圍:

volatile int waitStatus;
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

在條件佇列中,我們只需要關注一個值即可——CONDITION。它表示執行緒處於正常的等待狀態,而只要waitStatus不是Condition,我們就認為執行緒不再等待了,此時就要從條件佇列中出隊。

sync queue 和 conditon queue的聯絡

一般情況下,等待鎖的sync queue和條件佇列condition queue是相互獨立的,彼此之間並沒有任何關係。但是,當我們呼叫某個條件佇列的signal方法時,會將某個或所有等待在這個條件佇列中的執行緒喚醒,被喚醒的執行緒和普通執行緒一樣需要去爭鎖,如果沒有搶到,則同樣要被加到等待鎖的sync queue中去,此時節點就從condition queue中被轉移到sync queue中:condtion queue to sync queue

但是,這裡尤其要注意的是,node是被一個一個轉移過去的,哪怕我們呼叫的是signalAll()方法也是一個一個轉移過去的,而不是將整個條件佇列接在sync queue的末尾。

同時要注意的是,我們在sync queue中只使用prevnext來串聯連結串列,而不使用nextWaiter;我們在condition queue中只使用nextWaiter來串聯連結串列,而不使用prevnext.事實上,它們就是兩個使用了同樣的Node資料結構的完全獨立的兩種連結串列。因此,將節點從condition queue中轉移到sync queue中時,我們需要斷開原來的連結(nextWaiter),建立新的連結(prev, next),這某種程度上也是需要將節點一個一個地轉移過去的原因之一。

入隊時和出隊時的鎖狀態

sync queue是等待鎖的佇列,當一個執行緒被包裝成Node加到該佇列中時,必然是沒有獲取到鎖;當處於該佇列中的節點獲取到了鎖,它將從該佇列中移除(事實上移除操作是將獲取到鎖的節點設為新的dummy head,並將thread屬性置為null)。

condition佇列是等待在特定條件下的佇列,因為呼叫await方法時,必然是已經獲得了lock鎖,所以在進入condtion佇列執行緒必然是已經獲取了鎖;在被包裝成Node扔進條件佇列中後,執行緒將釋放鎖,然後掛起;當處於該佇列中的執行緒被signal方法喚醒後,由於佇列中的節點在之前掛起的時候已經釋放了鎖,所以必須先去再次的競爭鎖,因此,該節點會被新增到sync queue中。因此,條件佇列在出隊時,執行緒並不持有鎖。

所以事實上,這兩個佇列的鎖狀態正好相反:

  • condition queue:入隊時已經持有了鎖 -> 在佇列中釋放鎖 -> 離開佇列時沒有鎖 -> 轉移到sync queue
  • sync queue:入隊時沒有鎖 -> 在佇列中爭鎖 -> 離開佇列時獲得了鎖

通過上面的介紹,我們對條件佇列已經有了感性的認識,接下來就讓我們進入到本篇的重頭戲——原始碼分析:

CondtionObject

AQS對Condition這個介面的實現主要是通過ConditionObject,上面已經說個,它的核心實現就是是一個條件佇列,每一個在某個condition上等待的執行緒都會被封裝成Node物件扔進這個條件佇列。

核心屬性

它的核心屬性只有兩個:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

這兩個屬性分別代表了條件佇列的隊頭和隊尾,每當我們新建一個conditionObject物件,都會對應一個條件佇列。

建構函式

public ConditionObject() { }

建構函式啥也沒幹,可見,條件佇列是延時初始化的,在真正用到的時候才會初始化。

Condition介面方法實現

await()第一部分分析

public final void await() throws InterruptedException {
    // 如果當前執行緒在調動await()方法前已經被中斷了,則直接丟擲InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // 將當前執行緒封裝成Node新增到條件佇列
    Node node = addConditionWaiter();
    // 釋放當前執行緒所佔用的鎖,儲存當前的鎖狀態
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 如果當前佇列不在同步佇列中,說明剛剛被await, 還沒有人呼叫signal方法,則直接將當前執行緒掛起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 執行緒將在這裡被掛起,停止執行
        // 能執行到這裡說明要麼是signal方法被呼叫了,要麼是執行緒被中斷了
        // 所以檢查下執行緒被喚醒的原因,如果是因為中斷被喚醒,則跳出while迴圈
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 第一部分就分析到這裡,下面的部分我們到第二部分再看, 先把它註釋起來
    /*
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    */
}

我們已經在上面的程式碼註釋中描述了大體的流程,接下來我們詳細來看看await方法中所呼叫方法的具體實現。

首先是將當前執行緒封裝成Node扔進條件佇列中的addConditionWaiter方法:

addConditionWaiter

/**
 * Adds a new waiter to wait queue.
 * @return its new wait node
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果尾節點被cancel了,則先遍歷整個連結串列,清除所有被cancel的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 將當前執行緒包裝成Node扔進條件佇列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    /*
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
    */
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

首先我們要思考的是,存在兩個不同的執行緒同時入隊的情況嗎?不存在。為什麼呢?因為前面說過了,能呼叫await方法的執行緒必然是已經獲得了鎖,而獲得了鎖的執行緒只有一個,所以這裡不存在併發,因此不需要CAS操作。

在這個方法中,我們就是簡單的將當前執行緒封裝成Node加到條件佇列的末尾。這和將一個執行緒封裝成Node加入等待佇列略有不同:

  1. 節點加入sync queuewaitStatus的值為0,但節點加入condition queuewaitStatus的值為Node.CONDTION
  2. sync queue的頭節點為dummy節點,如果佇列為空,則會先建立一個dummy節點,再建立一個代表當前節點的Node新增在dummy節點的後面;而condtion queue 沒有dummy節點,初始化時,直接將firstWaiterlastWaiter直接指向新建的節點就行了。
  3. sync queue是一個雙向佇列,在節點入隊後,要同時修改當前節點的前驅前驅節點的後繼;而在condtion queue中,我們只修改了前驅節點的nextWaiter,也就是說,condtion queue是作為單向佇列來使用的。

如果入隊時發現尾節點已經取消等待了,那麼我們就不應該接在它的後面,此時需要呼叫unlinkCancelledWaiters來剔除那些已經取消等待的執行緒:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

該方法將從頭節點開始遍歷整個佇列,剔除其中waitStatus不為Node.CONDTION的節點,這裡使用了兩個指標firstWaitertrail來分別記錄第一個和最後一個waitStatus不為Node.CONDTION的節點,這些都是基礎的連結串列操作,很容易理解,這裡不再贅述了。

fullyRelease

在節點被成功新增到佇列的末尾後,我們將呼叫fullyRelease來釋放當前執行緒所佔用的鎖:

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

首先,當我們呼叫這個方法時,說明當前執行緒已經被封裝成Node扔進條件隊列了。在該方法中,我們通過release方法釋放鎖,還記得release方法嗎,我們在逐行分析AQS原始碼(2)——獨佔鎖的釋放中已經詳細講過了,這裡不再贅述了。

值得注意的是,這是一次性釋放了所有的鎖,即對於可重入鎖而言,無論重入了幾次,這裡是一次性釋放完的,這也就是為什麼該方法的名字叫fullyRelease。但這裡尤其要注意的是release(savedState)方法是有可能丟擲IllegalMonitorStateException的,這是因為當前執行緒可能並不是持有鎖的執行緒。但是咱前面不是說,只有持有鎖的執行緒才能呼叫await方法嗎?既然fullyRelease方法在await方法中,為啥當前執行緒還有可能並不是持有鎖的執行緒呢?

雖然話是這麼說,但是在呼叫await方法時,我們其實並沒有檢測Thread.currentThread() == getExclusiveOwnerThread(),換句話說,也就是執行到fullyRelease這一步,我們才會檢測這一點,而這一點檢測是由AQS子類實現tryRelease方法來保證的,例如,ReentrantLock對tryRelease方法的實現如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

當發現當前執行緒不是持有鎖的執行緒時,我們就會進入finally塊,將當前Node的狀態設為Node.CANCELLED,這也就是為什麼上面的addConditionWaiter在新增新節點前每次都會檢查尾節點是否已經被取消了。

在當前執行緒的鎖被完全釋放了之後,我們就可以呼叫LockSupport.park(this)把當前執行緒掛起,等待被signal了。但是,在掛起當前執行緒之前我們先用isOnSyncQueue確保了它不在sync queue中,這是為什麼呢?當前執行緒不是在一個和sync queue無關的條件佇列中嗎?怎麼可能會出現在sync queue中的情況?

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}
/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

為了解釋這一問題,我們先來看看signal方法

signalAll()

在看signalAll之前,我們首先要區分呼叫signalAll方法的執行緒與signalAll方法要喚醒的執行緒(等待在對應的條件佇列裡的執行緒):

  • 呼叫signalAll方法的執行緒本身是已經持有了鎖,現在準備釋放鎖了;
  • 在條件佇列裡的執行緒是已經在對應的條件上掛起了,等待著被signal喚醒,然後去爭鎖。

首先,與呼叫notify時執行緒必須是已經持有了監視器鎖類似,在呼叫condition的signal方法時,執行緒也必須是已經持有了lock鎖:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

該方法首先檢查當前呼叫signal方法的執行緒是不是持有鎖的執行緒,這是通過isHeldExclusively方法來實現的,該方法由繼承AQS的子類來實現,例如,ReentrantLock對該方法的實現為:

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

因為exclusiveOwnerThread儲存了當前持有鎖的執行緒,這裡只要檢測它是不是等於當前執行緒就行了。接下來先通過firstWaiter是否為空判斷條件佇列是否為空,如果條件佇列不為空,則呼叫doSignalAll方法:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

首先我們通過lastWaiter = firstWaiter = null;將整個條件佇列清空,然後通過一個do-while迴圈,將原先的條件佇列裡面的節點一個一個拿出來(令nextWaiter = null),再通過transferForSignal方法一個一個新增到sync queue的末尾:

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    // 如果該節點在呼叫signal方法前已經被取消了,則直接跳過這個節點
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 如果該節點在條件佇列中正常等待,則利用enq方法將該節點新增至sync queue佇列的尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); 
    return true;
}

transferForSignal方法中,我們先使用CAS操作將當前節點的waitStatus狀態由CONDTION設為0,如果修改不成功,則說明該節點已經被CANCEL了,則我們直接返回,操作下一個節點;如果修改成功,則說明我們已經將該節點從等待的條件佇列中成功“喚醒”了,但此時該節點對應的執行緒並沒有真正被喚醒,它還要和其他普通執行緒一樣去爭鎖,因此它將被新增到sync queue的末尾等待獲取鎖。

我們這裡通過enq方法將該節點新增進sync queue的末尾。關於該方法,我們在逐行分析AQS原始碼(1)——獨佔鎖的獲取中已經詳細講過了,這裡不再贅述。不過這裡尤其注意的是,enq方法將node節點新增進佇列時,返回的是node的前驅節點

在將節點成功新增進sync queue中後,我們得到了該節點在sync queue中的前驅節點。我們前面說過,在sync queque中的節點都要靠前驅節點去喚醒,所以,這裡我們要做的就是將前驅節點的waitStatus設為Node.SIGNAL, 這一點和shouldParkAfterFailedAcquire所做的工作類似:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

所不同的是,shouldParkAfterFailedAcquire將會向前查詢,跳過那些被cancel的節點,然後將找到的第一個沒有被cancel的節點的waitStatus設成SIGNAL,最後再掛起。而在transferForSignal中,當前Node所代表的執行緒本身就已經被掛起了,所以這裡做的更像是一個複合操作——只要前驅節點處於被取消的狀態或者無法將前驅節點的狀態修成Node.SIGNAL,那我們就將Node所代表的執行緒喚醒,但這個條件並不意味著當前lock處於可獲取的狀態,有可能執行緒被喚醒了,但是鎖還是被佔有的狀態,不過這樣做至少是無害的,因為我們線上程被喚醒後還要去爭鎖,如果搶不到鎖,則大不了再次被掛起。

值得注意的是,transferForSignal是有返回值的,但是我們在這個方法中並沒有用到,它將在signal()方法中被使用。

在繼續往下看signal()方法之前,這裡我們再總結一下signalAll()方法:

  1. 將條件佇列清空(只是令lastWaiter = firstWaiter = null,佇列中的節點和連線關係仍然還存在)
  2. 將條件佇列中的頭節點取出,使之成為孤立節點(nextWaiter,prev,next屬性都為null)
  3. 如果該節點處於被Cancelled了的狀態,則直接跳過該節點(由於是孤立節點,則會被GC回收)
  4. 如果該節點處於正常狀態,則通過enq方法將它新增到sync queue的末尾
  5. 判斷是否需要將該節點喚醒(包括設定該節點的前驅節點的狀態為SIGNAL),如有必要,直接喚醒該節點
  6. 重複2-5,直到整個條件佇列中的節點都被處理完

signal()

signalAll()方法不同,signal()方法只會喚醒一個節點,對於AQS的實現來說,就是喚醒條件佇列中第一個沒有被Cancel的節點,弄懂了signalAll()方法,signal()方法就很容易理解了,因為它們大同小異:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

首先依然是檢查呼叫該方法的執行緒(即當前執行緒)是不是已經持有了鎖,這一點和上面的signalAll()方法一樣,所不一樣的是,接下來呼叫的是doSignal方法:

private void doSignal(Node first) {
    do {
        // 將firstWaiter指向條件佇列隊頭的下一個節點
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 將條件佇列原來的隊頭從條件佇列中斷開,則此時該節點成為一個孤立的節點
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

這個方法也是一個do-while迴圈,目的是遍歷整個條件佇列,找到第一個沒有被cancelled的節點,並將它新增到條件佇列的末尾。如果條件佇列裡面已經沒有節點了,則將條件佇列清空(firstWaiter=lasterWaiter=null)。

在這裡,我們用的依然用的是transferForSignal方法,但是用到了它的返回值,只要節點被成功新增到sync queue中,transferForSignal就返回true, 此時while迴圈的條件就不滿足了,整個方法就結束了,即呼叫signal()方法,只會喚醒一個執行緒。

總結: 呼叫signal()方法會從當前條件佇列中取出第一個沒有被cancel的節點新增到sync佇列的末尾。

await()第二部分分析

前面我們已經分析了signal方法,它會將節點新增進sync queue佇列中,並要麼立即喚醒執行緒,要麼等待前驅節點釋放鎖後將自己喚醒,無論怎樣,被喚醒的執行緒要從哪裡恢復執行呢?當然是被掛起的地方呀,我們在哪裡被掛起的呢?還記得嗎?當然是呼叫了await方法的地方,以await()方法為例:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 我們在這裡被掛起了,被喚醒後,將從這裡繼續往下執行
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

這裡值得注意的是,當我們被喚醒時,其實並不知道是因為什麼原因被喚醒,有可能是因為其他執行緒呼叫了signal方法,也有可能是因為當前執行緒被中斷了。

但是,無論是被中斷喚醒還是被signal喚醒,被喚醒的執行緒最後都將離開condition queue,進入到sync queue中,這一點我們在下面分析原始碼的時候詳細說。

隨後,執行緒將在sync queue中利用進行acquireQueued方法進行“阻塞式”爭鎖,搶到鎖就返回,搶不到鎖就繼續被掛起。因此,當await()方法返回時,必然是保證了當前執行緒已經持有了lock鎖。

另外有一點這裡我們提前說明一下,這一點對於我們下面理解原始碼很重要,那就是:

如果從執行緒被喚醒,到執行緒獲取到鎖這段過程中發生過中斷,該怎麼處理?

我們前面分析中斷的時候說過,中斷對於當前執行緒只是個建議,由當前執行緒決定怎麼對其做出處理。在acquireQueued方法中,我們對中斷是不響應的,只是簡單的記錄搶鎖過程中的中斷狀態,並在搶到鎖後將這個中斷狀態返回,交於上層呼叫的函式處理,而這裡“上層呼叫的函式”就是我們的await()方法。

那麼await()方法是怎麼對待這個中斷的呢?這取決於:

中斷髮生時,執行緒是否已經被signal過?

如果中斷髮生時,當前執行緒並沒有被signal過,則說明當前執行緒還處於條件佇列中,屬於正常在等待中的狀態,此時中斷將導致當前執行緒的正常等待行為被打斷,進入到sync queue中搶鎖,因此,在我們從await方法返回後,需要丟擲InterruptedException,表示當前執行緒因為中斷而被喚醒。

如果中斷髮生時,當前執行緒已經被signal過了,則說明這個中斷來的太晚了,既然當前執行緒已經被signal過了,那麼就說明在中斷髮生前,它就已經正常地被從condition queue中喚醒了,所以隨後即使發生了中斷(注意,這個中斷可以發生在搶鎖之前,也可以發生在搶鎖的過程中),我們都將忽略它,僅僅是在await()方法返回後,再自我中斷一下,補一下這個中斷。就好像這個中斷是在await()方法呼叫結束之後才發生的一樣。這裡之所以要“補一下”這個中斷,是因為我們在用Thread.interrupted()方法檢測是否發生中斷的同時,會將中斷狀態清除,因此如果選擇了忽略中斷,則應該在await()方法退出後將它設成原來的樣子。

關於“這個中斷來的太晚了”這一點如果大家不太容易理解的話,這裡打個比方,**這就好比我們去飯店吃飯,都快吃完了,有一個菜到現在還沒有上,於是我們常常會把服務員叫來問:這個菜有沒有在做?要是還沒做我們就不要了。然後服務員會跑到廚房去問,之後跑回來說:對不起,這個菜已經下鍋在炒了,請再耐心等待一下。這裡,這個“這個菜我們不要了”(發起的中斷)就來的太晚了,因為菜已經下鍋了(已經被signal過了)。**

理清了上面的概念,我們再來看看await()方法是怎麼做的,它用中斷模式interruptMode這個變數記錄中斷事件,該變數有三個值:

  1. 0 : 代表整個過程中一直沒有中斷髮生。
  2. THROW_IE : 表示退出await()方法時需要丟擲InterruptedException,這種模式對應於中斷髮生在signal之前
  3. REINTERRUPT : 表示退出await()方法時只需要再自我中斷以下,這種模式對應於中斷髮生在signal之後,即中斷來的太晚了。
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT =  1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE    = -1;

接下來我們就從執行緒被喚醒的地方繼續往下走,一步步分析原始碼:

情況1:中斷髮生時,執行緒還沒有被signal過

執行緒被喚醒後,我們將首先使用checkInterruptWhileWaiting方法檢測中斷的模式:

/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

這裡假設已經發生過中斷,則Thread.interrupted()方法必然返回true,接下來就是用transferAfterCancelledWait進一步判斷是否發生了signal:

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

上面已經說過,判斷一個node是否被signal過,一個簡單有效的方法就是判斷它是否離開了condition queue, 進入到sync queue中。

換句話說,只要一個節點的waitStatus還是Node.CONDITION,那就說明它還沒有被signal過。由於現在我們分析情況1,則當前節點的waitStatus必然是Node.CONDITION,則會成功執行compareAndSetWaitStatus(node, Node.CONDITION, 0),將該節點的狀態設定成0,然後呼叫enq(node)方法將當前節點新增進sync queue中,然後返回true。這裡值得注意的是,我們此時並沒有斷開node的nextWaiter,所以最後一定不要忘記將這個連結斷開。

再回到transferAfterCancelledWait呼叫處,可知,由於transferAfterCancelledWait將返回true,現在checkInterruptWhileWaiting將返回THROW_IE,這表示我們在離開await方法時應當要丟擲THROW_IE異常。

再回到checkInterruptWhileWaiting的呼叫處:

public final void await() throws InterruptedException {
    /*
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    */
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 我們現在在這裡!!!
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

interruptMode現在為THROW_IE,則我們將執行break,跳出while迴圈。

接下來我們將執行acquireQueued(node, savedState)進行爭鎖,注意,這裡傳入的需要獲取鎖的重入數量是savedState,即之前釋放了多少,這裡就需要再次獲取多少:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 如果執行緒獲取不到鎖,則將在這裡被阻塞住
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued我們在前面的文章中已經詳細分析過了,它是一個阻塞式的方法,獲取到鎖則退出,獲取不到鎖則會被掛起。該方法只有在最終獲取到了鎖後,才會退出,並且退出時會返回當前執行緒的中斷狀態,如果我們在獲取鎖的過程中又被中斷了,則會返回true,否則會返回false。但是其實這裡返回true還是false已經不重要了,因為前面已經發生過中斷了,我們就是因為中斷而被喚醒的不是嗎?所以無論如何,我們在退出await()方法時,必然會丟擲InterruptedException

我們這裡假設它獲取到了鎖了,則它將回到上面的呼叫處,由於我們這時的interruptMode = THROW_IE,則會跳過if語句。接下來我們將執行:

if (node.nextWaiter != null) 
    unlinkCancelledWaiters();

上面我們說過,當前節點的nextWaiter是有值的,它並沒有和原來的condition佇列斷開,這裡我們已經獲取到鎖了,根據逐行分析AQS原始碼(1)——獨佔鎖的獲取中的分析,我們通過setHead方法已經將它的thread屬性置為null,從而將當前執行緒從sync queue"移除"了,接下來應當將它從condition佇列裡面移除。由於condition佇列是一個單向佇列,我們無法獲取到它的前驅節點,所以只能從頭開始遍歷整個條件佇列,然後找到這個節點,再移除它。

然而,事實上呢,我們並沒有這麼做。因為既然已經必須從頭開始遍歷連結串列了,我們就乾脆一次性把連結串列中所有沒有在等待的節點都拿出去,所以這裡呼叫了unlinkCancelledWaiters方法,該方法我們在前面await()第一部分的分析的時候已經講過了,它就是簡單的遍歷連結串列,找到所有waitStatus不為CONDITION的節點,並把它們從佇列中移除。

節點被移除後,接下來就是最後一步了——彙報中斷狀態:

if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

這裡我們的interruptMode=THROW_IE,說明發生了中斷,則將呼叫reportInterruptAfterWait

/**
 * Throws InterruptedException, reinterrupts current thread, or
 * does nothing, depending on mode.
 */
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

可以看出,在interruptMode=THROW_IE時,我們就是簡單的丟擲了一個InterruptedException

至此,情況1(中斷髮生於signal之前)我們就分析完了,這裡我們簡單總結一下:

  1. 執行緒因為中斷,從掛起的地方被喚醒
  2. 隨後,我們通過transferAfterCancelledWait確認了執行緒的waitStatus值為Node.CONDITION,說明並沒有signal發生過
  3. 然後我們修改執行緒的waitStatus為0,並通過enq(node)方法將其新增到sync queue
  4. 接下來執行緒將在sync queue中以阻塞的方式獲取,如果獲取不到鎖,將會被再次掛起
  5. 執行緒在sync queue中獲取到鎖後,將呼叫unlinkCancelledWaiters方法將自己從條件佇列中移除,該方法還會順便移除其他取消等待的鎖
  6. 最後我們通過reportInterruptAfterWait丟擲了InterruptedException

由此可以看出,一個呼叫了await方法掛起的執行緒在被中斷後不會立即丟擲InterruptedException,而是會被新增到sync queue中去爭鎖,如果爭不到,還是會被掛起;只有爭到了鎖之後,該執行緒才得以從sync queuecondition queue中移除,最後丟擲InterruptedException

所以說,一個呼叫了await方法的執行緒,即使被中斷了,它依舊還是會被阻塞住,直到它獲取到鎖之後才能返回,並在返回時丟擲InterruptedException。中斷對它意義更多的是體現在將它從condition queue中移除,加入到sync queue中去爭鎖,從這個層面上看,中斷和signal的效果其實很像,所不同的是,在await()方法返回後,如果是因為中斷被喚醒,則await()方法需要丟擲InterruptedException異常,表示是它是被非正常喚醒的(正常喚醒是指被signal喚醒)。

情況2:中斷髮生時,執行緒已經被signal過了

這種情況對應於“中斷來的太晚了”,即REINTERRUPT模式,我們在拿到鎖退出await()方法後,只需要再自我中斷一下,不需要丟擲InterruptedException。

值得注意的是這種情況其實包含了兩個子情況:

  1. 被喚醒時,已經發生了中斷,但此時執行緒已經被signal過了
  2. 被喚醒時,並沒有發生中斷,但是在搶鎖的過程中發生了中斷

下面我們分別來分析:

情況2.1:被喚醒時,已經發生了中斷,但此時執行緒已經被signal過了

對於這種情況,與前面中斷髮生於signal之前的主要差別在於transferAfterCancelledWait方法:

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //執行緒A執行到這裡,CAS操作將會失敗
        enq(node);
        return true;
    }
    // 由於中斷髮生前,執行緒已經被signal了,則這裡只需要等待執行緒成功進入sync queue即可
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

在這裡,由於signal已經發生過了,則由我們之前分析的signal方法可知,此時當前節點的waitStatus必定不為Node.CONDITION,他將跳過if語句。此時當前執行緒可能已經在sync queue中,或者正在進入到sync queue的路上

為什麼這裡會出現“正在進入到sync queue的路上”的情況呢? 這裡我們解釋下:

假設當前執行緒為執行緒A, 它被喚醒之後檢測到發生了中斷,來到了transferAfterCancelledWait這裡,而另一個執行緒B在這之前已經呼叫了signal方法,該方法會呼叫transferForSignal將當前執行緒新增到sync queue的末尾:

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 執行緒B執行到這裡,CAS操作將會成功
        return false; 
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

因為執行緒A和執行緒B是併發執行的,而這裡我們分析的是“中斷髮生在signal之後”,則此時,執行緒B的compareAndSetWaitStatus先於執行緒A執行。這時可能出現執行緒B已經成功修改了node的waitStatus狀態,但是還沒來得及呼叫enq(node)方法,執行緒A就執行到了transferAfterCancelledWait方法,此時它發現waitStatus已經不是Condition,但是其實當前節點還沒有被新增到sync node佇列中,因此,它接下來將通過自旋,等待執行緒B執行完transferForSignal方法。

執行緒A在自旋過程中會不斷地判斷節點有沒有被成功新增進sync queue,判斷的方法就是isOnSyncQueue

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}

該方法很好理解,只要waitStatus的值還為Node.CONDITION,則它一定還在condtion佇列中,自然不可能在sync裡面;而每一個呼叫了enq方法入隊的執行緒:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { //即使這一步失敗了next.prev一定是有值的
                t.next = node; // 如果t.next有值,說明上面的compareAndSetTail方法一定成功了,則當前節點成為了新的尾節點
                return t; // 返回了當前節點的前驅節點
            }
        }
    }
}

哪怕在設定compareAndSetTail這一步失敗了,它的prev必然也是有值的,因此這兩個條件只要有一個滿足,就說明節點必然不在sync queue佇列中。

另一方面,如果node.next有值,則說明它不僅在sync queue中,並且在它後面還有別的節點,則只要它有值,該節點必然在sync queue中。如果以上都不滿足,說明這裡出現了尾部分叉(關於尾部分叉,參見這裡)的情況,我們就從尾節點向前尋找這個節點:

/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

這裡當然還是有可能出現從尾部反向遍歷找不到的情況,但是不用擔心,我們還在while迴圈中,無論如何,節點最後總會入隊成功的。最終,transferAfterCancelledWait將返回false。

再回到transferAfterCancelledWait呼叫處:

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

則這裡,由於transferAfterCancelledWait返回了false,則checkInterruptWhileWaiting方法將返回REINTERRUPT,這說明我們在退出該方法時只需要再次中斷。

再回到checkInterruptWhileWaiting方法的呼叫處:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //我們在這裡!!!
            break;
    }
    //當前interruptMode=REINTERRUPT,無論這裡是否進入if體,該值不變
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

此時,interruptMode的值為REINTERRUPT,我們將直接跳出while迴圈。

接下來就和上面的情況1一樣了,我們依然還是去爭鎖,這一步依然是阻塞式的,獲取到鎖則退出,獲取不到鎖則會被掛起。

另外由於現在interruptMode的值已經為REINTERRUPT,因此無論在爭鎖的過程中是否發生過中斷interruptMode的值都還是REINTERRUPT

接著就是將節點從condition queue中剔除,與情況1不同的是,在signal方法成功將node加入到sync queue時,該節點的nextWaiter已經是null了,所以這裡這一步不需要執行。

再接下來就是報告中斷狀態了:

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

注意,這裡並沒有丟擲中斷異常,而只是將當前執行緒再中斷一次。

至此,情況2.1(被喚醒時,已經發生了中斷,但此時執行緒已經被signal過了)我們就分析完了,這裡我們簡單總結一下:

  1. 執行緒從掛起的地方被喚醒,此時既發生過中斷,又發生過signal
  2. 隨後,我們通過transferAfterCancelledWait確認了執行緒的waitStatus值已經不為Node.CONDITION,說明signal發生於中斷之前
  3. 然後,我們通過自旋的方式,等待signal方法執行完成,確保當前節點已經被成功新增到sync queue
  4. 接下來執行緒將在sync queue中以阻塞的方式獲取鎖,如果獲取不到,將會被再次掛起
  5. 最後我們通過reportInterruptAfterWait將當前執行緒再次中斷,但是不會丟擲InterruptedException
情況2.2:被喚醒時,並沒有發生中斷,但是在搶鎖的過程中發生了中斷

這種情況就比上面的情況簡單一點了,既然被喚醒時沒有發生中斷,那基本可以確信執行緒是被signal喚醒的,但是不要忘記還存在“假喚醒”這種情況,因此我們依然還是要檢測被喚醒的原因。

那麼怎麼區分到底是假喚醒還是因為是被signal喚醒了呢?

如果執行緒是因為signal而被喚醒,則由前面分析的signal方法可知,執行緒最終都會離開condition queue 進入sync queue中,所以我們只需要判斷被喚醒時,執行緒是否已經在sync queue中即可:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);  // 我們在這裡,執行緒將在這裡被喚醒
        // 由於現在沒有發生中斷,所以interruptMode目前為0
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

執行緒被喚醒時,暫時還沒有發生中斷,所以這裡interruptMode = 0, 表示沒有中斷髮生,所以我們將繼續while迴圈,這時我們將通過isOnSyncQueue方法判斷當前執行緒是否已經在sync queue中了。由於已經發生過signal了,則此時node必然已經在sync queue中了,所以isOnSyncQueue將返回true,我們將退出while迴圈。

不過這裡插一句,如果isOnSyncQueue檢測到當前節點不在sync queue中,則說明既沒有發生中斷,也沒有發生過signal,則當前執行緒是被“假喚醒”的,那麼我們將再次進入迴圈體,將執行緒掛起。

退出while迴圈後接下來還是利用acquireQueued爭鎖,因為前面沒有發生中斷,則interruptMode=0,這時,如果在爭鎖的過程中發生了中斷,則acquireQueued將返回true,則此時interruptMode將變為REINTERRUPT

接下是判斷node.nextWaiter != null,由於在呼叫signal方法時已經將節點移出了佇列,所有這個條件也不成立。

最後就是彙報中斷狀態了,此時interruptMode的值為REINTERRUPT,說明執行緒在被signal後又發生了中斷,這個中斷髮生在搶鎖的過程中,這個中斷來的太晚了,因此我們只是再次自我中斷一下。

至此,情況2.2(被喚醒時,並沒有發生中斷,但是在搶鎖的過程中發生了中斷)我們就分析完了,這種情況和2.1很像,區別就是一個是在喚醒後就被發現已經發生了中斷,一個個喚醒後沒有發生中斷,但是在搶鎖的過成中發生了中斷,但無論如何,這兩種情況都會被歸結為“中斷來的太晚了”,中斷模式為REINTERRUPT,情況2.2的總結如下:

  1. 執行緒被signal方法喚醒,此時並沒有發生過中斷
  2. 因為沒有發生過中斷,我們將從checkInterruptWhileWaiting處返回,此時interruptMode=0
  3. 接下來我們回到while迴圈中,因為signal方法保證了將節點新增到sync queue中,此時while迴圈條件不成立,迴圈退出
  4. 接下來執行緒將在sync queue中以阻塞的方式獲取,如果獲取不到鎖,將會被再次掛起
  5. 執行緒獲取到鎖返回後,我們檢測到在獲取鎖的過程中發生過中斷,並且此時interruptMode=0,這時,我們將interruptMode修改為REINTERRUPT
  6. 最後我們通過reportInterruptAfterWait將當前執行緒再次中斷,但是不會丟擲InterruptedException

這裡我們再總結以下情況2(中斷髮生時,執行緒已經被signal過了),這種情況對應於中斷髮生signal之後,我們不管這個中斷是在搶鎖之前就已經發生了還是搶鎖的過程中發生了,只要它是在signal之後發生的,我們就認為它來的太晚了,我們將忽略這個中斷。因此,從await()方法返回的時候,我們只會將當前執行緒重新中斷一下,而不會丟擲中斷異常。

情況3: 一直沒有中斷髮生

這種情況就更簡單了,它的大體流程和上面的情況2.2差不多,只是在搶鎖的過程中也沒有發生異常,則interruptMode為0,沒有發生過中斷,因此不需要彙報中斷。則執行緒就從await()方法處正常返回。

await()總結

至此,我們總算把await()方法完整的分析完了,這裡我們對整個方法做出總結:

  1. 進入await()時必須是已經持有了鎖
  2. 離開await()時同樣必須是已經持有了鎖
  3. 呼叫await()會使得當前執行緒被封裝成Node扔進條件佇列,然後釋放所持有的鎖
  4. 釋放鎖後,當前執行緒將在condition queue中被掛起,等待signal或者中斷
  5. 執行緒被喚醒後會將會離開condition queue進入sync queue中進行搶鎖
  6. 若線上程搶到鎖之前發生過中斷,則根據中斷髮生在signal之前還是之後記錄中斷模式
  7. 執行緒在搶到鎖後進行善後工作(離開condition queue, 處理中斷異常)
  8. 執行緒已經持有了鎖,從await()方法返回

await() 方法流程

在這一過程中我們尤其要關注中斷,如前面所說,中斷和signal所起到的作用都是將執行緒從condition queue中移除,加入到sync queue中去爭鎖,所不同的是,signal方法被認為是正常喚醒執行緒,中斷方法被認為是非正常喚醒執行緒,如果中斷髮生在signal之前,則我們在最終返回時,應當丟擲InterruptedException;如果中斷髮生在signal之後,我們就認為執行緒本身已經被正常喚醒了,這個中斷來的太晚了,我們直接忽略它,並在await()返回時再自我中斷一下,這種做法相當於將中斷推遲至await()返回時再發生。

awaitUninterruptibly()

在前面我們分析的await()方法中,中斷起到了和signal同樣的效果,但是中斷屬於將一個等待中的執行緒非正常喚醒,可能即使執行緒被喚醒後,也搶到了鎖,但是卻發現當前的等待條件並沒有滿足,則還是得把執行緒掛起。因此我們有時候並不希望await方法被中斷,awaitUninterruptibly()方法即實現了這個功能:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true; // 發生了中斷後執行緒依舊留在了condition queue中,將會再次被掛起
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

首先,從方法簽名上就可以看出,這個方法不會丟擲中斷異常,我們拿它和await()方法對比一下:

public final void await() throws InterruptedException {
    if (Thread.interrupted())  // 不同之處
        throw new InterruptedException(); // 不同之處
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;  // 不同之處
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  // 不同之處
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  // 不同之處
        interruptMode = REINTERRUPT;  // 不同之處
    if (node.nextWaiter != null)  // 不同之處
        unlinkCancelledWaiters(); // 不同之處
    if (interruptMode != 0) // 不同之處
        reportInterruptAfterWait(interruptMode); // 不同之處
}

由此可見,awaitUninterruptibly()全程忽略中斷,即使是當前執行緒因為中斷被喚醒,該方法也只是簡單的記錄中斷狀態,然後再次被掛起(因為並沒有並沒有任何操作將它新增到sync queue中)

要使當前執行緒離開condition queue去爭鎖,則必須是發生了signal事件。

最後,當執行緒在獲取鎖的過程中發生了中斷,該方法也是不響應,只是在最終