1. 程式人生 > >【搞定Java併發程式設計】第18篇:佇列同步器AQS原始碼分析之Condition介面、等待佇列

【搞定Java併發程式設計】第18篇:佇列同步器AQS原始碼分析之Condition介面、等待佇列

AQS系列文章:

1、佇列同步器AQS原始碼分析之概要分析

2、佇列同步器AQS原始碼分析之獨佔模式

3、佇列同步器AQS原始碼分析之共享模式

4、佇列同步器AQS原始碼分析之Condition介面、等待佇列


通過前面三篇關於AQS文章的學習,我們深入瞭解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步佇列等待佇列

要學習條件佇列,就必須先了解Condition介面。

1、Condition介面概述

每個Java物件都擁有一組監視器方法,主要包括:wait()、wait(long  timeout)、notify() 和 notifyAll()方法,這些方法與synchronized關鍵字配合,可以實現等待 / 通知模式。

Condition介面也提供了類似Object的監視器方法,與Lock配合可以實現等待 / 通知模式,但是這兩者在使用方法和功能特性上還是有差別的。

Object的監視器方法與Condition介面的對比

Condition定義了 等待 / 通知 的方法,當前執行緒呼叫這些方法時,需要提前獲取到Condition物件關聯的鎖。Condition物件是由Lock物件(呼叫Lock物件的newCondition()方法)創建出來的,即Condition物件是依賴Lock物件的。

Condition的使用方式比較簡單,需要注意在呼叫方法前獲取鎖,案例程式碼如下:

package zju.com.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionUseCase {

	Lock lock = new ReentrantLock();
	// 通過lock.newCondition()方法建立condition物件
	Condition condition = lock.newCondition();

	// 等待:conditionWait()
	public void conditionWait() throws InterruptedException {
		lock.lock();
		try {
			condition.await();   // 當前執行緒進入等待狀態
		} finally {
		}
	}

	// 通知:conditionSignal()
	public void conditionSignal() throws InterruptedException {
		lock.lock();
		try {
			condition.signal();   // 喚醒一個等待在Condition上的執行緒
		} finally {
			lock.unlock();
		}
	}
}

一般會將Condition物件作為成員變數。當呼叫await()方法後,當前執行緒會釋放鎖並在此等待,而其他執行緒呼叫Condition物件的signal()方法,通知當前執行緒後,當前執行緒才從await()方法返回,並且在返回前已經獲取了鎖。

Condition的主要方法如下表所示:

Condition的主要方法及其描述
  • Condition介面的原始碼

public interface Condition {

    // 響應執行緒中斷的條件等待
    void await() throws InterruptedException;

    // 不響應執行緒中斷的條件等待
    void awaitUninterruptibly();

    // 設定相對時間的條件等待(不進行自旋)
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    // 設定相對時間的條件等待(進行自旋)
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    // 設定絕對時間的條件等待
    boolean awaitUntil(Date deadline) throws InterruptedException;

    // 喚醒條件佇列中的頭節點
    void signal();

    // 喚醒條件佇列的所有節點
    void signalAll(); 
}

 Condition介面雖然定義了這麼多方法,但總共就分為兩類,以 await 開頭的是執行緒進入條件佇列等待的方法,以signal開頭的是將條件佇列中的執行緒“喚醒”的方法。這裡要注意的是,呼叫signal方法可能喚醒執行緒也可能不會喚醒執行緒,什麼時候會喚醒執行緒這得看具體情況。但是呼叫signal方法一定會將執行緒從條件佇列中移到同步佇列尾部。

await方法分為5種,分別是:響應執行緒中斷等待、不響應執行緒中斷等待、設定相對時間不自旋等待、設定相對時間自旋等待、設定絕對時間等待;

signal方法只有2種,分別是:只喚醒條件佇列頭節點和喚醒條件佇列所有節點。


2、Condition的原始碼分析

ConditionObject是同步器AQS的內部類,它實現了Condition介面。每個Condition物件都包含著一個佇列(以下稱之為等待佇列),該佇列是Condition物件實現 等待 / 通知 功能實現的關鍵

下面將分析Condition的實現,主要包括:等待佇列、等待和通知,下面提到的Condition如果不加說明均指的是ConditionObject。

2.1、等待佇列

等待佇列是一個FIFO的佇列,在佇列中的每個節點都包含了一個執行緒引用,該執行緒就是在Condition物件上等待的執行緒,如果一個執行緒呼叫了Condition.await()方法,那麼該執行緒將會釋放鎖、構造成節點加入等待佇列並進入等待狀態。這裡的用的節點型別也是同步器的靜態內部類Node。

