【Java 併發筆記】AbstractQueuedSynchronizer 相關整理
文前說明
作為碼農中的一員,需要不斷的學習,我工作之餘將一些分析總結和學習筆記寫成部落格與大家一起交流,也希望採用這種方式記錄自己的學習之旅。
本文僅供學習交流使用,侵權必刪。
不用於商業目的,轉載請註明出處。
- 在 java.util.concurrent.locks 包中有很多 Lock 的實現類,常用的 ReentrantLock、ReadWriteLock(實現類 ReentrantReadWriteLock),內部實現都依賴 AbstractQueuedSynchronizer 類(簡稱 AQS)。
- AQS 定義了一套多執行緒訪問共享資源的同步器框架,是抽象的佇列式的同步器。
1. 框架整理

AQS 框架結構
- AQS 實現了一個volatile int state 成員變數標識同步狀態(此變數代表著共享資源,更改這個變數值來獲取和釋放鎖),通過內建的 FIFO(先進先出佇列) 雙向佇列來完成資源獲取執行緒排隊的工作。
- 橘紅色結點是預設 head 結點,是一個空結點,代表當前持有鎖的執行緒,每當有執行緒競爭失敗,都是插入到佇列的尾結點, tail 結點始終指向佇列中的最後一個元素。
- 每個結點中,除了儲存了 當前執行緒 , 前後結點的引用 以外,還有一個 waitStatus 變數,用於描述 結點當前的狀態 。
- 多執行緒併發執行時,佇列中會有多個結點存在,waitStatus 代表著對應執行緒的狀態。
- waitStatus 有 4 種狀態。
表 1
狀態值 | 狀態 | 說明 |
---|---|---|
1 | CANCELLED | 取消狀態 |
-1 | SIGNAL | 等待觸發狀態 |
-2 | CONDITION | 等待條件狀態 |
-3 | PROPAGATE | 狀態需要向後傳播 |
- AQS 在判斷狀態時,通過用 waitStatus > 0 表示取消狀態,而 waitStatus < 0 表示有效狀態。
- 等待佇列是 FIFO 先進先出,只有前一個結點的狀態為 SIGNAL 時,當前結點的執行緒才能被掛起。
- 共享資源 state 的訪問方式有三種。
private static final Unsafe unsafe = Unsafe.getUnsafe(); protected final int getState() { return this.state; } protected final void setState(int newState) { this.state = newState; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
- AQS 定義兩種資源共享方式。
- Exclusive (獨佔,只有一個執行緒能執行,如 ReentrantLock)。
- Share (共享,多個執行緒可同時執行,如 Semaphore / CountDownLatch)。
- 不同的自定義同步器爭用共享資源的方式也不同。
- 自定義同步器在實現時只需要實現共享資源 state 的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊 / 喚醒出隊等),AQS 已經在頂層實現。
- 自定義同步器實現時主要實現以下方法。
方法名稱 | 說明 |
---|---|
isHeldExclusively() | 該執行緒是否正在獨佔資源。只有用到 condition 才需要去實現它。 |
tryAcquire(int) | 獨佔方式。嘗試獲取資源,成功則返回 true,失敗則返回 false。 |
tryRelease(int) | 獨佔方式。嘗試釋放資源,成功則返回 true,失敗則返回 false。 |
tryAcquireShared(int) | 共享方式。嘗試獲取資源。負數表示失敗;0 表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。 |
tryReleaseShared(int) | 共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回 true,否則返回 false。 |
- 自定義同步器要麼是獨佔方式,要麼是共享方式,也只需實現 tryAcquire - tryRelease、tryAcquireShared - tryReleaseShared 中的一種即可。
- 但 AQS 也支援自定義同步器同時實現獨佔和共享兩種方式,如 ReentrantReadWriteLock。
ReentrantLock
- state 初始化為 0,表示未鎖定狀態。A 執行緒 lock 時,呼叫 tryAcquire 獨佔該鎖並將 state + 1。
- 其他執行緒再 tryAcquire 時失敗,直到 A 執行緒 unlock 到 state = 0(即釋放鎖)為止,其它執行緒才有機會獲取該鎖。
- 釋放鎖之前,A 執行緒可以重複獲取此鎖的(state 累加),這就是可重入的概念。
- 獲取多少次就要釋放多少次,這樣才能保證 state 回零。
CountDownLatch
- 任務分為 N 個子執行緒執行,state 初始化為 N(與執行緒個數一致)。
- N 個子執行緒並行執行,每個子執行緒執行完成 countDown 一次,state CAS 減 1。
- 所有子執行緒執行完成(即 state = 0),Unsafe.unpark 主執行緒,然後主執行緒從 await 函式返回,繼續後續動作。
1.1 Node 結構

