1. 程式人生 > >Java併發系列(4)AbstractQueuedSynchronizer原始碼分析之條件佇列

Java併發系列(4)AbstractQueuedSynchronizer原始碼分析之條件佇列

通過前面三篇的分析,我們深入瞭解了AbstractQueuedSynchronizer的內部結構和一些設計理念,知道了AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步佇列和條件佇列。我們還是拿公共廁所做比喻,同步佇列是主要的排隊區,如果公共廁所沒開放,所有想要進入廁所的人都得在這裡排隊。而條件佇列主要是為條件等待設定的,我們想象一下如果一個人通過排隊終於成功獲取鎖進入了廁所,但在方便之前發現自己沒帶手紙,碰到這種情況雖然很無奈,但是它也必須接受這個事實,這時它只好乖乖的出去先準備好手紙(進入條件佇列等待),當然在出去之前還得把鎖給釋放了好讓其他人能夠進來,在準備好了手紙(條件滿足)之後它又得重新回到同步佇列中去排隊。當然進入房間的人並不都是因為沒帶手紙,可能還有其他一些原因必須中斷操作先去條件佇列中去排隊,所以條件佇列可以有多個,依不同的等待條件而設定不同的條件佇列。條件佇列是一條單向連結串列,Condition介面定義了條件佇列中的所有操作,AbstractQueuedSynchronizer內部的ConditionObject類實現了Condition介面,下面我們看看Condition介面都定義了哪些操作。

複製程式碼
 1 public interface Condition {
 2     
 3     //響應執行緒中斷的條件等待
 4     void await() throws InterruptedException;
 5     
 6     //不響應執行緒中斷的條件等待
 7     void awaitUninterruptibly();
 8     
 9     //設定相對時間的條件等待(不進行自旋)
10     long awaitNanos(long nanosTimeout) throws InterruptedException;
11     
12     //設定相對時間的條件等待(進行自旋)
13 boolean await(long time, TimeUnit unit) throws InterruptedException; 14 15 //設定絕對時間的條件等待 16 boolean awaitUntil(Date deadline) throws InterruptedException; 17 18 //喚醒條件佇列中的頭結點 19 void signal(); 20 21 //喚醒條件佇列的所有結點 22 void signalAll(); 23 24 }
複製程式碼

Condition介面雖然定義了這麼多方法,但總共就分為兩類,以await開頭的是執行緒進入條件佇列等待的方法,以signal開頭的是將條件佇列中的執行緒“喚醒”的方法。這裡要注意的是,呼叫signal方法可能喚醒執行緒也可能不會喚醒執行緒,什麼時候會喚醒執行緒這得看情況,後面會講到,但是呼叫signal方法一定會將執行緒從條件佇列中移到同步佇列尾部。這裡為了敘述方便,我們先暫時不糾結這麼多,統一稱signal方法為喚醒條件佇列執行緒的操作。大家注意看一下,await方法分為5種,分別是響應執行緒中斷等待,不響應執行緒中斷等待,設定相對時間不自旋等待,設定相對時間自旋等待,設定絕對時間等待;signal方法只有2種,分別是隻喚醒條件佇列頭結點和喚醒條件佇列所有結點的操作。同一類的方法基本上是相通的,由於篇幅所限,我們不可能也不需要將這些方法全部仔細的講到,只需要將一個代表方法搞懂了再看其他方法就能夠觸類旁通。所以在本文中我只會細講await方法和signal方法,其他方法不細講但會貼出原始碼來以供大家參考。

1. 響應執行緒中斷的條件等待

複製程式碼
 1 //響應執行緒中斷的條件等待
 2 public final void await() throws InterruptedException {
 3     //如果執行緒被中斷則丟擲異常
 4     if (Thread.interrupted()) {
 5         throw new InterruptedException();
 6     }
 7     //將當前執行緒新增到條件佇列尾部
 8     Node node = addConditionWaiter();
 9     //在進入條件等待之前先完全釋放鎖
10     int savedState = fullyRelease(node);
11     int interruptMode = 0;
12     //執行緒一直在while迴圈裡進行條件等待
13     while (!isOnSyncQueue(node)) {
14         //進行條件等待的執行緒都在這裡被掛起, 執行緒被喚醒的情況有以下幾種:
15         //1.同步佇列的前繼結點已取消
16         //2.設定同步佇列的前繼結點的狀態為SIGNAL失敗
17         //3.前繼結點釋放鎖後喚醒當前結點
18         LockSupport.park(this);
19         //當前執行緒醒來後立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件佇列
20         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
21             break;
22         }
23     }
24     //執行緒醒來後就會以獨佔模式獲取鎖
25     if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
26         interruptMode = REINTERRUPT;
27     }
28     //這步操作主要為防止執行緒在signal之前中斷而導致沒與條件佇列斷絕聯絡
29     if (node.nextWaiter != null) {
30         unlinkCancelledWaiters();
31     }
32     //根據中斷模式進行響應的中斷處理
33     if (interruptMode != 0) {
34         reportInterruptAfterWait(interruptMode);
35     }
36 }
複製程式碼