這裡舉個例子,便於幫助理解等待佇列:

我們拿公共廁所做比喻,同步佇列是主要的排隊區,如果公共廁所沒開放,所有想要進入廁所的人都得在這裡排隊。而等待佇列主要是為條件等待設定的,我們想象一下如果一個人通過排隊終於成功獲取鎖進入了廁所,但在方便之前發現自己沒帶手紙,碰到這種情況雖然很無奈,但是它也必須接受這個事實,這時它只好乖乖的出去先準備好手紙(進入等待佇列等待),當然在出去之前還得把鎖給釋放了好讓其他人能夠進來,在準備好了手紙(條件滿足)之後它又得重新回到同步佇列中去排隊。

當然進入等待佇列的人並不都是因為沒帶手紙,可能還有其他一些原因必須中斷操作先去等待佇列中去排隊,所以等待佇列可以有多個,按照不同的等待條件而設定不同的等待佇列。等待佇列是一條單向連結串列,Condition介面定義了等待佇列中的所有操作,AbstractQueuedSynchronizer內部的ConditionObject類實現了Condition介面。

一個Condition包含一個等待佇列,Condition擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前執行緒呼叫Condition.await()方法,將會以當前執行緒構造節點,並將節點從尾部加入等待佇列。

等待佇列的基本結構

Condition擁有尾結點的引用,而新增節點只需要將原有的尾節點nextWaiter指向它,並且更新尾節點即可。上訴節點引用更新的過程並沒有使用CAS保證,原因在於呼叫await()方法的執行緒必定是獲取了鎖的執行緒,也就是說該過程是由鎖來保證執行緒安全的。

在Object的監視器模型上,一個物件擁有一個同步佇列和等待佇列,而併發包JUC中的Lock(更確切的說是同步器)擁有一個同步佇列和多個等待佇列。

同步佇列與等待佇列

2.2、響應執行緒中斷等待

呼叫Condition的await()方法,會使當前執行緒進入等待佇列並釋放鎖,同時執行緒狀態轉變為等待狀態。當從await()方法返回時,當前執行緒一定獲取了Condition相關聯的鎖。

如果從佇列的角度看await()方法,當呼叫await()方法時,相當於同步佇列的首節點(獲取了同步狀態 / 鎖的節點)移動到Condition的等待佇列中。

下面看下ConditionObject的await()方法的原始碼:

// 響應執行緒中斷的條件等待
public final void await() throws InterruptedException {
    // 如果執行緒被中斷則丟擲異常
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // 將當前執行緒新增到條件佇列尾部
    Node node = addConditionWaiter();
    // 在進入條件等待之前先完全釋放鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 執行緒一直在while迴圈裡進行條件等待
    while (!isOnSyncQueue(node)) {
        // 進行條件等待的執行緒都在這裡被掛起, 執行緒被喚醒的情況有以下幾種:
        // 1.同步佇列的前驅節點已取消
        // 2.設定同步佇列的前驅節點的狀態為SIGNAL失敗
        // 3.前驅節點釋放鎖後喚醒當前節點
        LockSupport.park(this);
        // 當前執行緒醒來後立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件佇列
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
    }
    // 執行緒醒來後就會以獨佔模式獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }
    // 這步操作主要為防止執行緒在signal之前中斷而導致沒與條件佇列斷絕聯絡
    if (node.nextWaiter != null) {
        unlinkCancelledWaiters();
    }
    // 根據中斷模式進行響應的中斷處理
    if (interruptMode != 0) {
        reportInterruptAfterWait(interruptMode);
    }
}

當執行緒呼叫await()方法的時候,首先會將當前執行緒包裝成節點放入等待佇列的尾部。在addConditionWaiter方法中,如果發現等待佇列尾結點已取消就會呼叫unlinkCancelledWaiters方法將條件佇列所有的已取消結點清空。

這步操作是插入結點的準備工作,那麼確保了尾結點的狀態也是CONDITION之後,就會新建一個節點將當前執行緒包裝起來然後放入等待佇列尾部。注意,這個過程只是將節點新增到同步佇列尾部而沒有掛起執行緒。

下面看下釋放鎖的fullyRelease()方法的原始碼:

// 完全釋放鎖
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取當前的同步狀態
        int savedState = getState();
        // 使用當前的同步狀態去釋放鎖
        if (release(savedState)) {
            failed = false;
            // 如果釋放鎖成功就返回當前同步狀態
            return savedState;
        } else {
            // 如果釋放鎖失敗就丟擲執行時異常
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 保證沒有成功釋放鎖就將該節點設定為取消狀態
        if (failed) {
            node.waitStatus = Node.CANCELLED;
        }
    }
}

