java 併發(五)---AbstractQueuedSynchronizer(2)
文章部分程式碼和照片來自參考資料
ConditonObject
ConditionObject 繼承 Condition 這個介面, 看一下這個介面的註解說明 :
Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.
如果Lock替換了synchronized方法和語句的使用,則Condition將替換Object監視方法的使用。
Condition 經常可以用在生產者-消費者的場景中,ArrayBlockingQueue 採用這種方式實現了生產者-消費者.
ConditionObject 是 AQS裡的一個物件,繼承Condition 介面,上一節我們提到AQS 通過同步佇列(sync queue )和 等待佇列(wait queue )還有 狀態變數(statue)進行併發控制。這節我們要講的就是在等待佇列的操作。
下面是 wait queue 和 sync queue 的圖例。
await 和 signal 方法
這兩個方法可以用下面兩種兩張圖來描述。其中await 是將節點加入到 wait queue ,然後等待喚醒。 signal 方法是從wait queue 移動到 sync queue 中,然後喚醒。
圖一. await 方法
圖二. signal 方法
Conditon 的方法實現 基於 ReetranLock 。下面原始碼分析會涉及到。
Condition 的await 方法 包括的操作有 :
- If current thread is interrupted, throw InterruptedException.
- Save lock state returned by getState.
- Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails.
- Block until signalled or interrupted.
- Reacquire by invoking specialized version of acquire with saved state as argument.
If interrupted while blocked in step 4, throw InterruptedException.
1 // 首先,這個方法是可被中斷的,不可被中斷的是另一個方法 awaitUninterruptibly() 2 // 這個方法會阻塞,直到呼叫 signal 方法(指 signal() 和 signalAll(),下同),或被中斷 3 public final void await() throws InterruptedException { 4 if (Thread.interrupted()) 5 throw new InterruptedException(); 6 // 新增到 condition 的條件佇列中 7 Node node = addConditionWaiter(); 8 // 釋放鎖,返回值是釋放鎖之前的 state 值 9 int savedState = fullyRelease(node); 10 int interruptMode = 0; 11 // 這裡退出迴圈有兩種情況,之後再仔細分析 12 // 1. isOnSyncQueue(node) 返回 true,即當前 node 已經轉移到阻塞隊列了 13 // 2. checkInterruptWhileWaiting(node) != 0 會到 break,然後退出迴圈,代表的是執行緒中斷 14 while (!isOnSyncQueue(node)) { 15 LockSupport.park(this); 16 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 17 break; 18 } 19 // 被喚醒後,將進入阻塞佇列,等待獲取鎖 20 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 21 interruptMode = REINTERRUPT; 22 if (node.nextWaiter != null) // clean up if cancelled 23 unlinkCancelledWaiters(); 24 if (interruptMode != 0) 25 reportInterruptAfterWait(interruptMode); 26 }
1 // 將當前執行緒對應的節點入隊,插入隊尾 2 private Node addConditionWaiter() { 3 Node t = lastWaiter; 4 // 如果條件佇列的最後一個節點取消了,將其清除出去 5 if (t != null && t.waitStatus != Node.CONDITION) { 6 // 這個方法會遍歷整個條件佇列,然後會將已取消的所有節點清除出佇列 7 unlinkCancelledWaiters(); 8 t = lastWaiter; 9 } 10 Node node = new Node(Thread.currentThread(), Node.CONDITION); 11 // 如果佇列為空 12 if (t == null) 13 firstWaiter = node; 14 else 15 t.nextWaiter = node; 16 lastWaiter = node; 17 return node; 18 } 19 在addWaiter 方法中,有一個 unlinkCancelledWaiters() 方法,該方法用於清除佇列中已經取消等待的節點。 20 21 當 await 的時候如果發生了取消操作(這點之後會說),或者是在節點入隊的時候,發現最後一個節點是被取消的,會呼叫一次這個方法。 22 23 // 等待佇列是一個單向連結串列,遍歷連結串列將已經取消等待的節點清除出去 24 // 純屬連結串列操作,很好理解,看不懂多看幾遍就可以了 25 private void unlinkCancelledWaiters() { 26 Node t = firstWaiter; 27 Node trail = null; 28 while (t != null) { 29 Node next = t.nextWaiter; 30 // 如果節點的狀態不是 Node.CONDITION 的話,這個節點就是被取消的 31 if (t.waitStatus != Node.CONDITION) { 32 t.nextWaiter = null; 33 if (trail == null) 34 firstWaiter = next; 35 else 36 trail.nextWaiter = next; 37 if (next == null) 38 lastWaiter = trail; 39 } 40 else 41 trail = t; 42 t = next; 43 } 44 }
ReentranLock 是可重入的,所以釋放所有的鎖。
1 // 首先,我們要先觀察到返回值 savedState 代表 release 之前的 state 值 2 // 對於最簡單的操作:先 lock.lock(),然後 condition1.await()。 3 // 那麼 state 經過這個方法由 1 變為 0,鎖釋放,此方法返回 1 4 // 相應的,如果 lock 重入了 n 次,savedState == n 5 // 如果這個方法失敗,會將節點設定為"取消"狀態,並丟擲異常 IllegalMonitorStateException 6 final int fullyRelease(Node node) { 7 boolean failed = true; 8 try { 9 int savedState = getState(); 10 // 這裡使用了當前的 state 作為 release 的引數,也就是完全釋放掉鎖,將 state 置為 0 11 if (release(savedState)) { 12 failed = false; 13 return savedState; 14 } else { 15 throw new IllegalMonitorStateException(); 16 } 17 } finally { 18 if (failed) 19 node.waitStatus = Node.CANCELLED; 20 } 21 }
1 // 在節點入條件佇列的時候,初始化時設定了 waitStatus = Node.CONDITION 2 // 前面我提到,signal 的時候需要將節點從條件佇列移到阻塞佇列, 3 // 這個方法就是判斷 node 是否已經移動到阻塞隊列了 4 final boolean isOnSyncQueue(Node node) { 5 // 移動過去的時候,node 的 waitStatus 會置為 0,這個之後在說 signal 方法的時候會說到 6 // 如果 waitStatus 還是 Node.CONDITION,也就是 -2,那肯定就是還在條件佇列中 7 // 如果 node 的前驅 prev 指向還是 null,說明肯定沒有在 阻塞佇列 8 if (node.waitStatus == Node.CONDITION || node.prev == null) 9 return false; 10 // 如果 node 已經有後繼節點 next 的時候,那肯定是在阻塞隊列了 11 if (node.next != null) 12 return true; 13 14 // 這個方法從阻塞佇列的隊尾開始從後往前遍歷找,如果找到相等的,說明在阻塞佇列,否則就是不在阻塞佇列 15 16 // 可以通過判斷 node.prev() != null 來推斷出 node 在阻塞佇列嗎?答案是:不能。 17 // 這個可以看上篇 AQS 的入隊方法,首先設定的是 node.prev 指向 tail, 18 // 然後是 CAS 操作將自己設定為新的 tail,可是這次的 CAS 是可能失敗的。 19 20 // 呼叫這個方法的時候,往往我們需要的就在隊尾的部分,所以一般都不需要完全遍歷整個佇列的 21 return findNodeFromTail(node); 22 } 23 24 // 從同步佇列的隊尾往前遍歷,如果找到,返回 true 25 private boolean findNodeFromTail(Node node) { 26 Node t = tail; 27 for (;;) { 28 if (t == node) 29 return true; 30 if (t == null) 31 return false; 32 t = t.prev; 33 } 34 } 35 }等待喚醒。
1 int interruptMode = 0; 2 while (!isOnSyncQueue(node)) { 3 // 執行緒掛起 4 LockSupport.park(this); 5 6 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 7 break; 8 }
回到前面的迴圈,isOnSyncQueue(node) 返回 false 的話,那麼進到 LockSupport.park(this); 這裡執行緒掛起。
接下來就是 signal 喚醒執行緒,轉移到阻塞佇列為了大家理解,這裡我們先看喚醒操作,因為剛剛到 LockSupport.park(this); 把執行緒掛起了,等待喚醒。喚醒操作通常由另一個執行緒來操作,就像生產者-消費者模式中,如果執行緒因為等待消費而掛起,那麼當生產者生產了一個東西后,會呼叫 signal 喚醒正在等待的執行緒來消費。
1 2 // 喚醒等待了最久的執行緒 3 // 其實就是,將這個執行緒對應的 node 從條件佇列轉移到阻塞佇列 4 public final void signal() { 5 // 呼叫 signal 方法的執行緒必須持有當前的獨佔鎖 6 if (!isHeldExclusively()) 7 throw new IllegalMonitorStateException(); 8 Node first = firstWaiter; 9 if (first != null) 10 doSignal(first); 11 } 12 13 // 從條件佇列隊頭往後遍歷,找出第一個需要轉移的 node 14 // 因為前面我們說過,有些執行緒會取消排隊,但是還在佇列中 15 private void doSignal(Node first) { 16 do { 17 // 將 firstWaiter 指向 first 節點後面的第一個 18 // 如果將隊頭移除後,後面沒有節點在等待了,那麼需要將 lastWaiter 置為 null 19 if ( (firstWaiter = first.nextWaiter) == null) 20 lastWaiter = null; 21 // 因為 first 馬上要被移到阻塞隊列了,和條件佇列的連結關係在這裡斷掉 22 first.nextWaiter = null; 23 } while (!transferForSignal(first) && 24 (first = firstWaiter) != null); 25 // 這裡 while 迴圈,如果 first 轉移不成功,那麼選擇 first 後面的第一個節點進行轉移,依此類推 26 } 27 28 // 將節點從條件佇列轉移到阻塞佇列 29 // true 代表成功轉移 30 // false 代表在 signal 之前,節點已經取消了 31 final boolean transferForSignal(Node node) { 32 33 // CAS 如果失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION,說明節點已經取消, 34 // 既然已經取消,也就不需要轉移了,方法返回,轉移後面一個節點 35 // 否則,將 waitStatus 置為 0 36 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 37 return false; 38 39 // enq(node): 自旋進入阻塞佇列的隊尾 40 // 注意,這裡的返回值 p 是 node 在阻塞佇列的前驅節點 41 Node p = enq(node); 42 int ws = p.waitStatus; 43 // ws > 0 說明 node 在阻塞佇列中的前驅節點取消了等待鎖,直接喚醒 node 對應的執行緒。喚醒之後會怎麼樣,後面再解釋 44 // 如果 ws <= 0, 那麼 compareAndSetWaitStatus 將會被呼叫,上篇介紹的時候說過,節點入隊後,需要把前驅節點的狀態設為 Node.SIGNAL(-1) 45 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 46 // 如果前驅節點取消或者 CAS 失敗,會進到這裡喚醒執行緒,之後的操作看下一節 47 LockSupport.unpark(node.thread); 48 return true; 49
接下來喚醒後檢查中斷狀態
1 int interruptMode = 0; 2 while (!isOnSyncQueue(node)) { 3 // 執行緒掛起 4 LockSupport.park(this); 5 6 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 7 break; 8 }
1 // 1. 如果在 signal 之前已經中斷,返回 THROW_IE 2 // 2. 如果是 signal 之後中斷,返回 REINTERRUPT 3 // 3. 沒有發生中斷,返回 0 4 private int checkInterruptWhileWaiting(Node node) { 5 return Thread.interrupted() ? 6 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 7 0; 8 }
1 // 只有執行緒處於中斷狀態,才會呼叫此方法 2 // 如果需要的話,將這個已經取消等待的節點轉移到阻塞佇列 3 // 返回 true:如果此執行緒在 signal 之前被取消, 4 final boolean transferAfterCancelledWait(Node node) { 5 // 用 CAS 將節點狀態設定為 0 6 // 如果這步 CAS 成功,說明是 signal 方法之前發生的中斷,因為如果 signal 先發生的話,signal 中會將 waitStatus 設定為 0 7 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 8 // 將節點放入阻塞佇列 9 // 這裡我們看到,即使中斷了,依然會轉移到阻塞佇列 10 enq(node); 11 return true; 12 } 13 14 // 到這裡是因為 CAS 失敗,肯定是因為 signal 方法已經將 waitStatus 設定為了 0 15 // signal 方法會將節點轉移到阻塞佇列,但是可能還沒完成,這邊自旋等待其完成 16 // 當然,這種事情還是比較少的吧:signal 呼叫之後,沒完成轉移之前,發生了中斷 17 while (!isOnSyncQueue(node)) 18 Thread.yield(); 19 return false; 20 }
為什麼有了sync queue 還需要 wait queue ?
ConditionObject 裡的await方法會(假如這個節點已經存在sync queue)釋放鎖移入 wait queue , signal 方法則是重新移入到 sync queue ,那麼我們就可以知道了 wait queue 的作用是臨時存放節點,移除在 sync queue 的節點(假如存在),再插入到 sync queue 的隊尾。它的作用我們可以在閱讀ArrayBlockingQueue原始碼時就可以感受到了。