1. 程式人生 > >java 並發(五)---AbstractQueuedSynchronizer(2)

java 並發(五)---AbstractQueuedSynchronizer(2)

version rst dstat oop false monit unp bool all

文章部分代碼和照片來自參考資料

ConditonObject

ConditionObject 繼承 Condition 這個接口, 看一下這個接口的註解說明 :

Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

如果Lock替換了synchronized方法和語句的使用,則Condition將替換Object監視方法的使用。

Condition 經常可以用在生產者-消費者的場景中,ArrayBlockingQueue 采用這種方式實現了生產者-消費者.

ConditionObject 是 AQS裏的一個對象,繼承Condition 接口,上一節我們提到AQS 通過同步隊列(sync queue )和 等待隊列(wait queue )還有 狀態變量(statue)進行並發控制。這節我們要講的就是在等待隊列的操作。

下面是 wait queue 和 sync queue 的圖例。

技術分享圖片

await 和 signal 方法

代碼分析來自於 一行一行源碼分析清楚 AbstractQueuedSynchronizer (二)

這兩個方法可以用下面兩種兩張圖來描述。其中await 是將節點加入到 wait queue ,然後等待喚醒。 signal 方法是從wait queue 移動到 sync queue 中,然後喚醒。

技術分享圖片

圖一. await 方法

技術分享圖片

圖二. signal 方法

Conditon 的方法實現 基於 ReetranLock 。下面源碼分析會涉及到。

Condition 的await 方法 包括的操作有 :

  • If current thread is interrupted, throw InterruptedException.
  • Save lock state returned by getState.
  • Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails.
  • Block until signalled or interrupted.
  • Reacquire by invoking specialized version of acquire with saved state as argument.
    If interrupted while blocked in step 4, throw InterruptedException.

  1 // 首先,這個方法是可被中斷的,不可被中斷的是另一個方法 awaitUninterruptibly()
  2 // 這個方法會阻塞,直到調用 signal 方法(指 signal() 和 signalAll(),下同),或被中斷
  3 public final void await() throws InterruptedException {
  4     if (Thread.interrupted())
  5         throw new InterruptedException();
  6     // 添加到 condition 的條件隊列中
  7     Node node = addConditionWaiter();
  8     // 釋放鎖,返回值是釋放鎖之前的 state 值
  9     int savedState = fullyRelease(node);
 10     int interruptMode = 0;
 11     // 這裏退出循環有兩種情況,之後再仔細分析
 12     // 1. isOnSyncQueue(node) 返回 true,即當前 node 已經轉移到阻塞隊列了
 13     // 2. checkInterruptWhileWaiting(node) != 0 會到 break,然後退出循環,代表的是線程中斷
 14     while (!isOnSyncQueue(node)) {
 15         LockSupport.park(this);
 16         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 17             break;
 18     }
 19     // 被喚醒後,將進入阻塞隊列,等待獲取鎖
 20     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 21         interruptMode = REINTERRUPT;
 22     if (node.nextWaiter != null) // clean up if cancelled
 23         unlinkCancelledWaiters();
 24     if (interruptMode != 0)
 25         reportInterruptAfterWait(interruptMode);
 26 }

  1 // 將當前線程對應的節點入隊,插入隊尾
  2 private Node addConditionWaiter() {
  3     Node t = lastWaiter;
  4     // 如果條件隊列的最後一個節點取消了,將其清除出去
  5     if (t != null && t.waitStatus != Node.CONDITION) {
  6         // 這個方法會遍歷整個條件隊列,然後會將已取消的所有節點清除出隊列
  7         unlinkCancelledWaiters();
  8         t = lastWaiter;
  9     }
 10     Node node = new Node(Thread.currentThread(), Node.CONDITION);
 11     // 如果隊列為空
 12     if (t == null)
 13         firstWaiter = node;
 14     else
 15         t.nextWaiter = node;
 16     lastWaiter = node;
 17     return node;
 18 }
 19 在addWaiter 方法中,有一個 unlinkCancelledWaiters() 方法,該方法用於清除隊列中已經取消等待的節點。
 20 
 21 當 await 的時候如果發生了取消操作(這點之後會說),或者是在節點入隊的時候,發現最後一個節點是被取消的,會調用一次這個方法。
 22 
 23 // 等待隊列是一個單向鏈表,遍歷鏈表將已經取消等待的節點清除出去
 24 // 純屬鏈表操作,很好理解,看不懂多看幾遍就可以了
 25 private void unlinkCancelledWaiters() {
 26     Node t = firstWaiter;
 27     Node trail = null;
 28     while (t != null) {
 29         Node next = t.nextWaiter;
 30         // 如果節點的狀態不是 Node.CONDITION 的話,這個節點就是被取消的
 31         if (t.waitStatus != Node.CONDITION) {
 32             t.nextWaiter = null;
 33             if (trail == null)
 34                 firstWaiter = next;
 35             else
 36                 trail.nextWaiter = next;
 37             if (next == null)
 38                 lastWaiter = trail;
 39         }
 40         else
 41             trail = t;
 42         t = next;
 43     }
 44 }