結點結構
Node { int waitStatus; Node prev; Node next; Node nextWaiter; Thread thread; }
屬性名稱 | 描述 |
---|---|
int waitStatus | 表示結點的狀態。其中包含的狀態見表 1。 |
Node prev | 前驅結點,比如當前節點被取消,那就需要前驅結點和後繼結點來完成連線。 |
Node next | 後繼結點。 |
Node nextWaiter | 儲存 condition 佇列中的後繼結點。 |
Thread thread | 入佇列時的當前執行緒。 |
- 其中同步佇列(Sync queue),是雙向連結串列,包括 head 結點和 tail 結點,head 結點主要用作後續的排程。而 Condition queue 不是必須的,它是一個單向連結串列,只有當使用 Condition 時,才會存在此單向連結串列。並且可能會有多個 Condition queue。
2. 實現分析整理
2.1 獨佔模式
2.1.1 acquire(int)(執行緒獲取鎖的過程)
- 此方法是獨佔模式下執行緒獲取共享資源的頂層入口。
- 如果獲取到資源,執行緒直接返回。
- 否則進入等待佇列,直到獲取到資源為止,且整個過程 忽略中斷 的影響。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

acquire 流程
- 執行流程。
-
tryAcquire()
嘗試直接獲取資源,成功則 state 值被修改並返回 true,執行緒繼續執行。 - 獲取資源失敗,執行
addWaiter()
將該執行緒加入等待佇列的尾部,並標記為獨佔模式。 -
acquireQueued()
使執行緒在等待佇列中獲取資源,一直獲取到資源後才返回。- 如果在整個等待過程中被中斷過,則返回 true,否則返回 false。
- 如果執行緒在等待過程中被中斷過,它是不響應的。
- 只是獲取資源後才再進行自我中斷
selfInterrupt()
,將中斷補上。
- 只是獲取資源後才再進行自我中斷
-
- 其中判定退出佇列的條件是否滿足和休眠當前執行緒完成了 自旋 的過程。
tryAcquire
- 嘗試去獲取獨佔資源。如果獲取成功,則直接返回 true,否則直接返回 false。
- AQS 只定義了一個介面,具體資源的獲取交由自定義同步器去實現(通過 state 的 get / set / CAS)
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
- 該方法需要子類來實現,但是卻沒有使用 abstract 來修飾。是因為 AQS 有獨佔和共享兩種模式,而子類可以只實現其中一種功能,如果使用 abstract 來修飾,每個子類都需要同時實現兩種功能的方法,對子類不太友好。
addWaiter(Node)
- 用於將當前執行緒加入到等待佇列的隊尾,並返回當前執行緒所在的結點。
- 生成新 Node 結點 node,如果 tail 結點不為空,則通過 CAS 指令插入到等待佇列的隊尾(同一時刻可能會有多個 Node 結點插入到等待佇列中),並建立前後引用關係。
- 如果 tail 結點為空,則將 head 結點指向一個空結點。
private Node addWaiter(Node mode) { //以給定模式構造結點。mode 有兩種:EXCLUSIVE(獨佔)和 SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //嘗試快速方式直接放到隊尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失敗則通過 enq 入隊。 enq(node); return node; } ...... private Node enq(final Node node) { //CAS " 自旋 ",直到成功加入隊尾。 for (;;) { Node t = tail; if (t == null) { // Must initialize //佇列為空,建立一個空的標誌結點作為 head 結點,並將 tail 也指向它。 if (compareAndSetHead(new Node())) tail = head; } else { //正常流程,放入隊尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued(Node, int)
- 通過
tryAcquire()
和addWaiter()
,該執行緒獲取資源失敗,被放入等待佇列尾部。- node 插入到隊尾後,該執行緒不會立馬掛起,會進行 自旋 操作。
- 判斷該 node 的前一個結點 pred 是否為 head 結點。
- 如果是,則表明當前結點是佇列中第一個有效結點,再次嘗試
tryAcquire()
獲取鎖。- 如果成功獲取到鎖,執行緒 node 無需掛起。
- 如果獲取鎖失敗,表示前驅執行緒還未完成,至少還未修改 state 的值。
- 呼叫
shouldParkAfterFailedAcquire()
,結點進入隊尾後,檢查狀態,找到安全休息點。 - 呼叫
parkAndCheckInterrupt()
,進入 waiting 狀態,等待 unpark() 或 interrupt() 喚醒。shouldParkAfterFailedAcquire()
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //標記是否成功拿到資源 try { boolean interrupted = false; //標記等待過程中是否被中斷過 //自旋 for (;;) { final Node p = node.predecessor(); //獲得前驅結點 //如果前驅結點 p 是 head,即該結點 node 為第二結點,那麼便有資格去嘗試獲取資源(可能是 p 釋放完資源後喚醒,也可能被 interrupt)。 if (p == head && tryAcquire(arg)) { setHead(node); //獲取資源後,將 head 指向 node。 p.next = null; // setHead 中 node.prev 已置為 null,此處再將 p.next 置為 null,就是為了方便 GC 回收以前的 head 結點 p,也就意味著之前拿完資源的結點 p 出隊。 failed = false; return interrupted; //返回等待過程中是否被中斷過 } //如果自己可以休息了,就進入 waiting 狀態,直到被 unpark()。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //如果等待過程中被中斷過,哪怕只有那麼一次,就將 interrupted 標記為 true。 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire(Node, Node)
- 用於檢查狀態,看是否真的可以休息。(進入 waiting 狀態)
parkAndCheckInterrupt()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//獲得前驅結點狀態。 if (ws == Node.SIGNAL) //如果前驅結點狀態為等待觸發,則進入安全休息點。 return true; if (ws > 0) { //如果前驅為取消狀態,就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後面。 //那些取消狀態的結點,由於被當前結點 " 加塞 " 到它們前邊,它們相當於形成一個無引用鏈,稍後會被 GC 回收。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果前驅結點正常,將前驅狀態設定成 SIGNAL 等待觸發 //下一次迴圈進入 shouldParkAfterFailedAcquire 因為前驅狀態已經設定為 SIGNAL,因此直接返回 true,執行 parkAndCheckInterrupt,對當前執行緒 park。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()
- 讓執行緒真正進入等待狀態。
- park() 會讓當前執行緒進入 waiting 狀態。在此狀態下,有兩種途徑可以喚醒該執行緒。
- unpark() 和 interrupt()。
- 需要注意 Thread.interrupted() 會 清除當前執行緒的中斷標記位 。
- park() 會讓當前執行緒進入 waiting 狀態。在此狀態下,有兩種途徑可以喚醒該執行緒。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //呼叫 park() 使執行緒進入 waiting 狀態。 return Thread.interrupted(); //如果被喚醒,檢視自己是不是被中斷的。 }
- 執行緒每次被喚醒時,都要進行中斷檢測,如果發現當前執行緒被中斷,丟擲
InterruptedException
異常並退出迴圈。- 從無限迴圈的程式碼可以看出,並不是被喚醒的執行緒一定能獲得鎖,必須呼叫
tryAccquire()
重新競爭,因為鎖是 非公平 的,有可能被新加入的執行緒獲得,從而導致剛被喚醒的執行緒再次被阻塞。- 如果已經在佇列中的執行緒,必須按照順序執行(等待前驅結點的相關操作,這是 公平的 ),非公平是針對那種還沒進佇列的執行緒可以和佇列中的第一個結點 head 搶佔資源。
- 從無限迴圈的程式碼可以看出,並不是被喚醒的執行緒一定能獲得鎖,必須呼叫

執行緒獲取鎖流程
selfInterrupt()
- 根據
acquireQueued()
的結果決定是否執行中斷。 -
acquireQueued()
中的parkAndCheckInterrupt()
方法已經執行了中斷,這裡再執行一次中斷的目的在於。- 如果當前執行緒是非中斷狀態,則在執行
parkAndCheckInterrupt
中 park 時被阻塞,這時返回中斷狀態是 false。不再執行selfInterrupt()
。 - 如果當前執行緒是中斷狀態,則執行
parkAndCheckInterrupt
中 park 方法不起作用,會立即返回 true,並且將中斷狀態復位。由於中斷狀態已經復位,selfInterrupt()
用 park 方法時會阻塞執行緒。
- 如果當前執行緒是非中斷狀態,則在執行
- 這裡判斷執行緒中斷的狀態實際上是為了不讓迴圈一直執行,讓當前執行緒進入阻塞的狀態。
- 如果一直迴圈下去,會造成 CPU 使用率飆升的後果。
static void selfInterrupt() { Thread.currentThread().interrupt(); }
cancelAcquire(Node)
- 在
acquireQueued()
方法的 finally 語句塊中,如果在迴圈的過程中出現了異常,則執行 cancelAcquire 方法,用於將該結點標記為取消狀態。
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //設定該結點不再關聯任何執行緒。 node.thread = null; // 通過前繼結點跳過取消狀態的 node。 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 獲取過濾後的前繼結點的後繼結點。 Node predNext = pred.next; // 設定狀態為取消狀態。 node.waitStatus = Node.CANCELLED; // 1.如果當前結點是 tail,嘗試更新 tail 結點,設定 tail 為 pred。更新失敗則返回,成功則設定 tail 的後繼結點為 null。 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 2.如果當前結點不是 head 的後繼結點,判斷當前結點的前繼結點的狀態是否為 SIGNAL,如果不是則嘗試設定前繼結點的狀態為 SIGNAL。 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 3.如果是 head 的後繼結點或者狀態判斷設定失敗,則喚醒當前結點的後繼結點。 unparkSuccessor(node); } node.next = node; // help GC } }
- 結點取消分三種情況。
- 當前結點是 tail。
- 因為 tail 是佇列的最後一個結點,如果該結點需要取消,則直接把該結點的前繼結點的 next 指向 null,也就是把當前結點移除佇列。
- 注意,這裡並沒有設定 node 的 prev 為 null。
- 當前結點不是 head 的後繼結點,也不是 tail。
- 將 node 的前繼結點的 next 指向了 node 的後繼結點。
- 當前節點是 head 的後繼結點。
- unpark 後繼結點的執行緒,然後將 next 指向了自己。
- 當前結點是 tail。

當前結點是 tail

當前結點不是 head 的後繼結點,也不是 tail

當前節點是 head 的後繼結點
- 既然要刪除結點,為什麼沒有對 prev 進行操作,僅僅是修改了 next。
- 因為修改指標的操作都是 CAS 操作,在 AQS 中所有以 compareAndSet 開頭的方法都是嘗試更新,並不保證成功。
- 不能用 CAS 操作更新 prev,因為 prev 是不確定的,更新失敗有可能會導致整個佇列的不完整,例如把 prev 指向一個已經移除佇列的 node。
- prev 是由其他執行緒來修改的,通過
shouldParkAfterFailedAcquire()
方法。
do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;
- 因為
shouldParkAfterFailedAcquire()
方法是在獲取鎖失敗的情況下才能執行。- 進入該方法時,說明已經有執行緒獲得了鎖。
- 在執行該方法時,當前結點之前的結點不會發生變化(因為只有當下一個結點獲得鎖的時候才會設定 head),所以這裡可以更新 prev,並且不必用 CAS 來更新。
2.1.2 release(int)(執行緒釋放鎖的過程)
- 此方法是獨佔模式下執行緒釋放共享資源的頂層入口。
- 釋放指定量資源,如果徹底釋放(即 state = 0),喚醒等待佇列裡的其他執行緒來獲取資源。
- 呼叫 tryRelease() 釋放資源。根據返回值判斷該執行緒是否已經完成釋放資源,自定義同步器在設計時需明確這一點。
- 釋放指定量資源,如果徹底釋放(即 state = 0),喚醒等待佇列裡的其他執行緒來獲取資源。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //獲得頭結點。 if (h != null && h.waitStatus != 0) unparkSuccessor(h); //喚醒等待佇列裡的下一個執行緒。 return true; } return false; }
tryRelease(int)
- 嘗試去釋放指定量的資源。
tryAcquire() tryRelease()
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
unparkSuccessor(Node)
- 用於喚醒等待佇列中下一個執行緒。
- 執行流程。
- 如果頭結點 head 的 waitStatus 值為 -1,則用 CAS 指令重置為 0。
- 找到 waitStatus 值小於 0 的結點 s,通過 LockSupport.unpark(s.thread) 喚醒執行緒。
private void unparkSuccessor(Node node) { //node 為當前執行緒所在結點。 int ws = node.waitStatus; if (ws < 0) //置零當前執行緒所在的結點狀態,允許失敗。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //找到下一個需要喚醒的結點 s。 if (s == null || s.waitStatus > 0) { //如果為空或取消狀態 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) // <=0 的結點,都是有效結點。 s = t; } if (s != null) LockSupport.unpark(s.thread); //喚醒 }
2.2 共享模式
2.2.1 acquireShared(int)(執行緒獲取鎖的過程)
- 此方法是共享模式下執行緒獲取共享資源的頂層入口。
- 獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待佇列,直到獲取到資源為止,整個過程忽略中斷。
-
acquireShared()
的流程。tryAcquireShared() doAcquireShared()
- 跟
acquire()
的流程大同小異,多了一步當前結點拿到資源後(還有資源剩餘)繼續喚醒後繼結點的操作(共享)。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared(int)
-
tryAcquireShared()
需要自定義同步器實現。- AQS 已經定義了該方法返回值的語義。
- 負數 代表獲取失敗。
- 0 代表獲取成功,但沒有剩餘資源。
- 正數 表示獲取成功,還有剩餘資源,其他執行緒還可以去獲取。
- AQS 已經定義了該方法返回值的語義。
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
doAcquireShared(int)
- 此方法用於將當前執行緒加入等待佇列尾部休息,直到其他執行緒釋放資源喚醒自己,自己成功拿到相應量的資源後才返回。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); //加入佇列尾部,SHARED 模式。 boolean failed = true; //是否成功標誌 try { boolean interrupted = false; //等待過程中是否被中斷過的標誌 for (;;) { final Node p = node.predecessor(); //獲得前驅結點 if (p == head) { //如果結點為 head 結點的下一個,因為 head 是拿到資源的執行緒,此時 node 被喚醒,很可能是 head 用完資源來喚醒自己的。 int r = tryAcquireShared(arg); //嘗試獲取資源 if (r >= 0) { //獲取資源成功 setHeadAndPropagate(node, r); //將 head 指向自己,如果還有剩餘資源可以再喚醒之後的執行緒。 p.next = null; // help GC if (interrupted) //如果等待過程中被打斷過,此時將中斷補上。 selfInterrupt(); failed = false; return; } } //判斷狀態,尋找安全點,進入 waiting 狀態,等著被 unpark() 或 interrupt()。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
- 與
acquireQueued()
相似,流程並沒有太大區別。- 跟獨佔模式比,只有當前結點 node 的前驅結點是 head 時,才會去嘗試獲取資源,資源有剩餘的情況再去喚醒之後的結點。如果資源不夠,當前結點會繼續 park() 等待其他執行緒釋放資源,而 不會去喚醒後續結點 (即使後續結點所需資源量更小)。
- 獨佔模式下,同一時刻只有一個執行緒執行,這樣做未嘗不可。
- 共享模式下,多個執行緒是可以同時執行的,因為當前結點資源需求量大,而將後續量小的結點阻塞。這是 AQS 保證 嚴格按照入隊順序喚醒 (保證了公平,降低了併發)。
- 跟獨佔模式比,只有當前結點 node 的前驅結點是 head 時,才會去嘗試獲取資源,資源有剩餘的情況再去喚醒之後的結點。如果資源不夠,當前結點會繼續 park() 等待其他執行緒釋放資源,而 不會去喚醒後續結點 (即使後續結點所需資源量更小)。
setHeadAndPropagate(Node, int)
- 此方法在
setHead()
的基礎上多了一步,自己甦醒的同時,如果條件符合(還有剩餘資源),還會去喚醒後繼結點。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //獲得 head 結點 setHead(node); //head 指向當前結點 // 如果還有剩餘量,繼續喚醒下一個結點 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
2.2.2 releaseShared()(執行緒釋放鎖的過程)
- 此方法是共享模式下執行緒釋放共享資源的頂層入口。
- 會釋放指定量的資源,如果成功釋放且允許喚醒等待執行緒,會喚醒等待佇列裡的其他執行緒來獲取資源。
- 獨佔模式下的
tryRelease()
需要在完全釋放掉資源(state = 0)後,才會返回 true 去喚醒其他執行緒,主要是基於獨佔下可重入的考量。 - 共享模式下的
releaseShared()
沒有這種要求,共享模式實質就是控制一定量的執行緒併發執行,只要擁有資源的執行緒在釋放掉部分資源後就可以喚醒後繼等待結點。- 例如,資源總量是 13,A(5)和 B(7)分別獲取到資源併發執行,C(4)到來時只剩 1 個資源就需要等待。
- A 在執行過程中釋放掉 2 個資源量,然後
tryReleaseShared(2)
返回 true 喚醒 C,C只有 3 個資源量仍不夠,繼續等待。 - 隨後 B 又釋放 2 個資源量,
tryReleaseShared(2)
返回 true 喚醒 C,C 發現資源量 5 個足夠自己使用,然後 C 就可以跟 A 和 B 一起執行。 - ReentrantReadWriteLock 讀鎖的
tryReleaseShared()
只有在完全釋放掉資源(state = 0)才返回 true,所以自定義同步器可以根據需要決定tryReleaseShared()
的返回值。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
tryReleaseShared(int)
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
doReleaseShared()
- 主要用於喚醒後繼。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); //喚醒後繼 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head)// head 發生變化 break; } }
-
acquire()
和acquireShared()
兩種方法下,執行緒在等待佇列中都是忽略中斷的。- AQS也支援響應中斷,
acquireInterruptibly()
和acquireSharedInterruptibly()
即是。
- AQS也支援響應中斷,
acquireInterruptibly(int)
tryAcquire() acquire()
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
acquireSharedInterruptibly(int)
- 與共享模式執行緒獲取鎖類似,多了響應中斷的處理。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
tryAcquireNanos(int, long)
- 該方法提供了具備有超時功能的獲取狀態的呼叫。
acquireInterruptibly()
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
doAcquireNanos(int, long)
- 針對超時控制這部分的實現,主要需要計算出睡眠的間隔值。
- 間隔可以表示為 nanosTimeout = System.nanoTime()(睡眠之前記錄的時間) + 原有 nanosTimeout – System.nanoTime()(當前時間) 。
- 如果 nanosTimeout 大於 0,那麼還需要當前執行緒睡眠,反之則返回 false。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } //計算時間,最後期限減去當前時間,得到了還應該睡眠的時間。 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
- doAcquireNanos() 的流程。
- 將當前結點 node 加入到佇列中。
- 如果前驅結點是 head 結點並且成功獲取到狀態,那麼設定自己為 head 結點並退出,返回 true,在指定的 nanosTimeout 之前獲取了鎖。
- 獲取狀態失敗,通過 LockSupport.park 指定當前執行緒休眠一段時間。
- 喚醒後的執行緒,計算仍需要休眠的時間。嘗試再獲取狀態,如果失敗後檢視其 nanosTimeout 是否大於0,如果小於 0,那麼返回超時(false),沒有獲取到鎖。
- 如果 nanosTimeout 小於等於 1000L 納秒,則進入快速的自旋過程。
- Doug Lea 應該測算了線上程排程器上的切換造成的額外開銷,因此在短時 1000 納秒內就讓當前執行緒進入快速自旋狀態,如果這時再休眠相反會讓 nanosTimeout 的獲取時間變得更加不精確。
2.3 等待佇列
- AQS 維護的佇列是當前等待資源的佇列。
- 當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態等資訊構造成為一個結點並將其加入同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,會把首結點中的執行緒喚醒,使其再次嘗試獲取同步狀態。

