1. 程式人生 > >J.U.C 之 AQS:同步狀態的獲取與釋放

J.U.C 之 AQS:同步狀態的獲取與釋放

在前面提到過,AQS 是構建 Java 同步元件的基礎,我們期待它能夠成為實現大部分同步需求的基礎。

AQS 的設計模式採用的模板方法模式,子類通過繼承的方式,實現它的抽象方法來管理同步狀態。對於子類而言,它並沒有太多的活要做,AQS 已經提供了大量的模板方法來實現同步,主要是分為三類:

  • 獨佔式獲取和釋放同步狀態
  • 共享式獲取和釋放同步狀態
  • 查詢同步佇列中的等待執行緒情況。

自定義子類使用 AQS 提供的模板方法,就可以實現自己的同步語義

1. 獨佔式

獨佔式,同一時刻,僅有一個執行緒持有同步狀態

1.1 獨佔式同步狀態獲取

老艿艿:「1.1 獨佔式同步狀態獲取」 整個小節,是本文最難的部分。請一定保持耐心。

#acquire(int arg) 方法,為 AQS 提供的模板方法。該方法為獨佔式獲取同步狀態,但是該方法對中斷不敏感。也就是說,由於執行緒獲取同步狀態失敗而加入到 CLH 同步佇列中,後續對該執行緒進行中斷操作時,執行緒不會從 CLH 同步佇列中移除。程式碼如下:

