J.U.C 之 AQS:同步狀態的獲取與釋放
在前面提到過,AQS 是構建 Java 同步元件的基礎,我們期待它能夠成為實現大部分同步需求的基礎。
AQS 的設計模式採用的模板方法模式,子類通過繼承的方式,實現它的抽象方法來管理同步狀態。對於子類而言,它並沒有太多的活要做,AQS 已經提供了大量的模板方法來實現同步,主要是分為三類:
- 獨佔式獲取和釋放同步狀態
- 共享式獲取和釋放同步狀態
- 查詢同步佇列中的等待執行緒情況。
自定義子類使用 AQS 提供的模板方法,就可以實現自己的同步語義。
1. 獨佔式
獨佔式,同一時刻,僅有一個執行緒持有同步狀態。
1.1 獨佔式同步狀態獲取
老艿艿:「1.1 獨佔式同步狀態獲取」 整個小節,是本文最難的部分。請一定保持耐心。
#acquire(int arg)
方法,為 AQS 提供的模板方法。該方法為獨佔式獲取同步狀態,但是該方法對中斷不敏感。也就是說,由於執行緒獲取同步狀態失敗而加入到 CLH 同步佇列中,後續對該執行緒進行中斷操作時,執行緒不會從 CLH 同步佇列中移除。程式碼如下:
1: public final void acquire(int arg) { 2: if (!tryAcquire(arg) && 3: acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4: selfInterrupt(); 5: } |
-
第 2 行:呼叫
#tryAcquire(int arg)
方法,去嘗試獲取同步狀態,獲取成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。若獲取成功,#acquire(int arg)
方法,直接返回,不用執行緒阻塞,自旋直到獲得同步狀態成功。-
#tryAcquire(int arg)
方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。程式碼如下:protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
- 直接丟擲 UnsupportedOperationException 異常。
-
-
第 3 行:如果
#tryAcquire(int arg)
方法返回 false ,即獲取同步狀態失敗,則呼叫#addWaiter(Node mode)
方法,將當前執行緒加入到 CLH 同步佇列尾部。並且,mode
方法引數為Node.EXCLUSIVE
,表示獨佔模式。 -
第 3 行:呼叫
boolean #acquireQueued(Node node, int arg)
方法,自旋直到獲得同步狀態成功。詳細解析,見 「1.1.1 acquireQueued」 中。另外,該方法的返回值型別為boolean
,當返回 true 時,表示在這個過程中,發生過執行緒中斷。但是呢,這個方法又會清理執行緒中斷的標識,所以在種情況下,需要呼叫【第 4 行】的#selfInterrupt()
方法,恢復執行緒中斷的標識,程式碼如下:static void selfInterrupt() { Thread.currentThread().interrupt(); }
1.1.1 acquireQueued
boolean #acquireQueued(Node node, int arg)
方法,為一個自旋的過程,也就是說,當前執行緒(Node)進入同步佇列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。
流程圖如下:
程式碼如下:
1: final boolean acquireQueued(final Node node, int arg) { 2: // 記錄是否獲取同步狀態成功 3: boolean failed = true; 4: try { 5: // 記錄過程中,是否發生執行緒中斷 6: boolean interrupted = false; 7: /* 8: * 自旋過程,其實就是一個死迴圈而已 9: */ 10: for (;;) { 11: // 當前執行緒的前驅節點 12: final Node p = node.predecessor(); 13: // 當前執行緒的前驅節點是頭結點,且同步狀態成功 14: if (p == head && tryAcquire(arg)) { 15: setHead(node); 16: p.next = null; // help GC 17: failed = false; 18: return interrupted; 19: } 20: // 獲取失敗,執行緒等待--具體後面介紹 21: if (shouldParkAfterFailedAcquire(p, node) && 22: parkAndCheckInterrupt()) 23: interrupted = true; 24: } 25: } finally { 26: // 獲取同步狀態發生異常,取消獲取。 27: if (failed) 28: cancelAcquire(node); 29: } 30: } |
- 第 3 行:
failed
變數,記錄是否獲取同步狀態成功。 - 第 6 行:
interrupted
變數,記錄獲取過程中,是否發生執行緒中斷。 - ========== 第 7 至 24 行:“死”迴圈,自旋直到獲得同步狀態成功。==========
- 第 12 行:呼叫
Node#predecessor()
方法,獲得當前執行緒的前一個節點p
。 - 第 14 行:
p == head
程式碼塊,若滿足,則表示當前執行緒的前一個節點為頭節點,因為head
是最後一個獲得同步狀態成功的節點,此時呼叫#tryAcquire(int arg)
方法,嘗試獲得同步狀態。? 在#acquire(int arg)
方法的【第 2 行】,也呼叫了這個方法。 - 第 15 至 18 行:當前節點( 執行緒 )獲取同步狀態成功:
- 第 15 行:設定當前節點( 執行緒 )為新的
head
。 - 第 16 行:設定老的頭節點
p
不再指向下一個節點,讓它自身更快的被 GC 。 - 第 17 行:標記
failed = false
,表示獲取同步狀態成功。 - 第 18 行:返回記錄獲取過程中,是否發生執行緒中斷。
- 第 15 行:設定當前節點( 執行緒 )為新的
- 第 20 至 24 行:獲取失敗,執行緒等待喚醒,從而進行下一次的同步狀態獲取的嘗試。詳細解析,見 《【死磕 Java 併發】—– J.U.C 之 AQS:阻塞和喚醒執行緒》 。詳細解析,見 「1.1.2 shouldParkAfterFailedAcquire」 。
- 第 21 行:呼叫
#shouldParkAfterFailedAcquire(Node pre, Node node)
方法,判斷獲取失敗後,是否當前執行緒需要阻塞等待。
- 第 21 行:呼叫
- ========== 第 26 至 29 行:獲取同步狀態的過程中,發生異常,取消獲取。==========
- 第 28 行:呼叫
#cancelAcquire(Node node)
方法,取消獲取同步狀態。詳細解析,見 「1.1.3 cancelAcquire」 。
1.1.2 shouldParkAfterFailedAcquire
1: private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2: // 獲得前一個節點的等待狀態 3: int ws = pred.waitStatus; 4: if (ws == Node.SIGNAL) // Node.SIGNAL 5: /* 6: * This node has already set status asking a release 7: * to signal it, so it can safely park. 8: */ 9: return true; 10: if (ws > 0) { // Node.CANCEL 11: /* 12: * Predecessor was cancelled. Skip over predecessors and 13: * indicate retry. 14: */ 15: do { 16: node.prev = pred = pred.prev; 17: } while (pred.waitStatus > 0); 18: pred.next = node; 19: } else { // 0 或者 Node.PROPAGATE 20: /* 21: * waitStatus must be 0 or PROPAGATE. Indicate that we 22: * need a signal, but don't park yet. Caller will need to 23: * retry to make sure it cannot acquire before parking. 24: */ 25: compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 26: } 27: return false; 28: } |
pred
和node
方法引數,傳入時,要求前者必須是後者的前一個節點。- 第 3 行:獲得前一個節點(
pre
)的等待狀態。下面會根據這個狀態有三種情況的處理。 - 第 4 至 9 行:等待狀態為
Node.SIGNAL
時,表示pred
的下一個節點node
的執行緒需要阻塞等待。在pred
的執行緒釋放同步狀態時,會對node
的執行緒進行喚醒通知。所以,【第 9 行】返回 true ,表明當前執行緒可以被 park,安全的阻塞等待。 - 第 19 至 26 行:等待狀態為
0
或者Node.PROPAGATE
時,通過 CAS 設定,將狀態修改為Node.SIGNAL
,即下一次重新執行#shouldParkAfterFailedAcquire(Node pred, Node node)
方法時,滿足【第 4 至 9 行】的條件。- 但是,對於本次執行,【第 27 行】返回 false 。
- 另外,等待狀態不會為
Node.CONDITION
,因為它用在 ConditonObject 中。
- 第 10 至 18 行:等待狀態為
NODE.CANCELLED
時,則表明該執行緒的前一個節點已經等待超時或者被中斷了,則需要從 CLH 佇列中將該前一個節點刪除掉,迴圈回溯,直到前一個節點狀態<= 0
。- 對於本次執行,【第 27 行】返回 false ,需要下一次再重新執行
#shouldParkAfterFailedAcquire(Node pred, Node node)
方法,看看滿足哪個條件。 - 整個過程如下圖:
- 對於本次執行,【第 27 行】返回 false ,需要下一次再重新執行
1.1.3 cancelAcquire
1: private void cancelAcquire(Node node) { 2: // Ignore if node doesn't exist 3: if (node == null) 4: return; 5: 6: node.thread = null; 7: 8: // Skip cancelled predecessors 9: Node pred = node.prev; 10: while (pred.waitStatus > 0) 11: node.prev = pred = pred.prev; 12: 13: // predNext is the apparent node to unsplice. CASes below will 14: // fail if not, in which case, we lost race vs another cancel 15: // or signal, so no further action is necessary. 16: Node predNext = pred.next; 17: 18: // Can use unconditional write instead of CAS here. 19: // After this atomic step, other Nodes can skip past us. 20: // Before, we are free of interference from other threads. 21: node.waitStatus = Node.CANCELLED; 22: 23: // If we are the tail, remove ourselves. 24: if (node == tail && compareAndSetTail(node, pred)) { 25: compareAndSetNext(pred, predNext, null); 26: } else { 27: // If successor needs signal, try to set pred's next-link 28: // so it will get one. Otherwise wake it up to propagate. 29: int ws; 30: if (pred != head && 31: ((ws = pred.waitStatus) == Node.SIGNAL || 32: (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 33: pred.thread != null) { 34: Node next = node.next; 35: if (next != null && next.waitStatus <= 0) 36: compareAndSetNext(pred, predNext, next); 37: } else { 38: unparkSuccessor(node); 39: } 40: 41: node.next = node; // help GC 42: } 43: } |
- 第 2 至 4 行:忽略,若傳入引數
node
為空。 - 第 6 行:將節點的等待執行緒置空。
- 第 9 行:獲得
node
節點的前一個節點pred
。- 第 10 至 11 行: 邏輯同
#shouldParkAfterFailedAcquire(Node pred, Node node)
的【第 15 至 17 行】。
- 第 10 至 11 行: 邏輯同
- 第 16 行:獲得
pred
的下一個節點predNext
。在這個變數上,有很“複雜”的英文,我們來理解下:predNext
從表面上看,和node
是等價的。- 但是實際上,存在多執行緒併發的情況,所以在【第 25 行】或者【第 36 行】中,我們呼叫
#compareAndSetNext(...)
方法,使用 CAS 的方式,設定pred
的下一個節點。 - 如果設定失敗,說明當前執行緒和其它執行緒競爭失敗,不需要做其它邏輯,因為
pred
的下一個節點已經被其它執行緒設定成功。
- 但是實際上,存在多執行緒併發的情況,所以在【第 25 行】或者【第 36 行】中,我們呼叫
- 第 21 行:設定
node
節點的為取消的等待狀態Node.CANCELLED
。在這個變數上,有很“複雜”的英文,我們再來理解下:- 這裡可以使用直接寫,而不是 CAS 。
- 在這個操作之後,其它 Node 節點可以忽略
node
。 Before, we are free of interference from other threads.
TODO 9000 芋艿,如何理解。
- 下面開始開始修改
pred
的新的下一個節點,一共分成三種情況。 - ========== 第一種 ==========
- 第 24 行:如果
node
是尾節點,呼叫#compareAndSetTail(...)
方法,CAS 設定pred
為新的尾節點。- 第 25 行:若上述操作成功,呼叫
#compareAndSetNext(...)
方法,CAS 設定pred
的下一個節點為空(null
)。
- 第 25 行:若上述操作成功,呼叫
- ========== 第二種 ==========
- 第 30 行:
pred
非首節點。 - 第 31 至 32 行:
pred
的等待狀態為Node.SIGNAL
,或者可被 CAS 為Node.SIGNAL
。 - 第 33 行:
pred
的執行緒非空。- TODO 9001 芋艿,如何理解。目前能想象到的,一開始 30 行為非頭節點,在 33 的時候,結果成為頭節點,執行緒已經為空了。
- 第 34 至 36 行:若
node
的 下一個節點next
的等待狀態非Node.CANCELLED
,則呼叫#compareAndSetNext(...)
方法,CAS 設定pred
的下一個節點為next
。 - ========== 第三種 ==========
- 第 37 至 39 行:如果
pred
為首節點( ? 在【第 31 至 33 行】也會有別的情況 ),呼叫#unparkSuccessor(Node node)
方法,喚醒node
的下一個節點的執行緒等待。詳細解析,見 《【死磕 Java 併發】—– J.U.C 之 AQS:阻塞和喚醒執行緒》 。- 為什麼此處需要喚醒呢?因為,
pred
為首節點,node
的下一個節點的阻塞等待,需要node
釋放同步狀態時進行喚醒。但是,node
取消獲取同步狀態,則不會再出現node
釋放同步狀態時進行喚醒node
的下一個節點。因此,需要此處進行喚醒。
- 為什麼此處需要喚醒呢?因為,
- ========== 第 二 + 三種 ==========
-
第 41 行:TODO 芋艿 9002 為啥是 next 為 node 。目前收集到的資料如下:
-
next
的註釋如下:/** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */
- 最後一句話
-
1.2 獨佔式獲取響應中斷
AQS 提供了acquire(int arg)
方法,以供獨佔式獲取同步狀態,但是該方法對中斷不響應,對執行緒進行中斷操作後,該執行緒會依然位於CLH同步佇列中,等待著獲取同步狀態。為了響應中斷,AQS 提供了 #acquireInterruptibly(int arg)
方法。該方法在等待獲取同步狀態時,如果當前執行緒被中斷了,會立刻響應中斷,並丟擲 InterruptedException 異常。
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } |
- 首先,校驗該執行緒是否已經中斷了,如果是,則丟擲InterruptedException 異常。
- 然後,呼叫
#tryAcquire(int arg)
方法,嘗試獲取同步狀態,如果獲取成功,則直接返回。 - 最後,呼叫
#doAcquireInterruptibly(int arg)
方法,自旋直到獲得同步狀態成功,或執行緒中斷丟擲 InterruptedException 異常。 - 應該不僅僅 help gc
1.2.1 doAcquireInterruptibly
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); // <1> } } finally { if (failed) cancelAcquire(node); } } |
它與 #acquire(int arg)
方法僅有兩個差別:
- 方法宣告丟擲 InterruptedException 異常。
- 在中斷方法處不再是使用
interrupted
標誌,而是直接丟擲 InterruptedException 異常,即<1>
處。
1.3 獨佔式超時獲取
AQS 除了提供上面兩個方法外,還提供了一個增強版的方法 #tryAcquireNanos(int arg, long nanos)
。該方法為 #acquireInterruptibly(int arg)
方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前執行緒沒有在指定時間內獲取同步狀態,則會返回 false ,否則返回 true 。
流程圖如下:
程式碼如下:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } |
- 首先,校驗該執行緒是否已經中斷了,如果是,則丟擲InterruptedException 異常。
- 然後,呼叫
#tryAcquire(int arg)
方法,嘗試獲取同步狀態,如果獲取成功,則直接返回。 - 最後,呼叫
#tryAcquireNanos(int arg)
方法,自旋直到獲得同步狀態成功,或執行緒中斷丟擲 InterruptedException 異常,或超過指定時間返回獲取同步狀態失敗。
1.3.1 tryAcquireNanos
static final long spinForTimeoutThreshold = 1000L; 1: private boolean doAcquireNanos(int arg, long nanosTimeout) 2: throws InterruptedException { 3: // nanosTimeout <= 0 4: if (nanosTimeout <= 0L) 5: return false; 6: // 超時時間 7: final long deadline = System.nanoTime() + nanosTimeout; 8: // 新增 Node 節點 9: final Node node = addWaiter(Node.EXCLUSIVE); 10: boolean failed = true; 11: try { 12: // 自旋 13: for (;;) { 14: final Node p = node.predecessor(); 15: // 獲取同步狀態成功 16: if (p == head && tryAcquire(arg)) { 17: setHead(node); 18: p.next = null; // help GC 19: failed = false; 20: return true; 21: } 22: /* 23: * 獲取失敗,做超時、中斷判斷 24: */ 25: // 重新計算需要休眠的時間 26: nanosTimeout = deadline - System.nanoTime(); 27: // 已經超時,返回false 28: if (nanosTimeout <= 0L) 29: return false; 30: // 如果沒有超時,則等待nanosTimeout納秒 31: // 注:該執行緒會直接從LockSupport.parkNanos中返回, 32: // LockSupport 為 J.U.C 提供的一個阻塞和喚醒的工具類,後面做詳細介紹 33: if (shouldParkAfterFailedAcquire(p, node) && 34: nanosTimeout > spinForTimeoutThreshold) 35: LockSupport.parkNanos(this, nanosTimeout); 36: // 執行緒是否已經中斷了 37: if (Thread.interrupted()) 38: throw new InterruptedException(); 39: } 40: } finally { 41: if (failed) 42: cancelAcquire(node); 43: } 44: } |
- 因為是在
#doAcquireInterruptibly(int arg)
方法的基礎上,做了超時控制的增強,所以相同部分,我們直接跳過。 - 第 3 至 5 行:如果超時時間小於 0 ,直接返回 false ,已經超時。
- 第 7 行:計算最終超時時間
deadline
。 - 第 9 行:【相同,跳過】
- 第 10 行:【相同,跳過】
- 第 13 行:【相同,跳過】
- 第 14 行:【相同,跳過】
- 第 15 至 21 行:【相同,跳過】
- 第 26 行:重新計算剩餘可獲取同步狀態的時間
nanosTimeout
。 - 第 27 至 29 行:如果剩餘時間小於 0 ,直接返回 false ,已經超時。
- 第 33 行:【相同,跳過】
- 第 34 至 35 行:如果剩餘時間大於
spinForTimeoutThreshold
,則呼叫LockSupport#parkNanos(Object blocker, long nanos)
方法,休眠nanosTimeout
納秒。否則,就不需要休眠了,直接進入快速自旋的過程。原因在於,spinForTimeoutThreshold
已經非常小了,非常短的時間等待無法做到十分精確,如果這時再次進行超時等待,相反會讓nanosTimeout
的超時從整體上面表現得不是那麼精確。所以,在超時非常短的場景中,AQS 會進行無條件的快速自旋。 - 第 36 至 39 行:若執行緒已經中斷了,丟擲 InterruptedException 異常。
- 第 40 至 43 行:【相同,跳過】
1.4 獨佔式同步狀態釋放
當執行緒獲取同步狀態後,執行完相應邏輯後,就需要釋放同步狀態。AQS 提供了#release(int arg)
方法,釋放同步狀態。程式碼如下:
1: public final boolean release(int arg) { 2: if (tryRelease(arg)) { 3: Node h = head; 4: if (h != null && h.waitStatus != 0) 5: unparkSuccessor(h); 6: return true; 7: } 8: return false; 9: } |
-
第 2 行:呼叫
#tryRelease(int arg)
方法,去嘗試釋放同步狀態,釋放成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。同時,它們分別對應【第 3 至 6】和【第 8 行】的邏輯。-
#tryRelease(int arg)
方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的釋放同步狀態。程式碼如下:protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
- 直接丟擲 UnsupportedOperationException 異常。
-
-
第 3 行:獲得當前的
head
,避免併發問題。 -
第 4 行:頭結點不為空,並且頭結點狀態不為 0 (
INITAL
未初始化)。為什麼會出現 0 的情況呢?老艿艿的想法是,以 ReentrantReadWriteLock ( ? 內部基於 AQS 實現 ) 舉例子:- 執行緒 A 和執行緒 B ,都獲取了讀鎖。
-
執行緒 A 和執行緒 B ,基本同時釋放鎖,那麼此時【第 3 行】
h
可能獲取到的是相同的head
節點。此時,A 執行緒恰好先執行了#unparkSuccessor(Node node)
方法,會將waitStatus
設定為 0 。因此,#unparkSuccessor(Node node)
方法,對於 B 執行緒就不需要呼叫了。當然,更極端的情況下,可能 A 和 B 執行緒,都呼叫了#unparkSuccessor(Node node)
方法。老艿艿:如上是我的猜想,並未實際驗證。如果不正確,或者有其他情況,歡迎斧正。
-
第 5 行:呼叫
#unparkSuccessor(Node node)
方法,喚醒下一個節點的執行緒等待。詳細解析,見 《【死磕 Java 併發】—– J.U.C 之 AQS:阻塞和喚醒執行緒》 。
1.5 總結
這裡稍微總結下:
在 AQS 中維護著一個 FIFO 的同步佇列。
- 當執行緒獲取同步狀態失敗後,則會加入到這個 CLH 同步佇列的對尾,並一直保持著自旋。
- 在 CLH 同步佇列中的執行緒在自旋時,會判斷其前驅節點是否為首節點,如果為首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步佇列。
- 當執行緒執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。
2. 共享式
共享式與獨佔式的最主要區別在於,同一時刻:
- 獨佔式只能有一個執行緒獲取同步狀態。
- 共享式可以有多個執行緒獲取同步狀態。
例如,讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。參見 ReentrantReadWriteLock 。
2.1 共享式同步狀態獲取
AQS 提供 #acquireShared(int arg)
方法,共享式獲取同步狀態。程式碼如下:
#acquireShared(int arg)
方法,對標#acquire(int arg)
方法。
1: public final void acquireShared(int arg) { 2: if (tryAcquireShared(arg) < 0) 3: doAcquireShared(arg); 4: } |
-
第 2 行:呼叫
#tryAcquireShared(int arg)
方法,嘗試獲取同步狀態,獲取成功則設定鎖狀態並返回大於等於 0 ,否則獲取失敗,返回小於 0 。若獲取成功,直接返回,不用執行緒阻塞,自旋直到獲得同步狀態成功。-
#tryAcquireShared(int arg)
方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。程式碼如下:protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
- 直接丟擲 UnsupportedOperationException 異常。
-
2.1.1 doAcquireShared
1: private void doAcquireShared(int arg) { 2: // 共享式節點 3: final Node node = addWaiter(Node.SHARED); 4: boolean failed = true; 5: try { 6: boolean interrupted = false; 7: for (;;) { 8: // 前驅節點 9: final Node p = node.predecessor(); 10: // 如果其前驅節點,獲取同步狀態 11: if (p == head) { 12: // 嘗試獲取同步 13: int r = tryAcquireShared(arg); 14: if (r >= 0) { 15: setHeadAndPropagate(node, r); 16: p.next = null; // help GC 17: if (interrupted) 18: selfInterrupt(); 19: failed = false; 20: return; 21: } 22: } 23: if (shouldParkAfterFailedAcquire(p, node) && 24: parkAndCheckInterrupt()) 25: interrupted = true; 26: } 27: } finally { 28: if (failed) 29: cancelAcquire(node); 30: } 31: } |
- 因為和
#acquireQueued(int arg)
方法的基礎上,所以相同部分,我們直接跳過。 - 第 3 行:呼叫
#addWaiter(Node mode)
方法,將當前執行緒加入到 CLH 同步佇列尾部。並且,mode
方法引數為Node.SHARED
,表示共享模式。 - 第 6 行:【相同,跳過】
- 第 9 至 22 行:【大體相同,部分跳過】
- 第 13 行:呼叫
#tryAcquireShared(int arg)
方法,嘗試獲得同步狀態。? 在#acquireShared(int arg)
方法的【第 2 行】,也呼叫了這個方法。 - 第 15 行:呼叫
#setHeadAndPropagate(Node node, int propagate)
方法,設定新的首節點,並根據條件,喚醒下一個節點。詳細解析,見 「2.1.2 setHeadAndPropagate」 。- 這裡和獨佔式同步狀態獲取很大的不同:通過這樣的方式,不斷喚醒下一個共享式同步狀態, 從而實現同步狀態被多個執行緒的共享獲取。
- 第 17 至 18 行:和
#acquire(int arg)
方法,對於執行緒中斷的處理方式相同,只是程式碼放置的位置不同。
- 第 13 行:呼叫
- 第 23 至 25 行:【相同,跳過】
- 第 27 至 30 行:【相同,跳過】
2.1.2 setHeadAndPropagate
1: private void setHeadAndPropagate(Node node, int propagate) { 2: Node h = head; // Record old head for check below 3: setHead(node); 4: /* 5: * Try to signal next queued node if: 6: * Propagation was indicated by caller, 7: * or was recorded (as h.waitStatus either before 8: * or after setHead) by a previous operation 9: * (note: this uses sign-check of waitStatus because 10: * PROPAGATE status may transition to SIGNAL.) 11: * and 12: * The next node is waiting in shared mode, 13: * or we don't know, because it appears null 14: * 15: * The conservatism in both of these checks may cause 16: * unnecessary wake-ups, but only when there are multiple 17: * racing acquires/releases, so most need signals now or soon 18: * anyway. 19: */ 20: if (propagate > 0 || h == null || h.waitStatus < 0 || 21: (h = head) == null || h.waitStatus < 0) { 22: Node s = node.next; 23: if (s == null || s.isShared()) 24: doReleaseShared(); 25: } 26: } |
- 第 2 行:記錄原來的首節點
h
。 - 第 3 行:呼叫
#setHead(Node node)
方法,設定node
為新的首節點。 - 第 20 行:
propagate > 0
程式碼塊,說明同步狀態還能被其他執行緒獲取。 - 第 20 至 21 行:判斷原來的或者新的首節點,等待狀態為
Node.PROPAGATE
或者Node.SIGNAL
時,可以繼續向下喚醒。 - 第 23 行:呼叫
Node#isShared()
方法,判斷下一個節點為共享式獲取同步狀態。 - 第 24 行:呼叫
#doReleaseShared()
方法,喚醒後續的共享式獲取同步狀態的節點。詳細解析,見 「2.1.2 setHeadAndPropagate」 。
2.2 共享式獲取響應中斷
#acquireSharedInterruptibly(int arg)
方法,程式碼如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } |
2.3 共享式超時獲取
#tryAcquireSharedNanos(int arg, long nanosTimeout)
方法,程式碼如下:
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } |
2.4 共享式同步狀態釋放
當執行緒獲取同步狀態後,執行完相應邏輯後,就需要釋放同步狀態。AQS 提供了#releaseShared(int arg)
方法,釋放同步狀態。程式碼如下:
1: public final boolean releaseShared(int arg) { 2: if (tryReleaseShared(arg)) { 3: doReleaseShared(); 4: return true; 5: } 6: return false; 7: } |
-
第 2 行:呼叫
#tryReleaseShared(int arg)
方法,去嘗試釋放同步狀態,釋放成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。同時,它們分別對應【第 3 至 5】和【第 6 行】的邏輯。-
#tryReleaseShared(int arg)
方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的釋放同步狀態。程式碼如下:protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
- 直接丟擲 UnsupportedOperationException 異常。
-
-
第 3 行:呼叫
#doReleaseShared()
方法,喚醒後續的共享式獲取同步狀態的節點。
2.4.1 doReleaseShared
1: private void doReleaseShared() { 2: /* 3: * Ensure that a release propagates, even if there are other 4: * in-progress acquires/releases. This proceeds in the usual 5: * way of trying to unparkSuccessor of head if it needs 6: * signal. But if it does not, status is set to PROPAGATE to 7: * ensure that upon release, propagation continues. 8: * Additionally, we must loop in case a new node is added 9: * while we are doing this. Also, unlike other uses of 10: * unparkSuccessor, we need to know if CAS to reset status 11: * fails, if so rechecking. 12: */ 13: for (;;) { 14: Node h = head; 15: if (h != null && h != tail) { 16: int ws = h.waitStatus; 17: if (ws == Node.SIGNAL) { 18: if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 19: continue; // loop to recheck cases 20: unparkSuccessor(h); 21: } 22: else if (ws == 0 && 23: !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 24: continue; // loop on failed CAS 25: } 26: if (h == head) // loop if head changed 27: break; 28: } 29: } |