ReentranLock 是可重入的,所以釋放所有的鎖。

  1 // 首先,我們要先觀察到返回值 savedState 代表 release 之前的 state 值
  2 // 對於最簡單的操作:先 lock.lock(),然後 condition1.await()。
  3 //         那麽 state 經過這個方法由 1 變為 0,鎖釋放,此方法返回 1
  4 //         相應的,如果 lock 重入了 n 次,savedState == n
  5 // 如果這個方法失敗,會將節點設置為"取消"狀態,並拋出異常 IllegalMonitorStateException
  6 final int fullyRelease(Node node) {
  7     boolean failed = true;
  8     try {
  9         int savedState = getState();
 10         // 這裏使用了當前的 state 作為 release 的參數,也就是完全釋放掉鎖,將 state 置為 0
 11         if (release(savedState)) {
 12             failed = false;
 13             return savedState;
 14         } else {
 15             throw new IllegalMonitorStateException();
 16         }
 17     } finally {
 18         if (failed)
 19             node.waitStatus = Node.CANCELLED;
 20     }
 21 }
  1 // 在節點入條件隊列的時候,初始化時設置了 waitStatus = Node.CONDITION
  2 // 前面我提到,signal 的時候需要將節點從條件隊列移到阻塞隊列,
  3 // 這個方法就是判斷 node 是否已經移動到阻塞隊列了
  4 final boolean isOnSyncQueue(Node node) {
  5     // 移動過去的時候,node 的 waitStatus 會置為 0,這個之後在說 signal 方法的時候會說到
  6     // 如果 waitStatus 還是 Node.CONDITION,也就是 -2,那肯定就是還在條件隊列中
  7     // 如果 node 的前驅 prev 指向還是 null,說明肯定沒有在 阻塞隊列
  8     if (node.waitStatus == Node.CONDITION || node.prev == null)
  9         return false;
 10     // 如果 node 已經有後繼節點 next 的時候,那肯定是在阻塞隊列了
 11     if (node.next != null)
 12         return true;
 13 
 14     // 這個方法從阻塞隊列的隊尾開始從後往前遍歷找,如果找到相等的,說明在阻塞隊列,否則就是不在阻塞隊列
 15 
 16     // 可以通過判斷 node.prev() != null 來推斷出 node 在阻塞隊列嗎?答案是:不能。
 17     // 這個可以看上篇 AQS 的入隊方法,首先設置的是 node.prev 指向 tail,
 18     // 然後是 CAS 操作將自己設置為新的 tail,可是這次的 CAS 是可能失敗的。
 19 
 20     // 調用這個方法的時候,往往我們需要的就在隊尾的部分,所以一般都不需要完全遍歷整個隊列的
 21     return findNodeFromTail(node);
 22 }
 23 
 24 // 從同步隊列的隊尾往前遍歷,如果找到,返回 true
 25 private boolean findNodeFromTail(Node node) {
 26     Node t = tail;
 27     for (;;) {
 28         if (t == node)
 29             return true;
 30         if (t == null)
 31             return false;
 32         t = t.prev;
 33     }
 34 }
 35 }
等待喚醒。
  1 int interruptMode = 0;
  2 while (!isOnSyncQueue(node)) {
  3     // 線程掛起
  4     LockSupport.park(this);
  5 
  6     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  7         break;
  8 }

回到前面的循環,isOnSyncQueue(node) 返回 false 的話,那麽進到 LockSupport.park(this); 這裏線程掛起。