1: public final void acquire(int arg) {
2:     if (!tryAcquire(arg) &&
3:         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4:         selfInterrupt();
5: }
  • 第 2 行:呼叫 #tryAcquire(int arg) 方法,去嘗試獲取同步狀態,獲取成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。若獲取成功,#acquire(int arg) 方法,直接返回,不用執行緒阻塞,自旋直到獲得同步狀態成功。

    • #tryAcquire(int arg) 方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。程式碼如下:

      protected boolean tryAcquire(int arg) {
          throw new UnsupportedOperationException();
      }
      
      • 直接丟擲 UnsupportedOperationException 異常。
  • 第 3 行:如果 #tryAcquire(int arg) 方法返回 false ,即獲取同步狀態失敗,則呼叫 #addWaiter(Node mode) 方法,將當前執行緒加入到 CLH 同步佇列尾部。並且, mode 方法引數為 Node.EXCLUSIVE ,表示獨佔模式。

  • 第 3 行:呼叫 boolean #acquireQueued(Node node, int arg) 方法,自旋直到獲得同步狀態成功。詳細解析,見 「1.1.1 acquireQueued」 中。另外,該方法的返回值型別為 boolean ,當返回 true 時,表示在這個過程中,發生過執行緒中斷。但是呢,這個方法又會清理執行緒中斷的標識,所以在種情況下,需要呼叫【第 4 行】的 #selfInterrupt() 方法,恢復執行緒中斷的標識,程式碼如下:

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    

1.1.1 acquireQueued

boolean #acquireQueued(Node node, int arg) 方法,為一個自旋的過程,也就是說,當前執行緒(Node)進入同步佇列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。

流程圖如下:

流程圖

程式碼如下:

 1: final boolean acquireQueued(final Node node, int arg) {
 2:     // 記錄是否獲取同步狀態成功
 3:     boolean failed = true;
 4:     try {
 5:         // 記錄過程中,是否發生執行緒中斷
 6:         boolean interrupted = false;
 7:         /*
 8:          * 自旋過程,其實就是一個死迴圈而已
 9:          */
10:         for (;;) {
11:             // 當前執行緒的前驅節點
12:             final Node p = node.predecessor();
13:             // 當前執行緒的前驅節點是頭結點,且同步狀態成功
14:             if (p == head && tryAcquire(arg)) {
15:                 setHead(node);
16:                 p.next = null; // help GC
17:                 failed = false;
18:                 return interrupted;
19:             }
20:             // 獲取失敗,執行緒等待--具體後面介紹
21:             if (shouldParkAfterFailedAcquire(p, node) &&
22:                     parkAndCheckInterrupt())
23:                 interrupted = true;
24:         }
25:     } finally {
26:         // 獲取同步狀態發生異常,取消獲取。
27:         if (failed)
28:             cancelAcquire(node);
29:     }
30: }
  • 第 3 行:failed 變數,記錄是否獲取同步狀態成功。
  • 第 6 行:interrupted 變數,記錄獲取過程中,是否發生執行緒中斷
  • ========== 第 7 至 24 行:“死”迴圈,自旋直到獲得同步狀態成功。==========
  • 第 12 行:呼叫 Node#predecessor() 方法,獲得當前執行緒的前一個節點 p
  • 第 14 行:p == head 程式碼塊,若滿足,則表示當前執行緒的前一個節點為節點,因為 head最後一個獲得同步狀態成功的節點,此時呼叫 #tryAcquire(int arg) 方法,嘗試獲得同步狀態。? 在 #acquire(int arg) 方法的【第 2 行】,也呼叫了這個方法。
  • 第 15 至 18 行:當前節點( 執行緒 )獲取同步狀態成功
    • 第 15 行:設定當前節點( 執行緒 )為head
    • 第 16 行:設定節點 p 不再指向下一個節點,讓它自身更快的被 GC 。
    • 第 17 行:標記 failed = false ,表示獲取同步狀態成功。
    • 第 18 行:返回記錄獲取過程中,是否發生執行緒中斷
  • 第 20 至 24 行:獲取失敗,執行緒等待喚醒,從而進行下一次的同步狀態獲取的嘗試。詳細解析,見 《【死磕 Java 併發】—– J.U.C 之 AQS:阻塞和喚醒執行緒》 。詳細解析,見 「1.1.2 shouldParkAfterFailedAcquire」
    • 第 21 行:呼叫 #shouldParkAfterFailedAcquire(Node pre, Node node) 方法,判斷獲取失敗後,是否當前執行緒需要阻塞等待。
  • ========== 第 26 至 29 行:獲取同步狀態的過程中,發生異常,取消獲取。==========
  • 第 28 行:呼叫 #cancelAcquire(Node node) 方法,取消獲取同步狀態。詳細解析,見 「1.1.3 cancelAcquire」

1.1.2 shouldParkAfterFailedAcquire

 1: private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2:     // 獲得前一個節點的等待狀態
 3:     int ws = pred.waitStatus;
 4:     if (ws == Node.SIGNAL) //  Node.SIGNAL
 5:         /*
 6:          * This node has already set status asking a release
 7:          * to signal it, so it can safely park.
 8:          */
 9:         return true;
10:     if (ws > 0) { // Node.CANCEL
11:         /*
12:          * Predecessor was cancelled. Skip over predecessors and
13:          * indicate retry.
14:          */
15:         do {
16:             node.prev = pred = pred.prev;
17:         } while (pred.waitStatus > 0);
18:         pred.next = node;
19:     } else { // 0 或者 Node.PROPAGATE
20:         /*
21:          * waitStatus must be 0 or PROPAGATE.  Indicate that we
22:          * need a signal, but don't park yet.  Caller will need to
23:          * retry to make sure it cannot acquire before parking.
24:          */
25:         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
26:     }
27:     return false;
28: }
  • prednode 方法引數,傳入時,要求前者必須是後者的前一個節點。
  • 第 3 行:獲得前一個節點( pre )的等待狀態。下面會根據這個狀態有三種情況的處理。
  • 第 4 至 9 行:等待狀態為 Node.SIGNAL 時,表示 pred 的下一個節點 node 的執行緒需要阻塞等待。在 pred 的執行緒釋放同步狀態時,會對 node 的執行緒進行喚醒通知。所以,【第 9 行】返回 true ,表明當前執行緒可以被 park安全的阻塞等待。
  • 第 19 至 26 行:等待狀態為 0 或者 Node.PROPAGATE 時,通過 CAS 設定,將狀態修改為 Node.SIGNAL ,即下一次重新執行 #shouldParkAfterFailedAcquire(Node pred, Node node) 方法時,滿足【第 4 至 9 行】的條件。
    • 但是,對於本次執行,【第 27 行】返回 false 。
    • 另外,等待狀態不會為 Node.CONDITION ,因為它用在 ConditonObject 中。
  • 第 10 至 18 行:等待狀態為 NODE.CANCELLED 時,則表明該執行緒的前一個節點已經等待超時或者被中斷了,則需要從 CLH 佇列中將該前一個節點刪除掉,迴圈回溯,直到前一個節點狀態 <= 0
    • 對於本次執行,【第 27 行】返回 false ,需要下一次再重新執行 #shouldParkAfterFailedAcquire(Node pred, Node node) 方法,看看滿足哪個條件。
    • 整個過程如下圖:過程