同步佇列
- 每個 Condition 維護著一個佇列,該佇列的作用是維護一個等待 singal 訊號的佇列。

等待佇列
- 同步佇列和等待佇列使用的是同一種結點型別 AQS.Node。
2.3.1 Condition 介面
- 同步佇列和等待佇列的作用是不同的。每個執行緒只能存在於同步佇列或等待佇列中的一個。
- 任意一個 Java 物件,都擁有一組監視器方法定義在( java.lang.Object 上),主要包括 wait()、notify()、notifyAll() 方法,這些方法與synchronized 同步關鍵字配合,可以實現等待/通知模式。
- Condition 介面提供了類似 Object 的監視器方法,與 lock 配合可以實現等待/通知模式。
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
方法 | 說明 |
---|---|
await() | 呼叫此方法後,會使當前執行緒在接收到喚醒訊號(signal)之前或被中斷之前一直處於等待休眠狀態。呼叫此方法後,當前執行緒會釋放持有的鎖。如果當前等待執行緒從該方法返回(被喚醒),那麼在返回之前會重新獲取鎖(獲取到鎖才能繼續執行)。 |
await(long time,TimeUnit unit) | 呼叫此方法後,會使當前執行緒在接收到喚醒訊號之前、被中斷之前或到達指定等待時間之前一直處於等待狀態。如果在從此方法返回前檢測到等待時間超時,則返回 false,否則返回 true。 |
awaitNanos(long nanosTimeout) | 該方法等效於 await(long time,TimeUnit unit) 方法,只是等待的時間是 nanosTimeout 指定的以毫微秒數為單位的等待時間。該方法返回值是所剩毫微秒數的一個估計值,如果超時,則返回一個小於等於 0 的值。可以根據該返回值來確定是否要再次等待,以及再次等待的時間。 |
awaitUninterruptibly() | 當前執行緒進入等待狀態直到被通知,該方法對中斷忽略。 |
awaitUntil(Date deadline) | 當前執行緒進入等待狀態直到被通知,中斷或者到某個時間,如果沒有到指定時間就被通知,返回 true,否則表示到了指定時間,返回 false。 |
signal() | 喚醒一個等待執行緒,如果所有的執行緒都在等待此條件,則選擇其中的一個喚醒。在從 await 返回之前,該執行緒必須重新獲取鎖。 |
signalAll() | 喚醒所有等待執行緒,如果所有的執行緒都在等待此條件,則喚醒所有執行緒。 在從 await 返回之前,每個執行緒必須重新獲取鎖。 |
2.3.2 Condition 的實現
- Condition 的實現類是 ConditionObject 。
- ConditionObject 是 AQS 的內部類,Condition 的操作需要獲取相關聯的鎖,需要和同步器掛鉤。
- 每個 Condition 物件都包含著一個佇列(等待佇列),Condition 中也有結點的概念,在將執行緒放到等待佇列中時會構造結點。
- 等待佇列也是一個 FIFO 的佇列,在佇列中的每個節點都包含了一個執行緒引用,該執行緒就是在 Condition 物件上等待的執行緒,如果一個執行緒呼叫了 await 方法,那麼該執行緒將會釋放鎖,構造成結點加入等待佇列並進入等待狀態。
- 一個 Condition 包含一個等待佇列,Condition 擁有首結點(firstWaiter)和尾結點(lastWaiter)。
- 以下使用 生產者和消費者模式 用例進行說明同步佇列和等待佇列之間的區別與協同工作。
- 在一個有大小的佇列 queue 中,生產者往佇列中放資料,消費者從佇列中取資料,當佇列不滿時,生產者可以繼續生產資料,當佇列不空時,消費者可以繼續取資料,如果不符合條件,則等待,直到符合條件為止。
public class TestQueue<T> { //佇列大小 private int size; //list 充當佇列 private List<T> queue; //鎖 private Lock lock = new ReentrantLock(); //保證佇列大小不 <0 的 condition private Condition notEmpty = lock.newCondition(); //保證佇列大小不 >size 的 condition private Condition notFull = lock.newCondition(); public TestQueue(int size) { this.size = size; queue = new ArrayList<T>(); } public void product(T t) throws Exception { lock.lock(); try { //如果佇列滿,則不能生產,等待消費者消費資料。 while (size == queue.size()) { notFull.await(); } //佇列已經有空位置,放入一個數據。 queue.add(t); //通知消費者可以繼續消費。 notEmpty.signal(); } finally { lock.unlock(); } } public T consume() throws Exception { lock.lock(); try { //佇列為空,則不能消費,等待生產者生產資料。 while (queue.size() == 0) { notEmpty.await(); } //佇列已經有資料,拿掉一個數據 T t = queue.remove(0); //通知生產者可以繼續生產。 notFull.signal(); return t; } finally { lock.unlock(); } } }
- 假設存線上程 a 和執行緒 b。
- 執行緒 a 呼叫
product()
方法,執行了lock.lock()
,執行緒 a 加入到 AQS 同步佇列中,構建結點 A。 - 執行緒 b 呼叫
consume()
方法,執行了lock.lock()
,執行緒 b 加入到 AQS 同步佇列中,構建結點 B。 - 結點 B 是結點 A 的後繼結點,結點 A 是結點 B 的前驅結點。
- 同步佇列初始狀態為下圖。
- 執行緒 a 呼叫

同步佇列的初始狀態
- 假設自定義佇列 queue 已滿,執行緒 a(結點 A)呼叫
notFull.await()
方法。- 執行緒 a(結點 A)從 AQS 同步佇列中被移除,對應操作是鎖的釋放。
- 執行緒 a(結點 A)被加入到 Condition 等待佇列,執行緒 a 需要等待 singal 訊號。
- 執行緒 b(結點 B)由於執行緒 a(結點 A)釋放鎖被喚醒,成為同步佇列的頭結點且同步狀態為 0 可以獲取鎖。
- 執行緒 b(結點 B)獲取鎖。

結點 A 進入等待佇列
- 假設執行緒 b(結點 B)呼叫
notFull.singal()
方法,Condition 等待佇列中只有結點 A,把它取出來加入到 AQS 同步佇列中。- 這時候執行緒 a(結點 A)並沒有被喚醒 。

結點 A 重新進入 AQS 佇列
-
執行緒 b(結點 B)
notFull.signal()
方法執行完畢,呼叫lock.unlock()
方法釋放鎖。執行緒 a(結點 A)成為 AQS 首結點並且同步狀態可獲取,執行緒 a(結點 A)被喚醒,繼續執行。 -
AQS 按從頭到尾的順序喚醒執行緒,直到等待佇列中的執行緒被執行完畢結束。
await(等待)
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
-
await(等待)的流程。
- 如果執行緒被中斷,丟擲中斷異常。
- 生成一個結點 node(與當前執行緒繫結),並將該結點加入等待佇列中。
- 如果尾結點狀態不是 CONDITION,也就是執行緒任務被取消了,那麼需要從等待佇列中清除掉。
- 釋放該執行緒的鎖(狀態)。
- 因為某執行緒可能多次呼叫(重入)了 lock() 方法,需要將狀態全部釋放,這樣後面的執行緒才能重新從 state = 0 開始競爭鎖。
- 直到當前結點不在同步佇列中,掛起該執行緒。
- 如果當前結點的的狀態等於 CONDITION 或者前驅結點 pre 為空,則不表示不在同步佇列中。
- 如果當前結點的 next 結點不為空,則表示在同步佇列中,next 和 pre 只在同步佇列中使用,等待佇列不會使用。
- 如果只是當前結點的前驅結點 pre 不為空,是不能說明該結點就在同步佇列中的。
- 因為同步佇列中新增結點的方法,是先設定 node.prev = pred,然後再 CAS 設定 tail,但是 CAS 設定可能失敗,從而導致設定了 node 的前驅結點,確並沒有把 node 加入同步佇列。
- 執行緒喚醒後,獲取同步狀態(鎖)。
- 執行
acquireQueued()
,重新加入到獲取同步狀態的競爭中(掛起前釋放了鎖)。
- 執行
- 執行緒喚醒後,如果不是尾節點,那麼檢查佇列,清除一些取消的節點。
-
因為在執行 await 前,執行緒獲取了鎖,所以沒有使用 CAS 來保證執行緒安全。
-
呼叫 await 後,執行緒會釋放 全部 鎖,然後被掛起。
-
如果 await 返回,表明當前執行緒已經重新獲取到了鎖。
signal(通知)
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
- signal(通知)的流程。
- 如果當前不是獨佔模式,那麼就會丟擲異常,也就是說 Condition 對於 共享模式 不適用。
- 獲取第一個等待結點,然後進行喚醒工作。
- 重新設定 firstWaiter,指向第一個 waiter 的 nextWaiter。
- 如果第一個 waiter 的 nextWaiter 為 null,說明當前佇列中只有一個 waiter,將 lastWaiter 置空,然後執行
transferForSignal()
方法。 -
transferForSignal()
方法將一個結點從等待佇列轉換到同步佇列。- 嘗試將 node 的 waitStatus 從 CONDITION 置為 0,如果失敗直接返回 false。
- 當前結點呼叫 enq 方法進入同步佇列。
- 當前結點通過 CAS 機制將 waitStatus 置為 SIGNAL,返回 true,代表喚醒成功。
- 某個被
await()
的節點被喚醒後並不意味著它後面的程式碼會立即執行,它會被加入到同步佇列的尾部。 - 如果
transferForSignal()
執行失敗,會返回 false,然後會對下個結點進行喚醒,同時firstWaiter
也會被重新設定,最終取消狀態的結點會被移出等待佇列。
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
2.4 問題
2.4.1 插入節點的程式碼順序
原始碼
-
addWaiter()
和enq()
方法中,先將 node.prev 設定為 tail 結點,再嘗試 CAS 修改。
Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
修改
- 嘗試 CAS 修改後,再設定結點之間的雙向連結。
Node pred = tail; if (pred != null) { //node.prev = pred;// del if (compareAndSetTail(pred, node)) { node.prev = pred; // add pred.next = node; return node; } }
分析
- 雙向連結串列目前沒有基於 CAS 原子插入的手段,程式碼按上述修改後執行,會導致這一瞬間的 tail 的 prev 為 null,使得這一瞬間佇列處於一種不一致的中間狀態。
2.4.2 喚醒節點從 tail 向前遍歷
原始碼
-
unparkSuccessor
方法中喚醒後繼結點時,是從 tail 向前查詢最接近 node 的非取消節點。
Node s = node.next; //找到下一個需要喚醒的結點 s。 if (s == null || s.waitStatus > 0) { //如果為空或取消狀態 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) // <=0 的結點,都是有效結點。 s = t; }
分析
- node.next 為 null,不代表 node 為 tail。
- 當有新執行緒結點通過 addWaiter 中的 if 分支或者 enq 方法新增自己,並且 compareAndSetTail 成功,但是還未進行 node.next = 新結點 設定。
- 這時 node.next 雖然為 null,但實際上 node 物件已經擁有後續結點。
2.4.3 PROPAGATE 狀態存在的意義
原始碼
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { //Node.PROPAGATE 狀態就是為了此處可以讀取到 h.waitStatus < 0(PROPAGATE 值為 -3)。 Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
- JSR 166 repository 上 ofollow,noindex">Revision1.73 中,PROPAGATE 狀態被引入用以修復 bug 6801020 。
修復前版本
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 && node.waitStatus != 0) { Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } }
分析
- 假設存在將要訊號量釋放的 T3 和 T4,釋放順序為先 T3 後 T4。
- 假設存在某次迴圈中佇列裡排隊的結點情況為 head -> T1 node -> tail。
- 修復前版本執行流程。
- T3 呼叫
releaseShared()
,直接呼叫了unparkSuccessor(head)
,head 的等待狀態從 -1 變為 0。 - T1 由於 T3 釋放訊號量被喚醒,呼叫 tryAcquireShared,假設返回值為 0(獲取鎖成功,但沒有剩餘資源量)。
- T4 呼叫
releaseShared()
,此時 head.waitStatus 為 0(此時讀到的 head 和 1 中為同一個head),不滿足條件,因此不呼叫unparkSuccessor(head)
。 - T1 獲取訊號量成功,呼叫
setHeadAndPropagate
時,因為不滿足 propagate > 0(2 的返回值也就是 propagate(剩餘資源量) == 0),從而不會喚醒後繼結點,出現執行緒 hang 住問題。
- T3 呼叫
- 修復後版本(原始碼)執行流程。
- T3 呼叫
releaseShared()
,直接呼叫了unparkSuccessor(head)
,head 的等待狀態從 -1 變為 0。 - T1 由於 T3 釋放訊號量被喚醒,呼叫 tryAcquireShared,假設返回值為 0(獲取鎖成功,但沒有剩餘資源量)。
- T4 呼叫
releaseShared()
,此時 head.waitStatus 為 0(此時讀到的 head 和 1 中為同一個 head), 呼叫doReleaseShared()
將等待狀態置為 PROPAGATE 。 - T1 獲取訊號量成功,呼叫
setHeadAndPropagate
時,讀到 h.waitStatus < 0,從而呼叫doReleaseShared()
喚醒 T2。
- T3 呼叫
- 在 PROPAGATE 引入之前,之所以可能會出現執行緒 hang 住的情況,在於
releaseShared()
有競爭的情況下,可能會有佇列中處於等待狀態的結點因為第一個執行緒完成釋放喚醒,第二個執行緒獲取到鎖,但還沒設定好 head,又有新執行緒釋放鎖,但是讀到老的 head 狀態為 0,導致釋放但不喚醒,最終後一個等待執行緒既沒有被釋放執行緒喚醒,也沒有被持鎖執行緒喚醒。僅僅靠tryAcquireShared()
的返回值來決定是否要將喚醒傳遞下去是不充分的。
2.4.4 AQS 如何防止記憶體洩露
- AQS 在無競爭條件下,甚至都不會 new 出 head 和 tail 節點。
- 執行緒成功獲取鎖時設定 head 節點的方法為 setHead。
- 由於頭結點的 thread 並不重要,此時會置 node 的 thread 和 prev 為 null。
- 會置原先 head 的 next 為 null,從而實現隊首元素的安全移出。
- 取消結點時,會令 node.thread = null,在 node 不為 tail 的情況下,使 node.next = node。