當執行緒呼叫await方法的時候,首先會將當前執行緒包裝成node結點放入條件佇列尾部。在addConditionWaiter方法中,如果發現條件佇列尾結點已取消就會呼叫unlinkCancelledWaiters方法將條件佇列所有的已取消結點清空。這步操作是插入結點的準備工作,那麼確保了尾結點的狀態也是CONDITION之後,就會新建一個node結點將當前執行緒包裝起來然後放入條件佇列尾部。注意,這個過程只是將結點新增到同步佇列尾部而沒有掛起執行緒哦。

第二步:完全將鎖釋放

複製程式碼
 1 //完全釋放鎖
 2 final int fullyRelease(Node node) {
 3     boolean failed = true;
 4     try {
 5         //獲取當前的同步狀態
 6         int savedState = getState();
 7         //使用當前的同步狀態去釋放鎖
 8         if (release(savedState)) {
 9             failed = false;
10             //如果釋放鎖成功就返回當前同步狀態
11             return savedState;
12         } else {
13             //如果釋放鎖失敗就丟擲執行時異常
14             throw new IllegalMonitorStateException();
15         }
16     } finally {
17         //保證沒有成功釋放鎖就將該結點設定為取消狀態
18         if (failed) {
19             node.waitStatus = Node.CANCELLED;
20         }
21     }
22 }
複製程式碼

將當前執行緒包裝成結點新增到條件佇列尾部後,緊接著就呼叫fullyRelease方法釋放鎖。注意,方法名為fullyRelease也就這步操作會完全的釋放鎖,因為鎖是可重入的,所以在進行條件等待前需要將鎖全部釋放了,不然的話別人就獲取不了鎖了。如果釋放鎖失敗的話就會丟擲一個執行時異常,如果成功釋放了鎖的話就返回之前的同步狀態。

第三步:進行條件等待

複製程式碼
 1 //執行緒一直在while迴圈裡進行條件等待
 2 while (!isOnSyncQueue(node)) {
 3     //進行條件等待的執行緒都在這裡被掛起, 執行緒被喚醒的情況有以下幾種:
 4     //1.同步佇列的前繼結點已取消
 5     //2.設定同步佇列的前繼結點的狀態為SIGNAL失敗
 6     //3.前繼結點釋放鎖後喚醒當前結點
 7     LockSupport.park(this);
 8     //當前執行緒醒來後立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件佇列
 9     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
10         break;
11     }
12 }
13 
14 //檢查條件等待時的執行緒中斷情況
15 private int checkInterruptWhileWaiting(Node node) {
16     //中斷請求在signal操作之前:THROW_IE
17     //中斷請求在signal操作之後:REINTERRUPT
18     //期間沒有收到任何中斷請求:0
19     return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
20 }
21 
22 //將取消條件等待的結點從條件佇列轉移到同步佇列中
23 final boolean transferAfterCancelledWait(Node node) {
24     //如果這步CAS操作成功的話就表明中斷髮生在signal方法之前
25     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
26         //狀態修改成功後就將該結點放入同步佇列尾部
27         enq(node);
28         return true;
29     }
30     //到這裡表明CAS操作失敗, 說明中斷髮生在signal方法之後
31     while (!isOnSyncQueue(node)) {
32         //如果sinal方法還沒有將結點轉移到同步佇列, 就通過自旋等待一下
33         Thread.yield();
34     }
35     return false;
36 }
複製程式碼

在以上兩個操作完成了之後就會進入while迴圈,可以看到while迴圈裡面首先呼叫LockSupport.park(this)將執行緒掛起了,所以執行緒就會一直在這裡阻塞。在呼叫signal方法後僅僅只是將結點從條件佇列轉移到同步佇列中去,至於會不會喚醒執行緒需要看情況。如果轉移結點時發現同步佇列中的前繼結點已取消,或者是更新前繼結點的狀態為SIGNAL失敗,這兩種情況都會立即喚醒執行緒,否則的話在signal方法結束時就不會去喚醒已在同步佇列中的執行緒,而是等到它的前繼結點來喚醒。當然,執行緒阻塞在這裡除了可以呼叫signal方法喚醒之外,執行緒還可以響應中斷,如果執行緒在這裡收到中斷請求就會繼續往下執行。可以看到執行緒醒來後會馬上檢查是否是由於中斷喚醒的還是通過signal方法喚醒的,如果是因為中斷喚醒的同樣會將這個結點轉移到同步佇列中去,只不過是通過呼叫transferAfterCancelledWait方法來實現的。最後執行完這一步之後就會返回中斷情況並跳出while迴圈。