1.1.3 cancelAcquire

 1: private void cancelAcquire(Node node) {
 2:     // Ignore if node doesn't exist
 3:     if (node == null)
 4:         return;
 5: 
 6:     node.thread = null;
 7: 
 8:     // Skip cancelled predecessors
 9:     Node pred = node.prev;
10:     while (pred.waitStatus > 0)
11:         node.prev = pred = pred.prev;
12: 
13:     // predNext is the apparent node to unsplice. CASes below will
14:     // fail if not, in which case, we lost race vs another cancel
15:     // or signal, so no further action is necessary.
16:     Node predNext = pred.next;
17: 
18:     // Can use unconditional write instead of CAS here.
19:     // After this atomic step, other Nodes can skip past us.
20:     // Before, we are free of interference from other threads.
21:     node.waitStatus = Node.CANCELLED;
22: 
23:     // If we are the tail, remove ourselves.
24:     if (node == tail && compareAndSetTail(node, pred)) {
25:         compareAndSetNext(pred, predNext, null);
26:     } else {
27:         // If successor needs signal, try to set pred's next-link
28:         // so it will get one. Otherwise wake it up to propagate.
29:         int ws;
30:         if (pred != head &&
31:             ((ws = pred.waitStatus) == Node.SIGNAL ||
32:              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
33:             pred.thread != null) {
34:             Node next = node.next;
35:             if (next != null && next.waitStatus <= 0)
36:                 compareAndSetNext(pred, predNext, next);
37:         } else {
38:             unparkSuccessor(node);
39:         }
40: 
41:         node.next = node; // help GC
42:     }
43: }
  • 第 2 至 4 行:忽略,若傳入引數 node 為空。
  • 第 6 行:將節點的等待執行緒置
  • 第 9 行:獲得 node 節點的一個節點 pred
    • 第 10 至 11 行: 邏輯同 #shouldParkAfterFailedAcquire(Node pred, Node node) 的【第 15 至 17 行】。
  • 第 16 行:獲得 pred一個節點 predNext 。在這個變數上,有很“複雜”的英文,我們來理解下:predNext 從表面上看,和 node 是等價的。
    • 但是實際上,存在多執行緒併發的情況,所以在【第 25 行】或者【第 36 行】中,我們呼叫 #compareAndSetNext(...) 方法,使用 CAS 的方式,設定 pred一個節點。
    • 如果設定失敗,說明當前執行緒和其它執行緒競爭失敗,不需要做其它邏輯,因為 pred一個節點已經被其它執行緒設定成功
  • 第 21 行:設定 node 節點的為取消的等待狀態 Node.CANCELLED 。在這個變數上,有很“複雜”的英文,我們再來理解下:
    • 這裡可以使用直接寫,而不是 CAS 。
    • 在這個操作之後,其它 Node 節點可以忽略 node
    • Before, we are free of interference from other threads. TODO 9000 芋艿,如何理解。
  • 下面開始開始修改 pred的下一個節點,一共分成種情況。
  • ========== 第一種 ==========
  • 第 24 行:如果 node節點,呼叫 #compareAndSetTail(...) 方法,CAS 設定 pred節點。
    • 第 25 行:若上述操作成功,呼叫 #compareAndSetNext(...) 方法,CAS 設定 pred一個節點為空( null )。
  • ========== 第二種 ==========
  • 第 30 行:pred節點。
  • 第 31 至 32 行:pred等待狀態Node.SIGNAL ,或者可被 CASNode.SIGNAL
  • 第 33 行:pred 的執行緒非空
    • TODO 9001 芋艿,如何理解。目前能想象到的,一開始 30 行為非頭節點,在 33 的時候,結果成為頭節點,執行緒已經為空了。
  • 第 34 至 36 行:若 node一個節點 next等待狀態Node.CANCELLED ,則呼叫 #compareAndSetNext(...) 方法,CAS 設定 pred一個節點為 next
  • ========== 第三種 ==========
  • 第 37 至 39 行:如果 pred節點( ? 在【第 31 至 33 行】也會有別的情況 ),呼叫 #unparkSuccessor(Node node) 方法,喚醒 node一個節點的執行緒等待。詳細解析,見 《【死磕 Java 併發】—– J.U.C 之 AQS:阻塞和喚醒執行緒》
    • 為什麼此處需要喚醒呢?因為,pred節點,node一個節點的阻塞等待,需要 node 釋放同步狀態時進行喚醒。但是,node 取消獲取同步狀態,則不會再出現 node 釋放同步狀態時進行喚醒 node一個節點。因此,需要此處進行喚醒。
  • ========== 第 二 + 三種 ==========
  • 第 41 行:TODO 芋艿 9002 為啥是 next 為 node 。目前收集到的資料如下:

    • next 的註釋如下:

      /**
       * Link to the successor node that the current node/thread
       * unparks upon release. Assigned during enqueuing, adjusted
       * when bypassing cancelled predecessors, and nulled out (for
       * sake of GC) when dequeued.  The enq operation does not
       * assign next field of a predecessor until after attachment,
       * so seeing a null next field does not necessarily mean that
       * node is at end of queue. However, if a next field appears
       * to be null, we can scan prev's from the tail to
       * double-check.  The next field of cancelled nodes is set to
       * point to the node itself instead of null, to make life
       * easier for isOnSyncQueue.
       */
      
      • 最後一句話

1.2 獨佔式獲取響應中斷

AQS 提供了acquire(int arg) 方法,以供獨佔式獲取同步狀態,但是該方法對中斷不響應,對執行緒進行中斷操作後,該執行緒會依然位於CLH同步佇列中,等待著獲取同步狀態。為了響應中斷,AQS 提供了 #acquireInterruptibly(int arg) 方法。該方法在等待獲取同步狀態時,如果當前執行緒被中斷了,會立刻響應中斷,並丟擲 InterruptedException 異常。

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
  • 首先,校驗該執行緒是否已經中斷了,如果是,則丟擲InterruptedException 異常。
  • 然後,呼叫 #tryAcquire(int arg) 方法,嘗試獲取同步狀態,如果獲取成功,則直接返回。
  • 最後,呼叫 #doAcquireInterruptibly(int arg) 方法,自旋直到獲得同步狀態成功,或執行緒中斷丟擲 InterruptedException 異常
  • 應該不僅僅 help gc

1.2.1 doAcquireInterruptibly

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); // <1>
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