將當前執行緒包裝成結點新增到等待佇列尾部後,緊接著就呼叫fullyRelease()方法釋放鎖。注意,方法名為fullyRelease也就是這步操作會完全的釋放鎖,因為鎖是可重入的,所以在進行條件等待前需要將鎖全部釋放了,不然的話別人就獲取不了鎖了。如果釋放鎖失敗的話就會丟擲一個執行時異常,如果成功釋放了鎖的話就返回之前的同步狀態。

下面來看看進行條件等待的原始碼:

// 執行緒一直在while迴圈裡進行條件等待
while (!isOnSyncQueue(node)) {
    // 進行條件等待的執行緒都在這裡被掛起, 執行緒被喚醒的情況有以下幾種:
    // 1.同步佇列的前繼結點已取消
    // 2.設定同步佇列的前繼結點的狀態為SIGNAL失敗
    // 3.前繼結點釋放鎖後喚醒當前結點
    LockSupport.park(this);
    // 當前執行緒醒來後立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出等待佇列
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
        break;
    }
}

// 檢查條件等待時的執行緒中斷情況
private int checkInterruptWhileWaiting(Node node) {
    // 中斷請求在signal操作之前:THROW_IE
    // 中斷請求在signal操作之後:REINTERRUPT
    // 期間沒有收到任何中斷請求:0
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

// 將取消條件等待的結點從等待佇列轉移到同步佇列中
final boolean transferAfterCancelledWait(Node node) {
    // 如果這步CAS操作成功的話就表明中斷髮生在signal方法之前
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 狀態修改成功後就將該結點放入同步佇列尾部
        enq(node);
        return true;
    }
    // 到這裡表明CAS操作失敗, 說明中斷髮生在signal方法之後
    while (!isOnSyncQueue(node)) {
        // 如果sinal方法還沒有將結點轉移到同步佇列, 就通過自旋等待一下
        Thread.yield();
    }
    return false;
}

在以上兩個操作完成了之後就會進入while迴圈,可以看到while迴圈裡面首先呼叫LockSupport.park(this)將執行緒掛起了,所以執行緒就會一直在這裡阻塞。在呼叫signal方法後僅僅只是將結點從等待佇列轉移到同步佇列中去,至於會不會喚醒執行緒需要看情況。

如果轉移節點時發現同步佇列中的前驅節點已取消,或者是更新前驅節點的狀態為SIGNAL失敗,這兩種情況都會立即喚醒執行緒,否則的話在signal方法結束時就不會去喚醒已在同步佇列中的執行緒,而是等到它的前驅節點來喚醒。

當然,執行緒阻塞在這裡除了可以呼叫signal方法喚醒之外,執行緒還可以響應中斷,如果執行緒在這裡收到中斷請求就會繼續往下執行。可以看到執行緒醒來後會馬上檢查是否是由於中斷喚醒的還是通過signal方法喚醒的,如果是因為中斷喚醒的同樣會將這個節點轉移到同步佇列中去,只不過是通過呼叫transferAfterCancelledWait方法來實現的。最後執行完這一步之後就會返回中斷情況並跳出while迴圈。

結點移出等待佇列後的操作:

// 執行緒醒來後就會以獨佔模式獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
    interruptMode = REINTERRUPT;
}
// 這步操作主要為防止執行緒在signal之前中斷而導致沒與等待佇列斷絕聯絡
if (node.nextWaiter != null) {
    unlinkCancelledWaiters();
}
// 根據中斷模式進行響應的中斷處理
if (interruptMode != 0) {
    reportInterruptAfterWait(interruptMode);
}
 
// 結束條件等待後根據中斷情況做出相應處理
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    // 如果中斷模式是THROW_IE就丟擲異常
    if (interruptMode == THROW_IE) {
        throw new InterruptedException();
    // 如果中斷模式是REINTERRUPT就自己掛起
    } else if (interruptMode == REINTERRUPT) {
        selfInterrupt();
    }
}

當執行緒終止了while迴圈也就是條件等待後,就會回到同步佇列中。不管是因為呼叫signal方法回去的還是因為執行緒中斷導致的,節點最終都會在同步佇列中。這時就會呼叫acquireQueued方法執行在同步佇列中獲取鎖的操作,這個方法我們在獨佔模式這一篇已經詳細的講過。

也就是說,節點從等待隊列出來後又是乖乖的走獨佔模式下獲取鎖的那一套,等這個結點再次獲得鎖之後,就會呼叫reportInterruptAfterWait方法來根據這期間的中斷情況做出相應的響應。如果中斷髮生在signal方法之前,interruptMode就為THROW_IE,再次獲得鎖後就丟擲異常;如果中斷髮生在signal方法之後,interruptMode就為REINTERRUPT,再次獲得鎖後就重新中斷。