第四步:結點移出條件佇列後的操作

複製程式碼
 1 //執行緒醒來後就會以獨佔模式獲取鎖
 2 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
 3     interruptMode = REINTERRUPT;
 4 }
 5 //這步操作主要為防止執行緒在signal之前中斷而導致沒與條件佇列斷絕聯絡
 6 if (node.nextWaiter != null) {
 7     unlinkCancelledWaiters();
 8 }
 9 //根據中斷模式進行響應的中斷處理
10 if (interruptMode != 0) {
11     reportInterruptAfterWait(interruptMode);
12 }
13 
14 //結束條件等待後根據中斷情況做出相應處理
15 private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
16     //如果中斷模式是THROW_IE就丟擲異常
17     if (interruptMode == THROW_IE) {
18         throw new InterruptedException();
19     //如果中斷模式是REINTERRUPT就自己掛起
20     } else if (interruptMode == REINTERRUPT) {
21         selfInterrupt();
22     }
23 }
複製程式碼

當執行緒終止了while迴圈也就是條件等待後,就會回到同步佇列中。不管是因為呼叫signal方法回去的還是因為執行緒中斷導致的,結點最終都會在同步佇列中。這時就會呼叫acquireQueued方法執行在同步佇列中獲取鎖的操作,這個方法我們在獨佔模式這一篇已經詳細的講過。也就是說,結點從條件隊列出來後又是乖乖的走獨佔模式下獲取鎖的那一套,等這個結點再次獲得鎖之後,就會呼叫reportInterruptAfterWait方法來根據這期間的中斷情況做出相應的響應。如果中斷髮生在signal方法之前,interruptMode就為THROW_IE,再次獲得鎖後就丟擲異常;如果中斷髮生在signal方法之後,interruptMode就為REINTERRUPT,再次獲得鎖後就重新中斷。

2.不響應執行緒中斷的條件等待

複製程式碼
 1 //不響應執行緒中斷的條件等待
 2 public final void awaitUninterruptibly() {
 3     //將當前執行緒新增到條件佇列尾部
 4     Node node = addConditionWaiter();
 5     //完全釋放鎖並返回當前同步狀態
 6     int savedState = fullyRelease(node);
 7     boolean interrupted = false;
 8     //結點一直在while迴圈裡進行條件等待
 9     while (!isOnSyncQueue(node)) {
10         //條件佇列中所有的執行緒都在這裡被掛起
11         LockSupport.park(this);
12         //執行緒醒來發現中斷並不會馬上去響應
13         if (Thread.interrupted()) {
14             interrupted = true;
15         }
16     }
17     if (acquireQueued(node, savedState) || interrupted) {
18         //在這裡響應所有中斷請求, 滿足以下兩個條件之一就會將自己掛起
19         //1.執行緒在條件等待時收到中斷請求
20         //2.執行緒在acquireQueued方法裡收到中斷請求
21         selfInterrupt();
22     }
23 }
複製程式碼

3.設定相對時間的條件等待(不進行自旋)

複製程式碼
 1 //設定定時條件等待(相對時間), 不進行自旋等待
 2 public final long awaitNanos(long nanosTimeout) throws InterruptedException {
 3     //如果執行緒被中斷則丟擲異常
 4     if (Thread.interrupted()) {
 5         throw new InterruptedException();
 6     }
 7     //將當前執行緒新增到條件佇列尾部
 8     Node node = addConditionWaiter();
 9     //在進入條件等待之前先完全釋放鎖
10     int savedState = fullyRelease(node);
11     long lastTime = System.nanoTime();
12     int interruptMode = 0;
13     while (!isOnSyncQueue(node)) {
14         //判斷超時時間是否用完了
15         if (nanosTimeout <= 0L) {
16             //如果已超時就需要執行取消條件等待操作
17             transferAfterCancelledWait(node);
18             break;
19         }
20         //將當前執行緒掛起一段時間, 執行緒在這期間可能被喚醒, 也可能自己醒來
21         LockSupport.parkNanos(this, nanosTimeout);
22         //執行緒醒來後先檢查中斷資訊
23         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
24             break;
25         }
26         long now = System.nanoTime();
27         //超時時間每次減去條件等待的時間
28         nanosTimeout -= now - lastTime;
29         lastTime = now;
30 <