它與 #acquire(int arg) 方法僅有兩個差別

  1. 方法宣告丟擲 InterruptedException 異常。
  2. 在中斷方法處不再是使用 interrupted 標誌,而是直接丟擲 InterruptedException 異常,即 <1> 處。

1.3 獨佔式超時獲取

AQS 除了提供上面兩個方法外,還提供了一個增強版的方法 #tryAcquireNanos(int arg, long nanos) 。該方法為 #acquireInterruptibly(int arg) 方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前執行緒沒有在指定時間內獲取同步狀態,則會返回 false ,否則返回 true 。

流程圖如下:

流程圖

程式碼如下:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
  • 首先,校驗該執行緒是否已經中斷了,如果是,則丟擲InterruptedException 異常。
  • 然後,呼叫 #tryAcquire(int arg) 方法,嘗試獲取同步狀態,如果獲取成功,則直接返回。
  • 最後,呼叫 #tryAcquireNanos(int arg) 方法,自旋直到獲得同步狀態成功,或執行緒中斷丟擲 InterruptedException 異常,或超過指定時間返回獲取同步狀態失敗

1.3.1 tryAcquireNanos

static final long spinForTimeoutThreshold = 1000L;

  1: private boolean doAcquireNanos(int arg, long nanosTimeout)
  2:         throws InterruptedException {
  3:     // nanosTimeout <= 0
  4:     if (nanosTimeout <= 0L)
  5:         return false;
  6:     // 超時時間
  7:     final long deadline = System.nanoTime() + nanosTimeout;
  8:     // 新增 Node 節點
  9:     final Node node = addWaiter(Node.EXCLUSIVE);
 10:     boolean failed = true;
 11:     try {
 12:         // 自旋
 13:         for (;;) {
 14:             final Node p = node.predecessor();
 15:             // 獲取同步狀態成功
 16:             if (p == head && tryAcquire(arg)) {
 17:                 setHead(node);
 18:                 p.next = null; // help GC
 19:                 failed = false;
 20:                 return true;
 21:             }
 22:             /*
 23:              * 獲取失敗,做超時、中斷判斷
 24:              */
 25:             // 重新計算需要休眠的時間
 26:             nanosTimeout = deadline - System.nanoTime();
 27:             // 已經超時,返回false
 28:             if (nanosTimeout <= 0L)
 29:                 return false;
 30:             // 如果沒有超時,則等待nanosTimeout納秒
 31:             // 注:該執行緒會直接從LockSupport.parkNanos中返回,
 32:             // LockSupport 為 J.U.C 提供的一個阻塞和喚醒的工具類,後面做詳細介紹
 33:             if (shouldParkAfterFailedAcquire(p, node) &&
 34:                     nanosTimeout > spinForTimeoutThreshold)
 35:                 LockSupport.parkNanos(this, nanosTimeout);
 36:             // 執行緒是否已經中斷了
 37:             if (Thread.interrupted())
 38:                 throw new InterruptedException();
 39:         }
 40:     } finally {
 41:         if (failed)
 42:             cancelAcquire(node);
 43:     }
 44: }
  • 因為是在 #doAcquireInterruptibly(int arg) 方法的基礎上,做了超時控制的增強,所以相同部分,我們直接跳過
  • 第 3 至 5 行:如果超時時間小於 0 ,直接返回 false ,已經超時。
  • 第 7 行:計算最終超時時間 deadline
  • 第 9 行:【相同,跳過】
  • 第 10 行:【相同,跳過】
  • 第 13 行:【相同,跳過】
  • 第 14 行:【相同,跳過】
  • 第 15 至 21 行:【相同,跳過】
  • 第 26 行:重新計算剩餘可獲取同步狀態的時間 nanosTimeout
  • 第 27 至 29 行:如果剩餘時間小於 0 ,直接返回 false ,已經超時。
  • 第 33 行:【相同,跳過】
  • 第 34 至 35 行:如果剩餘時間大於 spinForTimeoutThreshold ,則呼叫 LockSupport#parkNanos(Object blocker, long nanos) 方法,休眠 nanosTimeout 秒。否則,就不需要休眠了,直接進入快速自旋的過程。原因在於,spinForTimeoutThreshold 已經非常小了,非常短的時間等待無法做到十分精確,如果這時再次進行超時等待,相反會讓 nanosTimeout 的超時從整體上面表現得不是那麼精確。所以,在超時非常短的場景中,AQS 會進行無條件的快速自旋
  • 第 36 至 39 行:若執行緒已經中斷了,丟擲 InterruptedException 異常。
  • 第 40 至 43 行:【相同,跳過】

