1. 程式人生 > >java 併發(五)---AbstractQueuedSynchronizer(2)

java 併發(五)---AbstractQueuedSynchronizer(2)

 

       文章部分程式碼和照片來自參考資料

 

ConditonObject

         ConditionObject 繼承 Condition 這個介面, 看一下這個介面的註解說明 :

 

Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

       如果Lock替換了synchronized方法和語句的使用,則Condition將替換Object監視方法的使用。

 

         Condition 經常可以用在生產者-消費者的場景中,ArrayBlockingQueue 採用這種方式實現了生產者-消費者.

         ConditionObject 是 AQS裡的一個物件,繼承Condition 介面,上一節我們提到AQS 通過同步佇列(sync queue )和 等待佇列(wait queue )還有 狀態變數(statue)進行併發控制。這節我們要講的就是在等待佇列的操作。

         下面是 wait queue 和 sync queue 的圖例。

 

wait_queue

 

await 和 signal 方法

 

      程式碼分析來自於 一行一行原始碼分析清楚 AbstractQueuedSynchronizer (二)

         

        這兩個方法可以用下面兩種兩張圖來描述。其中await 是將節點加入到 wait queue ,然後等待喚醒。 signal 方法是從wait queue 移動到 sync queue 中,然後喚醒。

       

        wait_queue_await      

                                                                            圖一. await 方法

 