接下來就是 signal 喚醒線程,轉移到阻塞隊列為了大家理解,這裏我們先看喚醒操作,因為剛剛到 LockSupport.park(this); 把線程掛起了,等待喚醒。喚醒操作通常由另一個線程來操作,就像生產者-消費者模式中,如果線程因為等待消費而掛起,那麽當生產者生產了一個東西後,會調用 signal 喚醒正在等待的線程來消費。

  1 
  2 // 喚醒等待了最久的線程
  3 // 其實就是,將這個線程對應的 node 從條件隊列轉移到阻塞隊列
  4 public final void signal() {
  5     // 調用 signal 方法的線程必須持有當前的獨占鎖
  6     if (!isHeldExclusively())
  7         throw new IllegalMonitorStateException();
  8     Node first = firstWaiter;
  9     if (first != null)
 10         doSignal(first);
 11 }
 12 
 13 // 從條件隊列隊頭往後遍歷,找出第一個需要轉移的 node
 14 // 因為前面我們說過,有些線程會取消排隊,但是還在隊列中
 15 private void doSignal(Node first) {
 16     do {
 17           // 將 firstWaiter 指向 first 節點後面的第一個
 18         // 如果將隊頭移除後,後面沒有節點在等待了,那麽需要將 lastWaiter 置為 null
 19         if ( (firstWaiter = first.nextWaiter) == null)
 20             lastWaiter = null;
 21         // 因為 first 馬上要被移到阻塞隊列了,和條件隊列的鏈接關系在這裏斷掉
 22         first.nextWaiter = null;
 23     } while (!transferForSignal(first) &&
 24              (first = firstWaiter) != null);
 25       // 這裏 while 循環,如果 first 轉移不成功,那麽選擇 first 後面的第一個節點進行轉移,依此類推
 26 }
 27 
 28 // 將節點從條件隊列轉移到阻塞隊列
 29 // true 代表成功轉移
 30 // false 代表在 signal 之前,節點已經取消了
 31 final boolean transferForSignal(Node node) {
 32 
 33     // CAS 如果失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION,說明節點已經取消,
 34     // 既然已經取消,也就不需要轉移了,方法返回,轉移後面一個節點
 35     // 否則,將 waitStatus 置為 0
 36     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 37         return false;
 38 
 39     // enq(node): 自旋進入阻塞隊列的隊尾
 40     // 註意,這裏的返回值 p 是 node 在阻塞隊列的前驅節點
 41     Node p = enq(node);
 42     int ws = p.waitStatus;
 43     // ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程。喚醒之後會怎麽樣,後面再解釋
 44     // 如果 ws <= 0, 那麽 compareAndSetWaitStatus 將會被調用,上篇介紹的時候說過,節點入隊後,需要把前驅節點的狀態設為 Node.SIGNAL(-1)
 45     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 46         // 如果前驅節點取消或者 CAS 失敗,會進到這裏喚醒線程,之後的操作看下一節
 47         LockSupport.unpark(node.thread);
 48     return true;
 49 

接下來喚醒後檢查中斷狀態

  1 int interruptMode = 0;
  2 while (!isOnSyncQueue(node)) {
  3     // 線程掛起
  4     LockSupport.park(this);
  5 
  6     if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  7         break;
  8 }

  1 // 1. 如果在 signal 之前已經中斷,返回 THROW_IE
  2 // 2. 如果是 signal 之後中斷,返回 REINTERRUPT
  3 // 3. 沒有發生中斷,返回 0
  4 private int checkInterruptWhileWaiting(Node node) {
  5     return Thread.interrupted() ?
  6         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  7         0;
  8 }

  1 // 只有線程處於中斷狀態,才會調用此方法
  2 // 如果需要的話,將這個已經取消等待的節點轉移到阻塞隊列
  3 // 返回 true:如果此線程在 signal 之前被取消,
  4 final boolean transferAfterCancelledWait(Node node) {
  5     // 用 CAS 將節點狀態設置為 0 
  6     // 如果這步 CAS 成功,說明是 signal 方法之前發生的中斷,因為如果 signal 先發生的話,signal 中會將 waitStatus 設置為 0
  7     if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  8         // 將節點放入阻塞隊列
  9         // 這裏我們看到,即使中斷了,依然會轉移到阻塞隊列
 10         enq(node);
 11         return true;
 12     }
 13 
 14     // 到這裏是因為 CAS 失敗,肯定是因為 signal 方法已經將 waitStatus 設置為了 0
 15     // signal 方法會將節點轉移到阻塞隊列,但是可能還沒完成,這邊自旋等待其完成
 16     // 當然,這種事情還是比較少的吧:signal 調用之後,沒完成轉移之前,發生了中斷
 17     while (!isOnSyncQueue(node))
 18         Thread.yield();
 19     return false;
 20 }

為什麽有了sync queue 還需要 wait queue ?

ConditionObject 裏的await方法會(假如這個節點已經存在sync queue)釋放鎖移入 wait queue , signal 方法則是重新移入到 sync queue ,那麽我們就可以知道了 wait queue 的作用是臨時存放節點,移除在 sync queue 的節點(假如存在),再插入到 sync queue 的隊尾。它的作用我們可以在閱讀ArrayBlockingQueue源碼時就可以感受到了。

參考資料

  • 一行一行源碼分析清楚 AbstractQueuedSynchronizer (二)
  • 一行一行源碼分析清楚 AbstractQueuedSynchronizer (三)
  • https://www.aliyun.com/jiaocheng/792638.html

java 並發(五)---AbstractQueuedSynchronizer(2)