1.4 獨佔式同步狀態釋放

當執行緒獲取同步狀態後,執行完相應邏輯後,就需要釋放同步狀態。AQS 提供了#release(int arg)方法,釋放同步狀態。程式碼如下:

1: public final boolean release(int arg) {
2:     if (tryRelease(arg)) {
3:         Node h = head;
4:         if (h != null && h.waitStatus != 0)
5:             unparkSuccessor(h);
6:         return true;
7:     }
8:     return false;
9: }
  • 第 2 行:呼叫 #tryRelease(int arg) 方法,去嘗試釋放同步狀態,釋放成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。同時,它們分別對應【第 3 至 6】和【第 8 行】的邏輯。

    • #tryRelease(int arg) 方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的釋放同步狀態。程式碼如下:

      protected boolean tryRelease(int arg) {
          throw new UnsupportedOperationException();
      }
      
      • 直接丟擲 UnsupportedOperationException 異常。
  • 第 3 行:獲得當前head ,避免併發問題。

  • 第 4 行:頭結點不為空,並且頭結點狀態不為 0 ( INITAL 未初始化)。為什麼會出現 0 的情況呢?老艿艿的想法是,以 ReentrantReadWriteLock ( ? 內部基於 AQS 實現 ) 舉例子:

    • 執行緒 A 和執行緒 B ,都獲取了讀鎖。
    • 執行緒 A 和執行緒 B ,基本同時釋放鎖,那麼此時【第 3 行】h 可能獲取到的是相同head 節點。此時,A 執行緒恰好先執行了 #unparkSuccessor(Node node) 方法,會將 waitStatus 設定為 0 。因此,#unparkSuccessor(Node node) 方法,對於 B 執行緒就不需要呼叫了。當然,更極端的情況下,可能 A 和 B 執行緒,都呼叫了 #unparkSuccessor(Node node) 方法。

      老艿艿:如上是我的猜想,並未實際驗證。如果不正確,或者有其他情況,歡迎斧正。

  • 第 5 行:呼叫 #unparkSuccessor(Node node) 方法,喚醒下一個節點的執行緒等待。詳細解析,見 《【死磕 Java 併發】—– J.U.C 之 AQS:阻塞和喚醒執行緒》