wait_queue_signal

                                                                            圖二. signal 方法

 

              Conditon 的方法實現 基於 ReetranLock 。下面原始碼分析會涉及到。

              Condition 的await 方法 包括的操作有 :

 

  • If current thread is interrupted, throw InterruptedException.
  • Save lock state returned by getState.
  • Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails.
  • Block until signalled or interrupted.
  • Reacquire by invoking specialized version of acquire with saved state as argument.
    If interrupted while blocked in step 4, throw InterruptedException.

 

  1 // 首先,這個方法是可被中斷的,不可被中斷的是另一個方法 awaitUninterruptibly()
  2 // 這個方法會阻塞,直到呼叫 signal 方法(指 signal() 和 signalAll(),下同),或被中斷
  3 public final void await() throws InterruptedException {
  4     if (Thread.interrupted())
  5         throw new InterruptedException();
  6     // 新增到 condition 的條件佇列中
  7     Node node = addConditionWaiter();
  8     // 釋放鎖,返回值是釋放鎖之前的 state 值
  9     int savedState = fullyRelease(node);
 10     int interruptMode = 0;
 11     // 這裡退出迴圈有兩種情況,之後再仔細分析
 12     // 1. isOnSyncQueue(node) 返回 true,即當前 node 已經轉移到阻塞隊列了
 13     // 2. checkInterruptWhileWaiting(node) != 0 會到 break,然後退出迴圈,代表的是執行緒中斷
 14     while (!isOnSyncQueue(node)) {
 15         LockSupport.park(this);
 16         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 17             break;
 18     }
 19     // 被喚醒後,將進入阻塞佇列,等待獲取鎖
 20     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 21         interruptMode = REINTERRUPT;
 22     if (node.nextWaiter != null) // clean up if cancelled
 23         unlinkCancelledWaiters();
 24     if (interruptMode != 0)
 25         reportInterruptAfterWait(interruptMode);
 26 }

       

  1 // 將當前執行緒對應的節點入隊,插入隊尾
  2 private Node addConditionWaiter() {
  3     Node t = lastWaiter;
  4     // 如果條件佇列的最後一個節點取消了,將其清除出去
  5     if (t != null && t.waitStatus != Node.CONDITION) {
  6         // 這個方法會遍歷整個條件佇列,然後會將已取消的所有節點清除出佇列
  7         unlinkCancelledWaiters();
  8         t = lastWaiter;
  9     }
 10     Node node = new Node(Thread.currentThread(), Node.CONDITION);
 11     // 如果佇列為空
 12     if (t == null)
 13         firstWaiter = node;
 14     else
 15         t.nextWaiter = node;
 16     lastWaiter = node;
 17     return node;
 18 }
 19 在addWaiter 方法中,有一個 unlinkCancelledWaiters() 方法,該方法用於清除佇列中已經取消等待的節點。
 20 
 21 當 await 的時候如果發生了取消操作(這點之後會說),或者是在節點入隊的時候,發現最後一個節點是被取消的,會呼叫一次這個方法。
 22 
 23 // 等待佇列是一個單向連結串列,遍歷連結串列將已經取消等待的節點清除出去
 24 // 純屬連結串列操作,很好理解,看不懂多看幾遍就可以了
 25 private void unlinkCancelledWaiters() {
 26     Node t = firstWaiter;
 27     Node trail = null;
 28     while (t != null) {
 29         Node next = t.nextWaiter;
 30         // 如果節點的狀態不是 Node.CONDITION 的話,這個節點就是被取消的
 31         if (t.waitStatus != Node.CONDITION) {
 32             t.nextWaiter = null;
 33             if (trail == null)
 34                 firstWaiter = next;
 35             else
 36                 trail.nextWaiter = next;
 37             if (next == null)
 38                 lastWaiter = trail;
 39         }
 40         else
 41             trail = t;
 42         t = next;
 43     }
 44 }

          ReentranLock 是可重入的,所以釋放所有的鎖。

  1 // 首先,我們要先觀察到返回值 savedState 代表 release 之前的 state 值
  2 // 對於最簡單的操作:先 lock.lock(),然後 condition1.await()。
  3 //         那麼 state 經過這個方法由 1 變為 0,鎖釋放,此方法返回 1
  4 //         相應的,如果 lock 重入了 n 次,savedState == n
  5 // 如果這個方法失敗,會將節點設定為"取消"狀態,並丟擲異常 IllegalMonitorStateException
  6 final int fullyRelease(Node node) {
  7     boolean failed = true;
  8     try {
  9         int savedState = getState();
 10         // 這裡使用了當前的 state 作為 release 的引數,也就是完全釋放掉鎖,將 state 置為 0
 11         if (release(savedState)) {
 12             failed = false;
 13             return savedState;
 14         } else {
 15             throw new IllegalMonitorStateException();
 16         }
 17     } finally {
 18         if (failed)
 19             node.waitStatus = Node.CANCELLED;
 20     }
 21 }
  1 // 在節點入條件佇列的時候,初始化時設定了 waitStatus = Node.CONDITION
  2 // 前面我提到,signal 的時候需要將節點從條件佇列移到阻塞佇列,
  3 // 這個方法就是判斷 node 是否已經移動到阻塞隊列了
  4 final boolean isOnSyncQueue(Node node) {
  5     // 移動過去的時候,node 的 waitStatus 會置為 0,這個之後在說 signal 方法的時候會說到
  6     // 如果 waitStatus 還是 Node.CONDITION,也就是 -2,那肯定就是還在條件佇列中
  7     // 如果 node 的前驅 prev 指向還是 null,說明肯定沒有在 阻塞佇列
  8     if (node.waitStatus == Node.CONDITION || node.prev == null)
  9         return false;
 10     // 如果 node 已經有後繼節點 next 的時候,那肯定是在阻塞隊列了
 11     if (node.next != null)
 12         return true;
 13 
 14     // 這個方法從阻塞佇列的隊尾開始從後往前遍歷找,如果找到相等的,說明在阻塞佇列,否則就是不在阻塞佇列
 15 
 16     // 可以通過判斷 node.prev() != null 來推斷出 node 在阻塞佇列嗎?答案是:不能。
 17     // 這個可以看上篇 AQS 的入隊方法,首先設定的是 node.prev 指向 tail,
 18     // 然後是 CAS 操作將自己設定為新的 tail,可是這次的 CAS 是可能失敗的。
 19 
 20     // 呼叫這個方法的時候,往往我們需要的就在隊尾的部分,所以一般都不需要完全遍歷整個佇列的
 21     return findNodeFromTail(node);
 22 }
 23 
 24 // 從同步佇列的隊尾往前遍歷,如果找到,返回 true
 25 private boolean findNodeFromTail(Node node) {
 26     Node t = tail;
 27     for (;;) {
 28         if (t == node)
 29             return true;
 30         if (t == null)
 31             return false;
 32         t = t.prev;
 33     }
 34 }
 35 }
           等待喚醒。
  1 int interruptMode = 0;
  2 while (!isOnSyncQueue(node)) {
  3     // 執行緒掛起
  4     LockSupport.park(this);
  5 
  6     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  7         break;
  8 }

         回到前面的迴圈,isOnSyncQueue(node) 返回 false 的話,那麼進到 LockSupport.park(this); 這裡執行緒掛起。

         接下來就是 signal 喚醒執行緒,轉移到阻塞佇列為了大家理解,這裡我們先看喚醒操作,因為剛剛到 LockSupport.park(this); 把執行緒掛起了,等待喚醒。喚醒操作通常由另一個執行緒來操作,就像生產者-消費者模式中,如果執行緒因為等待消費而掛起,那麼當生產者生產了一個東西后,會呼叫 signal 喚醒正在等待的執行緒來消費。

  1 
  2 // 喚醒等待了最久的執行緒
  3 // 其實就是,將這個執行緒對應的 node 從條件佇列轉移到阻塞佇列
  4 public final void signal() {
  5     // 呼叫 signal 方法的執行緒必須持有當前的獨佔鎖
  6     if (!isHeldExclusively())
  7         throw new IllegalMonitorStateException();
  8     Node first = firstWaiter;
  9     if (first != null)
 10         doSignal(first);
 11 }
 12 
 13 // 從條件佇列隊頭往後遍歷,找出第一個需要轉移的 node
 14 // 因為前面我們說過,有些執行緒會取消排隊,但是還在佇列中
 15 private void doSignal(Node first) {
 16     do {
 17           // 將 firstWaiter 指向 first 節點後面的第一個
 18         // 如果將隊頭移除後,後面沒有節點在等待了,那麼需要將 lastWaiter 置為 null
 19         if ( (firstWaiter = first.nextWaiter) == null)
 20             lastWaiter = null;
 21         // 因為 first 馬上要被移到阻塞隊列了,和條件佇列的連結關係在這裡斷掉
 22         first.nextWaiter = null;
 23     } while (!transferForSignal(first) &&
 24              (first = firstWaiter) != null);
 25       // 這裡 while 迴圈,如果 first 轉移不成功,那麼選擇 first 後面的第一個節點進行轉移,依此類推
 26 }
 27 
 28 // 將節點從條件佇列轉移到阻塞佇列
 29 // true 代表成功轉移
 30 // false 代表在 signal 之前,節點已經取消了
 31 final boolean transferForSignal(Node node) {
 32 
 33     // CAS 如果失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION,說明節點已經取消,
 34     // 既然已經取消,也就不需要轉移了,方法返回,轉移後面一個節點
 35     // 否則,將 waitStatus 置為 0
 36     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 37         return false;
 38 
 39     // enq(node): 自旋進入阻塞佇列的隊尾
 40     // 注意,這裡的返回值 p 是 node 在阻塞佇列的前驅節點
 41     Node p = enq(node);
 42     int ws = p.waitStatus;
 43     // ws > 0 說明 node 在阻塞佇列中的前驅節點取消了等待鎖,直接喚醒 node 對應的執行緒。喚醒之後會怎麼樣,後面再解釋
 44     // 如果 ws <= 0, 那麼 compareAndSetWaitStatus 將會被呼叫,上篇介紹的時候說過,節點入隊後,需要把前驅節點的狀態設為 Node.SIGNAL(-1)
 45     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 46         // 如果前驅節點取消或者 CAS 失敗,會進到這裡喚醒執行緒,之後的操作看下一節
 47         LockSupport.unpark(node.thread);
 48     return true;
 49 

        接下來喚醒後檢查中斷狀態

  1 int interruptMode = 0;
  2 while (!isOnSyncQueue(node)) {
  3     // 執行緒掛起
  4     LockSupport.park(this);
  5 
  6     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  7         break;
  8 }

 

  1 // 1. 如果在 signal 之前已經中斷,返回 THROW_IE
  2 // 2. 如果是 signal 之後中斷,返回 REINTERRUPT
  3 // 3. 沒有發生中斷,返回 0
  4 private int checkInterruptWhileWaiting(Node node) {
  5     return Thread.interrupted() ?
  6         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  7         0;
  8 }

 

  1 // 只有執行緒處於中斷狀態,才會呼叫此方法
  2 // 如果需要的話,將這個已經取消等待的節點轉移到阻塞佇列
  3 // 返回 true:如果此執行緒在 signal 之前被取消,
  4 final boolean transferAfterCancelledWait(Node node) {
  5     // 用 CAS 將節點狀態設定為 0 
  6     // 如果這步 CAS 成功,說明是 signal 方法之前發生的中斷,因為如果 signal 先發生的話,signal 中會將 waitStatus 設定為 0
  7     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  8         // 將節點放入阻塞佇列
  9         // 這裡我們看到,即使中斷了,依然會轉移到阻塞佇列
 10         enq(node);
 11         return true;
 12     }
 13 
 14     // 到這裡是因為 CAS 失敗,肯定是因為 signal 方法已經將 waitStatus 設定為了 0
 15     // signal 方法會將節點轉移到阻塞佇列,但是可能還沒完成,這邊自旋等待其完成
 16     // 當然,這種事情還是比較少的吧:signal 呼叫之後,沒完成轉移之前,發生了中斷
 17     while (!isOnSyncQueue(node))
 18         Thread.yield();
 19     return false;
 20 }

 

為什麼有了sync queue 還需要 wait queue ?

          ConditionObject  裡的await方法會(假如這個節點已經存在sync queue)釋放鎖移入 wait queue , signal 方法則是重新移入到 sync queue ,那麼我們就可以知道了 wait queue 的作用是臨時存放節點,移除在 sync queue 的節點(假如存在),再插入到 sync queue 的隊尾。它的作用我們可以在閱讀ArrayBlockingQueue原始碼時就可以感受到了。

 

 

參考資料