2.3、不響應執行緒中斷等待

// 不響應執行緒中斷的條件等待
public final void awaitUninterruptibly() {
    // 將當前執行緒新增到等待佇列尾部
    Node node = addConditionWaiter();
    // 完全釋放鎖並返回當前同步狀態
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    // 結點一直在while迴圈裡進行條件等待
    while (!isOnSyncQueue(node)) {
        // 等待佇列中所有的執行緒都在這裡被掛起
        LockSupport.park(this);
        // 執行緒醒來發現中斷並不會馬上去響應
        if (Thread.interrupted()) {
            interrupted = true;
        }
    }
    if (acquireQueued(node, savedState) || interrupted) {
        // 在這裡響應所有中斷請求, 滿足以下兩個條件之一就會將自己掛起
        // 1.執行緒在條件等待時收到中斷請求
        // 2.執行緒在acquireQueued方法裡收到中斷請求
        selfInterrupt();
    }
}

2.4、設定相對時間不自旋等待

// 設定定時條件等待(相對時間), 不進行自旋等待
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    // 如果執行緒被中斷則丟擲異常
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // 將當前執行緒新增到等待佇列尾部
    Node node = addConditionWaiter();
    // 在進入條件等待之前先完全釋放鎖
    int savedState = fullyRelease(node);
    long lastTime = System.nanoTime();
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 判斷超時時間是否用完了
        if (nanosTimeout <= 0L) {
            // 如果已超時就需要執行取消條件等待操作
            transferAfterCancelledWait(node);
            break;
        }
        // 將當前執行緒掛起一段時間, 執行緒在這期間可能被喚醒, 也可能自己醒來
        LockSupport.parkNanos(this, nanosTimeout);
        // 執行緒醒來後先檢查中斷資訊
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
        long now = System.nanoTime();
        // 超時時間每次減去條件等待的時間
        nanosTimeout -= now - lastTime;
        lastTime = now;
    }
    // 執行緒醒來後就會以獨佔模式獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }
    // 由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍
    if (node.nextWaiter != null) {
        unlinkCancelledWaiters();
    }
    // 根據中斷模式進行響應的中斷處理
    if (interruptMode != 0) {
        reportInterruptAfterWait(interruptMode);
    }
    // 返回剩餘時間
    return nanosTimeout - (System.nanoTime() - lastTime);
}

2.5、設定相對時間自旋等待

// 設定定時條件等待(相對時間), 進行自旋等待
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
    if (unit == null) { throw new NullPointerException(); }
    // 獲取超時時間的毫秒數
    long nanosTimeout = unit.toNanos(time);
    // 如果執行緒被中斷則丟擲異常
    if (Thread.interrupted()) { throw new InterruptedException(); }
    // 將當前執行緒新增等待佇列尾部
    Node node = addConditionWaiter();
    // 在進入條件等待之前先完全釋放鎖
    int savedState = fullyRelease(node);
    / /獲取當前時間的毫秒數
    long lastTime = System.nanoTime();
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 如果超時就需要執行取消條件等待操作
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // 如果超時時間大於自旋時間, 就將執行緒掛起一段時間
        if (nanosTimeout >= spinForTimeoutThreshold) {
            LockSupport.parkNanos(this, nanosTimeout);
        }
        // 執行緒醒來後先檢查中斷資訊
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
        long now = System.nanoTime();
        // 超時時間每次減去條件等待的時間
        nanosTimeout -= now - lastTime;
        lastTime = now;
    }
    // 執行緒醒來後就會以獨佔模式獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }
    // 由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍
    if (node.nextWaiter != null) {
        unlinkCancelledWaiters();
    }
    // 根據中斷模式進行響應的中斷處理
    if (interruptMode != 0) {
        reportInterruptAfterWait(interruptMode);
    }
    // 返回是否超時標誌
    return !timedout;
}

2.6、設定絕對時間等待