1.5 總結

這裡稍微總結下:

在 AQS 中維護著一個 FIFO 的同步佇列。

  • 當執行緒獲取同步狀態失敗後,則會加入到這個 CLH 同步佇列的對尾,並一直保持著自旋。
  • 在 CLH 同步佇列中的執行緒在自旋時,會判斷其前驅節點是否為首節點,如果為首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步佇列。
  • 當執行緒執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。

2. 共享式

共享式與獨佔式的最主要區別在於,同一時刻

  • 獨佔式只能有一個執行緒獲取同步狀態。
  • 共享式可以有多個執行緒獲取同步狀態。

例如,讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。參見 ReentrantReadWriteLock 。

2.1 共享式同步狀態獲取

AQS 提供 #acquireShared(int arg) 方法,共享式獲取同步狀態。程式碼如下:

#acquireShared(int arg) 方法,對標 #acquire(int arg) 方法。

1: public final void acquireShared(int arg) {
2:     if (tryAcquireShared(arg) < 0)
3:         doAcquireShared(arg);
4: }
  • 第 2 行:呼叫 #tryAcquireShared(int arg) 方法,嘗試獲取同步狀態,獲取成功則設定鎖狀態並返回大於等於 0 ,否則獲取失敗,返回小於 0 。若獲取成功,直接返回,不用執行緒阻塞,自旋直到獲得同步狀態成功。

    • #tryAcquireShared(int arg) 方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。程式碼如下:

      protected int tryAcquireShared(int arg) {
          throw new UnsupportedOperationException();
      }
      
      • 直接丟擲 UnsupportedOperationException 異常。

2.1.1 doAcquireShared

 1: private void doAcquireShared(int arg) {
 2:     // 共享式節點
 3:     final Node node = addWaiter(Node.SHARED);
 4:     boolean failed = true;
 5:     try {
 6:         boolean interrupted = false;
 7:         for (;;) {
 8:             // 前驅節點
 9:             final Node p = node.predecessor();
10:             // 如果其前驅節點,獲取同步狀態
11:             if (p == head) {
12:                 // 嘗試獲取同步
13:                 int r = tryAcquireShared(arg);
14:                 if (r >= 0) {
15:                     setHeadAndPropagate(node, r);
16:                     p.next = null; // help GC
17:                     if (interrupted)
18:                         selfInterrupt();
19:                     failed = false;
20:                     return;
21:                 }
22:             }
23:             if (shouldParkAfterFailedAcquire(p, node) &&
24:                     parkAndCheckInterrupt())
25:                 interrupted = true;
26:         }
27:     } finally {
28:         if (failed)
29:             cancelAcquire(node);
30:     }
31: }
  • 因為和 #acquireQueued(int arg) 方法的基礎上,所以相同部分,我們直接跳過
  • 第 3 行:呼叫 #addWaiter(Node mode) 方法,將當前執行緒加入到 CLH 同步佇列尾部。並且, mode 方法引數為 Node.SHARED ,表示共享模式。
  • 第 6 行:【相同,跳過】
  • 第 9 至 22 行:【大體相同,部分跳過】
    • 第 13 行:呼叫 #tryAcquireShared(int arg) 方法,嘗試獲得同步狀態。? 在 #acquireShared(int arg) 方法的【第 2 行】,也呼叫了這個方法。
    • 第 15 行:呼叫 #setHeadAndPropagate(Node node, int propagate) 方法,設定的首節點,並根據條件,喚醒下一個節點。詳細解析,見 「2.1.2 setHeadAndPropagate」
      • 這裡和獨佔式同步狀態獲取很大的不同:通過這樣的方式,不斷喚醒下一個共享式同步狀態, 從而實現同步狀態被多個執行緒的共享獲取
    • 第 17 至 18 行:和 #acquire(int arg) 方法,對於執行緒中斷的處理方式相同,只是程式碼放置的位置不同。
  • 第 23 至 25 行:【相同,跳過】
  • 第 27 至 30 行:【相同,跳過】

