【搞定Java併發程式設計】第18篇:佇列同步器AQS原始碼分析之Condition介面、等待佇列
AQS系列文章:
4、佇列同步器AQS原始碼分析之Condition介面、等待佇列
通過前面三篇關於AQS文章的學習,我們深入瞭解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步佇列和等待佇列。
要學習條件佇列,就必須先了解Condition介面。
1、Condition介面概述
每個Java物件都擁有一組監視器方法,主要包括:wait()、wait(long timeout)、notify() 和 notifyAll()方法,這些方法與synchronized關鍵字配合,可以實現等待 / 通知模式。
Condition介面也提供了類似Object的監視器方法,與Lock配合可以實現等待 / 通知模式,但是這兩者在使用方法和功能特性上還是有差別的。
Condition定義了 等待 / 通知 的方法,當前執行緒呼叫這些方法時,需要提前獲取到Condition物件關聯的鎖。Condition物件是由Lock物件(呼叫Lock物件的newCondition()方法)創建出來的,即Condition物件是依賴Lock物件的。
Condition的使用方式比較簡單,需要注意在呼叫方法前獲取鎖,案例程式碼如下:
package zju.com.lock; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionUseCase { Lock lock = new ReentrantLock(); // 通過lock.newCondition()方法建立condition物件 Condition condition = lock.newCondition(); // 等待:conditionWait() public void conditionWait() throws InterruptedException { lock.lock(); try { condition.await(); // 當前執行緒進入等待狀態 } finally { } } // 通知:conditionSignal() public void conditionSignal() throws InterruptedException { lock.lock(); try { condition.signal(); // 喚醒一個等待在Condition上的執行緒 } finally { lock.unlock(); } } }
一般會將Condition物件作為成員變數。當呼叫await()方法後,當前執行緒會釋放鎖並在此等待,而其他執行緒呼叫Condition物件的signal()方法,通知當前執行緒後,當前執行緒才從await()方法返回,並且在返回前已經獲取了鎖。
Condition的主要方法如下表所示:
-
Condition介面的原始碼
public interface Condition {
// 響應執行緒中斷的條件等待
void await() throws InterruptedException;
// 不響應執行緒中斷的條件等待
void awaitUninterruptibly();
// 設定相對時間的條件等待(不進行自旋)
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 設定相對時間的條件等待(進行自旋)
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 設定絕對時間的條件等待
boolean awaitUntil(Date deadline) throws InterruptedException;
// 喚醒條件佇列中的頭節點
void signal();
// 喚醒條件佇列的所有節點
void signalAll();
}
Condition介面雖然定義了這麼多方法,但總共就分為兩類,以 await 開頭的是執行緒進入條件佇列等待的方法,以signal開頭的是將條件佇列中的執行緒“喚醒”的方法。這裡要注意的是,呼叫signal方法可能喚醒執行緒也可能不會喚醒執行緒,什麼時候會喚醒執行緒這得看具體情況。但是呼叫signal方法一定會將執行緒從條件佇列中移到同步佇列尾部。
await方法分為5種,分別是:響應執行緒中斷等待、不響應執行緒中斷等待、設定相對時間不自旋等待、設定相對時間自旋等待、設定絕對時間等待;
signal方法只有2種,分別是:只喚醒條件佇列頭節點和喚醒條件佇列所有節點。
2、Condition的原始碼分析
ConditionObject是同步器AQS的內部類,它實現了Condition介面。每個Condition物件都包含著一個佇列(以下稱之為等待佇列),該佇列是Condition物件實現 等待 / 通知 功能實現的關鍵。
下面將分析Condition的實現,主要包括:等待佇列、等待和通知,下面提到的Condition如果不加說明均指的是ConditionObject。
2.1、等待佇列
等待佇列是一個FIFO的佇列,在佇列中的每個節點都包含了一個執行緒引用,該執行緒就是在Condition物件上等待的執行緒,如果一個執行緒呼叫了Condition.await()方法,那麼該執行緒將會釋放鎖、構造成節點加入等待佇列並進入等待狀態。這裡的用的節點型別也是同步器的靜態內部類Node。
這裡舉個例子,便於幫助理解等待佇列:
我們拿公共廁所做比喻,同步佇列是主要的排隊區,如果公共廁所沒開放,所有想要進入廁所的人都得在這裡排隊。而等待佇列主要是為條件等待設定的,我們想象一下如果一個人通過排隊終於成功獲取鎖進入了廁所,但在方便之前發現自己沒帶手紙,碰到這種情況雖然很無奈,但是它也必須接受這個事實,這時它只好乖乖的出去先準備好手紙(進入等待佇列等待),當然在出去之前還得把鎖給釋放了好讓其他人能夠進來,在準備好了手紙(條件滿足)之後它又得重新回到同步佇列中去排隊。
當然進入等待佇列的人並不都是因為沒帶手紙,可能還有其他一些原因必須中斷操作先去等待佇列中去排隊,所以等待佇列可以有多個,按照不同的等待條件而設定不同的等待佇列。等待佇列是一條單向連結串列,Condition介面定義了等待佇列中的所有操作,AbstractQueuedSynchronizer內部的ConditionObject類實現了Condition介面。
一個Condition包含一個等待佇列,Condition擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前執行緒呼叫Condition.await()方法,將會以當前執行緒構造節點,並將節點從尾部加入等待佇列。
Condition擁有尾結點的引用,而新增節點只需要將原有的尾節點nextWaiter指向它,並且更新尾節點即可。上訴節點引用更新的過程並沒有使用CAS保證,原因在於呼叫await()方法的執行緒必定是獲取了鎖的執行緒,也就是說該過程是由鎖來保證執行緒安全的。
在Object的監視器模型上,一個物件擁有一個同步佇列和等待佇列,而併發包JUC中的Lock(更確切的說是同步器)擁有一個同步佇列和多個等待佇列。
2.2、響應執行緒中斷等待
呼叫Condition的await()方法,會使當前執行緒進入等待佇列並釋放鎖,同時執行緒狀態轉變為等待狀態。當從await()方法返回時,當前執行緒一定獲取了Condition相關聯的鎖。
如果從佇列的角度看await()方法,當呼叫await()方法時,相當於同步佇列的首節點(獲取了同步狀態 / 鎖的節點)移動到Condition的等待佇列中。
下面看下ConditionObject的await()方法的原始碼:
// 響應執行緒中斷的條件等待
public final void await() throws InterruptedException {
// 如果執行緒被中斷則丟擲異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 將當前執行緒新增到條件佇列尾部
Node node = addConditionWaiter();
// 在進入條件等待之前先完全釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 執行緒一直在while迴圈裡進行條件等待
while (!isOnSyncQueue(node)) {
// 進行條件等待的執行緒都在這裡被掛起, 執行緒被喚醒的情況有以下幾種:
// 1.同步佇列的前驅節點已取消
// 2.設定同步佇列的前驅節點的狀態為SIGNAL失敗
// 3.前驅節點釋放鎖後喚醒當前節點
LockSupport.park(this);
// 當前執行緒醒來後立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件佇列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
// 執行緒醒來後就會以獨佔模式獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
// 這步操作主要為防止執行緒在signal之前中斷而導致沒與條件佇列斷絕聯絡
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
// 根據中斷模式進行響應的中斷處理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}
當執行緒呼叫await()方法的時候,首先會將當前執行緒包裝成節點放入等待佇列的尾部。在addConditionWaiter方法中,如果發現等待佇列尾結點已取消就會呼叫unlinkCancelledWaiters方法將條件佇列所有的已取消結點清空。
這步操作是插入結點的準備工作,那麼確保了尾結點的狀態也是CONDITION之後,就會新建一個節點將當前執行緒包裝起來然後放入等待佇列尾部。注意,這個過程只是將節點新增到同步佇列尾部而沒有掛起執行緒。
下面看下釋放鎖的fullyRelease()方法的原始碼:
// 完全釋放鎖
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 獲取當前的同步狀態
int savedState = getState();
// 使用當前的同步狀態去釋放鎖
if (release(savedState)) {
failed = false;
// 如果釋放鎖成功就返回當前同步狀態
return savedState;
} else {
// 如果釋放鎖失敗就丟擲執行時異常
throw new IllegalMonitorStateException();
}
} finally {
// 保證沒有成功釋放鎖就將該節點設定為取消狀態
if (failed) {
node.waitStatus = Node.CANCELLED;
}
}
}
將當前執行緒包裝成結點新增到等待佇列尾部後,緊接著就呼叫fullyRelease()方法釋放鎖。注意,方法名為fullyRelease也就是這步操作會完全的釋放鎖,因為鎖是可重入的,所以在進行條件等待前需要將鎖全部釋放了,不然的話別人就獲取不了鎖了。如果釋放鎖失敗的話就會丟擲一個執行時異常,如果成功釋放了鎖的話就返回之前的同步狀態。
下面來看看進行條件等待的原始碼:
// 執行緒一直在while迴圈裡進行條件等待
while (!isOnSyncQueue(node)) {
// 進行條件等待的執行緒都在這裡被掛起, 執行緒被喚醒的情況有以下幾種:
// 1.同步佇列的前繼結點已取消
// 2.設定同步佇列的前繼結點的狀態為SIGNAL失敗
// 3.前繼結點釋放鎖後喚醒當前結點
LockSupport.park(this);
// 當前執行緒醒來後立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出等待佇列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
// 檢查條件等待時的執行緒中斷情況
private int checkInterruptWhileWaiting(Node node) {
// 中斷請求在signal操作之前:THROW_IE
// 中斷請求在signal操作之後:REINTERRUPT
// 期間沒有收到任何中斷請求:0
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
// 將取消條件等待的結點從等待佇列轉移到同步佇列中
final boolean transferAfterCancelledWait(Node node) {
// 如果這步CAS操作成功的話就表明中斷髮生在signal方法之前
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 狀態修改成功後就將該結點放入同步佇列尾部
enq(node);
return true;
}
// 到這裡表明CAS操作失敗, 說明中斷髮生在signal方法之後
while (!isOnSyncQueue(node)) {
// 如果sinal方法還沒有將結點轉移到同步佇列, 就通過自旋等待一下
Thread.yield();
}
return false;
}
在以上兩個操作完成了之後就會進入while迴圈,可以看到while迴圈裡面首先呼叫LockSupport.park(this)將執行緒掛起了,所以執行緒就會一直在這裡阻塞。在呼叫signal方法後僅僅只是將結點從等待佇列轉移到同步佇列中去,至於會不會喚醒執行緒需要看情況。
如果轉移節點時發現同步佇列中的前驅節點已取消,或者是更新前驅節點的狀態為SIGNAL失敗,這兩種情況都會立即喚醒執行緒,否則的話在signal方法結束時就不會去喚醒已在同步佇列中的執行緒,而是等到它的前驅節點來喚醒。
當然,執行緒阻塞在這裡除了可以呼叫signal方法喚醒之外,執行緒還可以響應中斷,如果執行緒在這裡收到中斷請求就會繼續往下執行。可以看到執行緒醒來後會馬上檢查是否是由於中斷喚醒的還是通過signal方法喚醒的,如果是因為中斷喚醒的同樣會將這個節點轉移到同步佇列中去,只不過是通過呼叫transferAfterCancelledWait方法來實現的。最後執行完這一步之後就會返回中斷情況並跳出while迴圈。
結點移出等待佇列後的操作:
// 執行緒醒來後就會以獨佔模式獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
// 這步操作主要為防止執行緒在signal之前中斷而導致沒與等待佇列斷絕聯絡
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
// 根據中斷模式進行響應的中斷處理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
// 結束條件等待後根據中斷情況做出相應處理
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
// 如果中斷模式是THROW_IE就丟擲異常
if (interruptMode == THROW_IE) {
throw new InterruptedException();
// 如果中斷模式是REINTERRUPT就自己掛起
} else if (interruptMode == REINTERRUPT) {
selfInterrupt();
}
}
當執行緒終止了while迴圈也就是條件等待後,就會回到同步佇列中。不管是因為呼叫signal方法回去的還是因為執行緒中斷導致的,節點最終都會在同步佇列中。這時就會呼叫acquireQueued方法執行在同步佇列中獲取鎖的操作,這個方法我們在獨佔模式這一篇已經詳細的講過。
也就是說,節點從等待隊列出來後又是乖乖的走獨佔模式下獲取鎖的那一套,等這個結點再次獲得鎖之後,就會呼叫reportInterruptAfterWait方法來根據這期間的中斷情況做出相應的響應。如果中斷髮生在signal方法之前,interruptMode就為THROW_IE,再次獲得鎖後就丟擲異常;如果中斷髮生在signal方法之後,interruptMode就為REINTERRUPT,再次獲得鎖後就重新中斷。
2.3、不響應執行緒中斷等待
// 不響應執行緒中斷的條件等待
public final void awaitUninterruptibly() {
// 將當前執行緒新增到等待佇列尾部
Node node = addConditionWaiter();
// 完全釋放鎖並返回當前同步狀態
int savedState = fullyRelease(node);
boolean interrupted = false;
// 結點一直在while迴圈裡進行條件等待
while (!isOnSyncQueue(node)) {
// 等待佇列中所有的執行緒都在這裡被掛起
LockSupport.park(this);
// 執行緒醒來發現中斷並不會馬上去響應
if (Thread.interrupted()) {
interrupted = true;
}
}
if (acquireQueued(node, savedState) || interrupted) {
// 在這裡響應所有中斷請求, 滿足以下兩個條件之一就會將自己掛起
// 1.執行緒在條件等待時收到中斷請求
// 2.執行緒在acquireQueued方法裡收到中斷請求
selfInterrupt();
}
}
2.4、設定相對時間不自旋等待
// 設定定時條件等待(相對時間), 不進行自旋等待
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
// 如果執行緒被中斷則丟擲異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 將當前執行緒新增到等待佇列尾部
Node node = addConditionWaiter();
// 在進入條件等待之前先完全釋放鎖
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 判斷超時時間是否用完了
if (nanosTimeout <= 0L) {
// 如果已超時就需要執行取消條件等待操作
transferAfterCancelledWait(node);
break;
}
// 將當前執行緒掛起一段時間, 執行緒在這期間可能被喚醒, 也可能自己醒來
LockSupport.parkNanos(this, nanosTimeout);
// 執行緒醒來後先檢查中斷資訊
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
long now = System.nanoTime();
// 超時時間每次減去條件等待的時間
nanosTimeout -= now - lastTime;
lastTime = now;
}
// 執行緒醒來後就會以獨佔模式獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
// 由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
// 根據中斷模式進行響應的中斷處理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
// 返回剩餘時間
return nanosTimeout - (System.nanoTime() - lastTime);
}
2.5、設定相對時間自旋等待
// 設定定時條件等待(相對時間), 進行自旋等待
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
if (unit == null) { throw new NullPointerException(); }
// 獲取超時時間的毫秒數
long nanosTimeout = unit.toNanos(time);
// 如果執行緒被中斷則丟擲異常
if (Thread.interrupted()) { throw new InterruptedException(); }
// 將當前執行緒新增等待佇列尾部
Node node = addConditionWaiter();
// 在進入條件等待之前先完全釋放鎖
int savedState = fullyRelease(node);
/ /獲取當前時間的毫秒數
long lastTime = System.nanoTime();
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 如果超時就需要執行取消條件等待操作
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
// 如果超時時間大於自旋時間, 就將執行緒掛起一段時間
if (nanosTimeout >= spinForTimeoutThreshold) {
LockSupport.parkNanos(this, nanosTimeout);
}
// 執行緒醒來後先檢查中斷資訊
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
long now = System.nanoTime();
// 超時時間每次減去條件等待的時間
nanosTimeout -= now - lastTime;
lastTime = now;
}
// 執行緒醒來後就會以獨佔模式獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
// 由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
// 根據中斷模式進行響應的中斷處理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
// 返回是否超時標誌
return !timedout;
}
2.6、設定絕對時間等待
// 設定定時條件等待(絕對時間)
public final boolean awaitUntil(Date deadline) throws InterruptedException {
if (deadline == null) { throw new NullPointerException(); }
// 獲取絕對時間的毫秒數
long abstime = deadline.getTime();
// 如果執行緒被中斷則丟擲異常
if(Thread.interrupted()) { throw new InterruptedException(); }
// 將當前執行緒新增到等待佇列尾部
Node node = addConditionWaiter();
// 在進入條件等待之前先完全釋放鎖
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 如果超時就需要執行取消條件等待操作
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
// 將執行緒掛起一段時間, 期間執行緒可能被喚醒, 也可能到了點自己醒來
LockSupport.parkUntil(this, abstime);
// 執行緒醒來後先檢查中斷資訊
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
// 執行緒醒來後就會以獨佔模式獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
// 由於transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這裡要再清理一遍
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
// 根據中斷模式進行響應的中斷處理
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
// 返回是否超時標誌
return !timedout;
}
2.7、喚醒等待佇列中的頭節點
呼叫ConditionObject的signal()方法,將會喚醒在等待時間最長的節點(即首節點),在喚醒首節點之前,會將節點移動到同步佇列中。
// 喚醒條件佇列中的下一個結點
public final void signal() {
// 判斷當前執行緒是否持有鎖
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
// 如果等待佇列中有排隊者
if (first != null) {
// 喚醒等待 佇列中的頭結點
doSignal(first);
}
}
//喚醒條件佇列中的頭結點
private void doSignal(Node first) {
do {
//1.將firstWaiter引用向後移動一位
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
//2.將頭結點的後繼結點引用置空
first.nextWaiter = null;
//3.將頭結點轉移到同步佇列, 轉移完成後有可能喚醒執行緒
//4.如果transferForSignal操作失敗就去喚醒下一個結點
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
//將指定結點從條件佇列轉移到同步佇列中
final boolean transferForSignal(Node node) {
//將等待狀態從CONDITION設定為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//如果更新狀態的操作失敗就直接返回false
//可能是transferAfterCancelledWait方法先將狀態改變了, 導致這步CAS操作失敗
return false;
}
//將該結點新增到同步佇列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
//出現以下情況就會喚醒當前執行緒
//1.前繼結點是取消狀態
//2.更新前繼結點的狀態為SIGNAL操作失敗
LockSupport.unpark(node.thread);
}
return true;
}
可以看到signal方法最終的核心就是去呼叫transferForSignal方法,在transferForSignal方法中首先會用CAS操作將節點的狀態從CONDITION設定為0,然後再呼叫enq方法將該節點新增到同步佇列尾部。
我們再看接下來的 if 判斷語句,這個判斷語句主要是用來判斷什麼時候會去喚醒執行緒,出現下面這兩種情況就會立即喚醒執行緒:一種是當發現前繼結點的狀態是取消狀態時,還有一種是更新前繼結點的狀態失敗時。
這兩種情況都會馬上去喚醒執行緒,否則的話就僅僅只是將節點從條件佇列中轉移到同步佇列中就完了,而不會立馬去喚醒節點中的執行緒。signalAll方法也大致類似,只不過它是去迴圈遍歷條件佇列中的所有節點,並將它們轉移到同步佇列,轉移節點的方法也還是呼叫transferForSignal方法。
2.8、喚醒等待佇列中的所有節點
// 喚醒等待佇列後面的全部節點
public final void signalAll() {
// 判斷當前執行緒是否持有鎖
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 獲取等待佇列頭節點
Node first = firstWaiter;
if (first != null) {
// 喚醒等待佇列的所有結點
doSignalAll(first);
}
}
// 喚醒等待佇列的所有結點
private void doSignalAll(Node first) {
// 先把頭節點和尾節點的引用置空
lastWaiter = firstWaiter = null;
do {
// 先獲取後繼節點的引用
Node next = first.nextWaiter;
// 把即將轉移的節點的後繼引用置空
first.nextWaiter = null;
// 將節點從條件佇列轉移到同步佇列
transferForSignal(first);
// 將引用指向下一個節點
first = next;
} while (first != null);
}
AQS系列文章:
4、佇列同步器AQS原始碼分析之Condition介面、等待佇列
參考及推薦:
1、AbstractQueuedSynchronizer原始碼分析之概要分析
2、AbstractQueuedSynchronizer原始碼分析之獨佔模式