// 設定定時條件等待(絕對時間)
public final boolean awaitUntil(Date deadline) throws InterruptedException {
    if (deadline == null) { throw new NullPointerException(); } 
    // 獲取絕對時間的毫秒數
    long abstime = deadline.getTime();
    // 如果執行緒被中斷則丟擲異常
    if(Thread.interrupted()) { throw new InterruptedException(); }
    // 將當前執行緒新增到等待佇列尾部
    Node node = addConditionWaiter();
    // 在進入條件等待之前先完全釋放鎖
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 如果超時就需要執行取消條件等待操作
        if (System.currentTimeMillis() > abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // 將執行緒掛起一段時間, 期間執行緒可能被喚醒, 也可能到了點自己醒來
        LockSupport.parkUntil(this, abstime);
        // 執行緒醒來後先檢查中斷資訊
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
    }
    // 執行緒醒來後就會以獨佔模式獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }
    // 由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍
    if (node.nextWaiter != null) {
        unlinkCancelledWaiters();
    }
    // 根據中斷模式進行響應的中斷處理
    if (interruptMode != 0) {
        reportInterruptAfterWait(interruptMode);
    }
    // 返回是否超時標誌
    return !timedout;
}

2.7、喚醒等待佇列中的頭節點

呼叫ConditionObject的signal()方法,將會喚醒在等待時間最長的節點(即首節點),在喚醒首節點之前,會將節點移動到同步佇列中。

// 喚醒條件佇列中的下一個結點
public final void signal() {
    // 判斷當前執行緒是否持有鎖
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    Node first = firstWaiter;
    // 如果等待佇列中有排隊者
    if (first != null) {
        // 喚醒等待 佇列中的頭結點
        doSignal(first);
    }
}

//喚醒條件佇列中的頭結點
private void doSignal(Node first) {
    do {
        //1.將firstWaiter引用向後移動一位
        if ( (firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;
        }
        //2.將頭結點的後繼結點引用置空
        first.nextWaiter = null;
        //3.將頭結點轉移到同步佇列, 轉移完成後有可能喚醒執行緒
        //4.如果transferForSignal操作失敗就去喚醒下一個結點
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

//將指定結點從條件佇列轉移到同步佇列中
final boolean transferForSignal(Node node) {
    //將等待狀態從CONDITION設定為0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //如果更新狀態的操作失敗就直接返回false
        //可能是transferAfterCancelledWait方法先將狀態改變了, 導致這步CAS操作失敗
        return false;
    }
    //將該結點新增到同步佇列尾部
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
        //出現以下情況就會喚醒當前執行緒
        //1.前繼結點是取消狀態
        //2.更新前繼結點的狀態為SIGNAL操作失敗
        LockSupport.unpark(node.thread);
    }
    return true;
}

可以看到signal方法最終的核心就是去呼叫transferForSignal方法,在transferForSignal方法中首先會用CAS操作將節點的狀態從CONDITION設定為0,然後再呼叫enq方法將該節點新增到同步佇列尾部。

我們再看接下來的 if 判斷語句,這個判斷語句主要是用來判斷什麼時候會去喚醒執行緒,出現下面這兩種情況就會立即喚醒執行緒:一種是當發現前繼結點的狀態是取消狀態時,還有一種是更新前繼結點的狀態失敗時。

這兩種情況都會馬上去喚醒執行緒,否則的話就僅僅只是將節點從條件佇列中轉移到同步佇列中就完了,而不會立馬去喚醒節點中的執行緒。signalAll方法也大致類似,只不過它是去迴圈遍歷條件佇列中的所有節點,並將它們轉移到同步佇列,轉移節點的方法也還是呼叫transferForSignal方法。

節點從等待佇列移動到同步佇列

2.8、喚醒等待佇列中的所有節點

// 喚醒等待佇列後面的全部節點
public final void signalAll() {
    // 判斷當前執行緒是否持有鎖
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // 獲取等待佇列頭節點
    Node first = firstWaiter;
    if (first != null) {
        // 喚醒等待佇列的所有結點
        doSignalAll(first);
    }
}

// 喚醒等待佇列的所有結點
private void doSignalAll(Node first) {
    // 先把頭節點和尾節點的引用置空
    lastWaiter = firstWaiter = null;
    do {
        // 先獲取後繼節點的引用
        Node next = first.nextWaiter;
        // 把即將轉移的節點的後繼引用置空
        first.nextWaiter = null;
        // 將節點從條件佇列轉移到同步佇列
        transferForSignal(first);
        // 將引用指向下一個節點
        first = next;
    } while (first != null);
}

AQS系列文章:

1、佇列同步器AQS原始碼分析之概要分析

2、佇列同步器AQS原始碼分析之獨佔模式

3、佇列同步器AQS原始碼分析之共享模式

4、佇列同步器AQS原始碼分析之Condition介面、等待佇列

參考及推薦:

1、AbstractQueuedSynchronizer原始碼分析之概要分析

2、AbstractQueuedSynchronizer原始碼分析之獨佔模式

3、AbstractQueuedSynchronizer原始碼分析之共享模式

4、AbstractQueuedSynchronizer原始碼分析之條件佇列