2.1.2 setHeadAndPropagate

 1: private void setHeadAndPropagate(Node node, int propagate) {
 2:     Node h = head; // Record old head for check below
 3:     setHead(node);
 4:     /*
 5:      * Try to signal next queued node if:
 6:      *   Propagation was indicated by caller,
 7:      *     or was recorded (as h.waitStatus either before
 8:      *     or after setHead) by a previous operation
 9:      *     (note: this uses sign-check of waitStatus because
10:      *      PROPAGATE status may transition to SIGNAL.)
11:      * and
12:      *   The next node is waiting in shared mode,
13:      *     or we don't know, because it appears null
14:      *
15:      * The conservatism in both of these checks may cause
16:      * unnecessary wake-ups, but only when there are multiple
17:      * racing acquires/releases, so most need signals now or soon
18:      * anyway.
19:      */
20:     if (propagate > 0 || h == null || h.waitStatus < 0 ||
21:         (h = head) == null || h.waitStatus < 0) {
22:         Node s = node.next;
23:         if (s == null || s.isShared())
24:             doReleaseShared();
25:     }
26: }
  • 第 2 行:記錄原來節點 h
  • 第 3 行:呼叫 #setHead(Node node) 方法,設定 node節點。
  • 第 20 行:propagate > 0 程式碼塊,說明同步狀態還能被其他執行緒獲取。
  • 第 20 至 21 行:判斷原來的或者節點,等待狀態Node.PROPAGATE 或者 Node.SIGNAL 時,可以繼續向下喚醒
  • 第 23 行:呼叫 Node#isShared() 方法,判斷一個節點為共享式獲取同步狀態。
  • 第 24 行:呼叫 #doReleaseShared() 方法,喚醒後續的共享式獲取同步狀態的節點。詳細解析,見 「2.1.2 setHeadAndPropagate」

2.2 共享式獲取響應中斷

#acquireSharedInterruptibly(int arg) 方法,程式碼如下:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

2.3 共享式超時獲取

#tryAcquireSharedNanos(int arg, long nanosTimeout) 方法,程式碼如下:

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

2.4 共享式同步狀態釋放

當執行緒獲取同步狀態後,執行完相應邏輯後,就需要釋放同步狀態。AQS 提供了#releaseShared(int arg)方法,釋放同步狀態。程式碼如下:

1: public final boolean releaseShared(int arg) {
2:     if (tryReleaseShared(arg)) {
3:         doReleaseShared();
4:         return true;
5:     }
6:     return false;
7: }
  • 第 2 行:呼叫 #tryReleaseShared(int arg) 方法,去嘗試釋放同步狀態,釋放成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。同時,它們分別對應【第 3 至 5】和【第 6 行】的邏輯。

    • #tryReleaseShared(int arg) 方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的釋放同步狀態。程式碼如下:

      protected boolean tryReleaseShared(int arg) {
          throw new UnsupportedOperationException();
      }
      
      • 直接丟擲 UnsupportedOperationException 異常。
  • 第 3 行:呼叫 #doReleaseShared() 方法,喚醒後續的共享式獲取同步狀態的節點。

2.4.1 doReleaseShared

 1: private void doReleaseShared() {
 2:     /*
 3:      * Ensure that a release propagates, even if there are other
 4:      * in-progress acquires/releases.  This proceeds in the usual
 5:      * way of trying to unparkSuccessor of head if it needs
 6:      * signal. But if it does not, status is set to PROPAGATE to
 7:      * ensure that upon release, propagation continues.
 8:      * Additionally, we must loop in case a new node is added
 9:      * while we are doing this. Also, unlike other uses of
10:      * unparkSuccessor, we need to know if CAS to reset status
11:      * fails, if so rechecking.
12:      */
13:     for (;;) {
14:         Node h = head;
15:         if (h != null && h != tail) {
16:             int ws = h.waitStatus;
17:             if (ws == Node.SIGNAL) {
18:                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
19:                     continue;            // loop to recheck cases
20:                 unparkSuccessor(h);
21:             }
22:             else if (ws == 0 &&
23:                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
24:                 continue;                // loop on failed CAS
25:         }
26:         if (h == head)                   // loop if head changed
27:             break;
28:     }
29: }

參考資料