1. 程式人生 > >這才是圖文並茂:我寫了1萬多字,就是為了讓你瞭解AQS是怎麼執行的

這才是圖文並茂:我寫了1萬多字,就是為了讓你瞭解AQS是怎麼執行的

### 前言 如果你想深入研究Java併發的話,那麼AQS一定是繞不開的一塊知識點,Java併發包很多的同步工具類底層都是基於AQS來實現的,比如我們工作中經常用的Lock工具ReentrantLock、柵欄CountDownLatch、訊號量Semaphore等,而且關於AQS的知識點也是面試中經常考察的內容,所以,無論是為了更好的使用還是為了應付面試,深入學習AQS都很有必要。 ## CAS 學習AQS之前,我們有必要了解一個知識點,就是AQS底層中大量使用的CAS,關於CAS,大家應該都不陌生,如果還有哪位同學不清楚的話,可以看看我之前的文章[《面試必備知識點:悲觀鎖和樂觀鎖的那些事兒》](https://www.cnblogs.com/yeya/p/13587232.html) ,這裡不多複述,哈哈,給自己舊文章加了閱讀量 ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000126126-627129123.jpg) 此時,好幾塊搬磚朝我飛了過來。。。。。 好吧,開個玩笑,還是大概講解一下吧,瞭解的同學可以跳過這一段。 CAS是樂觀鎖的一種思想,它假設執行緒對資源的訪問是沒有衝突的,同時所有的執行緒執行都不需要等待,可以持續執行。 如果有衝突的話,就用比較+交換的方式來檢測衝突,有衝突就不斷重試。 CAS的全稱是Compare-and-Swap,也就是比較並交換,它包含了三個引數:V,A,B,V表示要讀寫的記憶體位置,A表示舊的預期值,B表示新值,當執行CAS時,只有當V的值等於預期值A時,才會把V的值改為B,這樣的方式可以讓多個執行緒同時去修改,但也會因為執行緒操作失敗而不斷重試,對CPU有一定程式上的開銷。 ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000138455-1939376405.png) ## AQS簡介 本文主角正式登場。 AQS,全名AbstractQueuedSynchronizer,是一個抽象類的佇列式同步器,它的內部通過維護一個狀態volatile int state(共享資源),一個FIFO執行緒等待佇列來實現同步功能。 state用關鍵字volatile修飾,代表著該共享資源的狀態一更改就能被所有執行緒可見,而AQS的加鎖方式本質上就是多個執行緒在競爭state,當state為0時代表執行緒可以競爭鎖,不為0時代表當前物件鎖已經被佔有,其他執行緒來加鎖時則會失敗,加鎖失敗的執行緒會被放入一個FIFO的等待佇列中,這些執行緒會被`UNSAFE.park()`操作掛起,等待其他獲取鎖的執行緒釋放鎖才能夠被喚醒。 而這個等待佇列其實就相當於一個CLH佇列,用一張原理圖來表示大致如下: ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000159426-480733009.jpg) ### ### 基礎定義 AQS支援兩種資源分享的方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentrantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。 自定義的同步器繼承AQS後,只需要實現共享資源state的獲取和釋放方式即可,其他如執行緒佇列的維護(如獲取資源失敗入隊/喚醒出隊等)等操作,AQS在頂層已經實現了, AQS程式碼內部提供了一系列操作鎖和執行緒佇列的方法,主要操作鎖的方法包含以下幾個: - compareAndSetState():利用CAS的操作來設定state的值 - tryAcquire(int):獨佔方式獲取鎖。成功則返回true,失敗則返回false。 - tryRelease(int):獨佔方式釋放鎖。成功則返回true,失敗則返回false。 - tryAcquireShared(int):共享方式釋放鎖。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。 - tryReleaseShared(int):共享方式釋放鎖。如果釋放後允許喚醒後續等待結點返回true,否則返回false。 像ReentrantLock就是實現了自定義的tryAcquire-tryRelease,從而操作state的值來實現同步效果。 除此之外,AQS內部還定義了一個靜態類Node,表示CLH佇列的每一個結點,該結點的作用是對每一個等待獲取資源做了封裝,包含了需要同步的執行緒本身、執行緒等待狀態..... 我們可以看下該類的一些重點變數: ``` static final class Node { /** 表示共享模式下等待的Node */ static final Node SHARED = new Node(); /** 表示獨佔模式下等待的mode */ static final Node EXCLUSIVE = null; /** 下面幾個為waitStatus的具體值 */ static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; /** 表示前面的結點 */ volatile Node prev; /** 表示後面的結點 */ volatile Node next; /**當前結點裝載的執行緒,初始化時被建立,使用後會置空*/ volatile Thread thread; /**連結到下一個節點的等待條件,用到Condition的時候會使用到*/ Node nextWaiter; } ``` 程式碼裡面定義了一個表示當前Node結點等待狀態的欄位`waitStatus`,該欄位的取值包含了CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,這五個值代表了不同的特定場景: - **CANCELLED**:表示當前結點已取消排程。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態後的結點將不會再變化。 - **SIGNAL**:表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新為SIGNAL(記住這個-1的值,因為後面我們講的時候經常會提到) - **CONDITION**:表示結點等待在Condition上,當其他執行緒呼叫了Condition的signal()方法後,CONDITION狀態的結點將**從等待佇列轉移到同步佇列中**,等待獲取同步鎖。(注:**Condition**是AQS的一個元件,後面會細說) - **PROPAGATE**:共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。 - **0**:新結點入隊時的預設狀態。 也就是說,當waitStatus為**負值表示結點處於有效等待狀態,為正值的時候表示結點已被取消。** 在AQS內部中還維護了兩個Node物件`head`和`tail`,一開始預設都為null ``` private transient volatile Node head; private transient volatile Node tail; ``` 講完了AQS的一些基礎定義,我們就可以開始學習同步的具體執行機制了,為了更好的演示,我們用ReentrantLock作為使用入口,一步步跟進原始碼探究AQS底層是如何運作的,**這裡說明一下,因為ReentrantLock底層呼叫的AQS是獨佔模式,所以下文講解的AQS原始碼也是針對獨佔模式的操作** 好了,熱身正式結束,來吧。 ## 獨佔模式 ### 加鎖過程 我們都知道,ReentrantLock的加鎖和解鎖方法分別為lock()和unLock(),我們先來看獲取鎖的方法, ``` final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } ``` 邏輯很簡單,執行緒進來後直接利用`CAS`嘗試搶佔鎖,如果搶佔成功`state`值回被改為1,且設定物件獨佔鎖執行緒為當前執行緒,否則就呼叫`acquire(1)`再次嘗試獲取鎖。 我們假定有兩個執行緒A和B同時競爭鎖,A進來先搶佔到鎖,此時的AQS模型圖就類似這樣: ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000214549-70427225.png) 繼續走下面的方法, ``` public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` `acquire`包含了幾個函式的呼叫, tryAcquire:嘗試直接獲取鎖,如果成功就直接返回; addWaiter:將該執行緒加入等待佇列FIFO的尾部,並標記為獨佔模式; acquireQueued:執行緒阻塞在等待佇列中獲取鎖,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。 selfInterrupt:自我中斷,就是既拿不到鎖,又在等待時被中斷了,執行緒就會進行自我中斷selfInterrupt(),將中斷補上。 我們一個個來看原始碼,並結合上面的兩個執行緒來做場景分析。 #### **tryAcquire** 不用多說,就是為了再次嘗試獲取鎖 ``` protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ``` 當執行緒B進來後,**nonfairTryAcquire**方法首先會獲取state的值,如果為0,則正常獲取該鎖,不為0的話判斷是否是當前執行緒佔用了,是的話就累加state的值,這裡的累加也是為了配合釋放鎖時候的次數,從而實現可重入鎖的效果。 當然,因為之前鎖已經被執行緒A佔領了,所以這時候`tryAcquire`會返回false,繼續下面的流程。 #### addWaiter ``` private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } ``` 這段程式碼首先會建立一個和當前執行緒繫結的`Node`節點,`Node`為雙向連結串列。此時等待佇列中的`tail`指標為空,直接呼叫`enq(node)`方法將當前執行緒加入等待佇列尾部,然後返回當前結點的前驅結點, ``` private Node enq(final Node node) { // CAS"自旋",直到成功加入隊尾 for (;;) { Node t = tail; if (t == null) { // 佇列為空,初始化一個Node結點作為Head結點,並將tail結點也指向它 if (compareAndSetHead(new Node())) tail = head; } else { // 把當前結點插入佇列尾部 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` 第一遍迴圈時,tail指標為空,初始化一個Node結點,並把head和tail結點都指向它,然後第二次迴圈進來之後,tail結點不為空了,就將當前的結點加入到tail結點後面,也就是這樣: ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000230061-996350559.png) todo 如果此時有另一個執行緒C進來的話,發現鎖已經被A拿走了,然後佇列裡已經有了執行緒B,那麼執行緒C就只能乖乖排到執行緒B的後面去, ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000237629-381835424.png) #### acquireQueued 接著解讀方法,通過tryAcquire()和addWaiter(),我們的執行緒還是沒有拿到資源,並且還被排到了佇列的尾部,如果讓你來設計的話,這個時候你會怎麼處理執行緒呢?其實答案也很簡單,能做的事無非兩個: **1、迴圈讓執行緒再搶資源。但仔細一推敲就知道不合理,因為如果有多個執行緒都參與的話,你搶我也搶只會降低系統性能** **2、進入等待狀態休息,直到其他執行緒徹底釋放資源後喚醒自己,自己再拿到資源** 毫無疑問,選擇2更加靠譜,acquireQueued方法做的也是這樣的處理: ``` final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // 標記是否會被中斷 boolean interrupted = false; // CAS自旋 for (;;) { // 獲取當前結點的前結點 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 獲取鎖失敗,則將此執行緒對應的node的waitStatus改為CANCEL cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 前驅結點等待狀態為"SIGNAL",那麼自己就可以安心等待被喚醒了 return true; if (ws > 0) { /* * 前驅結點被取消了,通過迴圈一直往前找,直到找到等待狀態有效的結點(等待狀態值小於等於0) , * 然後排在他們的後邊,至於那些被當前Node強制"靠後"的結點,因為已經被取消了,也沒有引用鏈, * 就等著被GC了 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前驅正常,那就把前驅的狀態設定成SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } ``` `acquireQueued`方法的流程是這樣的: 1、CAS自旋,先判斷當前傳入的Node的前結點是否為head結點,是的話就嘗試獲取鎖,獲取鎖成功的話就把當前結點置為head,之前的head置為null(方便GC),然後返回 2、如果前驅結點不是head或者加鎖失敗的話,就呼叫`shouldParkAfterFailedAcquire`,將前驅節點的**waitStatus**變為了**SIGNAL=-1**,最後執行`parkAndChecknIterrupt`方法,呼叫`LockSupport.park()`掛起當前執行緒,`parkAndCheckInterrupt`在掛起執行緒後會判斷執行緒是否被中斷,如果被中斷的話,就會重新跑`acquireQueued`方法的CAS自旋操作,直到獲取資源。 ps:LockSupport.park方法會讓當前執行緒進入waitting狀態,在這種狀態下,執行緒被喚醒的情況有兩種,一是被unpark(),二是被interrupt(),所以,如果是第二種情況的話,需要返回被中斷的標誌,然後在`acquire`頂層方法的視窗那裡自我中斷補上 此時,因為執行緒A還未釋放鎖,所以執行緒B狀態都是被掛起的, ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000253237-2080352702.png) 到這裡,加鎖的流程就分析完了,其實整體來說也並不複雜,而且當你理解了獨佔模式加鎖的過程,後面釋放鎖和共享模式的執行機制也沒什麼難懂的了,所以整個加鎖的過程還是有必要多消化下的,也是AQS的重中之重。 為了方便你們更加清晰理解,我加多一張流程圖吧(這個作者也太暖了吧,哈哈) ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000304827-852852462.png) ### 釋放鎖 說完了加鎖,我們來看看釋放鎖是怎麼做的,AQS中釋放鎖的方法是`release()`,當呼叫該方法時會釋放指定量的資源 (也就是鎖) ,如果徹底釋放了(即state=0),它會喚醒等待佇列裡的其他執行緒來獲取資源。 還是一步步看原始碼吧, ``` public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ``` #### tryRelease 程式碼上可以看出,核心的邏輯都在`tryRelease`方法中,該方法的作用是釋放資源,AQS裡該方法沒有具體的實現,需要由自定義的同步器去實現,我們看下**ReentrantLock**程式碼中對應方法的原始碼: ``` protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ``` `tryRelease`方法會減去state對應的值,如果state為0,也就是已經徹底釋放資源,就返回true,並且把獨佔的執行緒置為null,否則返回false。 此時AQS中的資料就會變成這樣: ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000318007-633540248.png) 完全釋放資源後,當前執行緒要做的就是喚醒CLH佇列中第一個在等待資源的執行緒,也就是head結點後面的執行緒,此時呼叫的方法是`unparkSuccessor()`, ``` private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //將head結點的狀態置為0 compareAndSetWaitStatus(node, ws, 0); //找到下一個需要喚醒的結點s Node s = node.next; //如果為空或已取消 if (s == null || s.waitStatus >
0) { s = null; // 從後向前,直到找到等待狀態小於0的結點,前面說了,結點waitStatus小於0時才有效 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 找到有效的結點,直接喚醒 if (s != null) LockSupport.unpark(s.thread);//喚醒 } ``` 方法的邏輯很簡單,就是先將head的結點狀態置為0,避免下面找結點的時候再找到head,然後找到佇列中最前面的有效結點,然後喚醒,我們假設這個時候執行緒A已經釋放鎖,那麼此時佇列中排最前邊競爭鎖的執行緒B就會被喚醒, 然後被喚醒的執行緒B就會嘗試用CAS獲取鎖,回到`acquireQueued`方法的邏輯, ``` for (;;) { // 獲取當前結點的前結點 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } ``` 當執行緒B獲取鎖之後,會把當前結點賦值給head,然後原先的前驅結點 (也就是原來的head結點) 去掉引用鏈,方便回收,這樣一來,執行緒B獲取鎖的整個過程就完成了,此時AQS的資料就會變成這樣: ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000333250-1552499348.png) 到這裡,我們已經分析完了AQS獨佔模式下加鎖和釋放鎖的過程,也就是**tryAccquire->tryRelease**這一鏈條的邏輯,除此之外,AQS中還支援共享模式的同步,這種模式下關於鎖的操作核心其實就是**tryAcquireShared->tryReleaseShared**這兩個方法,我們可以簡單看下 ## 共享模式 ### 獲取鎖 AQS中,共享模式獲取鎖的頂層入口方法是`acquireShared`,該方法會獲取指定數量的資源,成功的話就直接返回,失敗的話就進入等待佇列,直到獲取資源, ``` public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } ``` 該方法裡包含了兩個方法的呼叫, tryAcquireShared:嘗試獲取一定資源的鎖,返回的值代表獲取鎖的狀態。 doAcquireShared:進入等待佇列,並迴圈嘗試獲取鎖,直到成功。 #### tryAcquireShared `tryAcquireShared`在AQS裡沒有實現,同樣由自定義的同步器去完成具體的邏輯,像一些較為常見的併發工具Semaphore、CountDownLatch裡就有對該方法的自定義實現,雖然實現的邏輯不同,但方法的作用是一樣的,就是獲取一定資源的資源,然後根據返回值判斷是否還有剩餘資源,從而決定下一步的操作。 返回值有三種定義: - 負值代表獲取失敗; - 0代表獲取成功,但沒有剩餘的資源,也就是state已經為0; - 正值代表獲取成功,而且state還有剩餘,其他執行緒可以繼續領取 當返回值小於0時,證明此次獲取一定數量的鎖失敗了,然後就會走`doAcquireShared`方法 #### doAcquireShared 此方法的作用是將當前執行緒加入等待佇列尾部休息,直到其他執行緒釋放資源喚醒自己,自己成功拿到相應量的資源後才返回,這是它的原始碼: ``` private void doAcquireShared(int arg) { // 加入佇列尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // CAS自旋 for (;;) { final Node p = node.predecessor(); // 判斷前驅結點是否是head if (p == head) { // 嘗試獲取一定數量的鎖 int r = tryAcquireShared(arg); if (r >
= 0) { // 獲取鎖成功,而且還有剩餘資源,就設定當前結點為head,並繼續喚醒下一個執行緒 setHeadAndPropagate(node, r); // 讓前驅結點去掉引用鏈,方便被GC p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 跟獨佔模式一樣,改前驅結點waitStatus為-1,並且當前執行緒掛起,等待被喚醒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // head指向自己 setHead(node); // 如果還有剩餘量,繼續喚醒下一個鄰居執行緒 if (propagate >
0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } ``` 看到這裡,你會不會一點熟悉的感覺,這個方法的邏輯怎麼跟上面那個`acquireQueued()` 那麼類似啊?對的,其實兩個流程並沒有太大的差別。只是`doAcquireShared()`比起獨佔模式下的獲取鎖上多了一步喚醒後繼執行緒的操作,當獲取完一定的資源後,發現還有剩餘的資源,就繼續喚醒下一個鄰居執行緒,這才符合"共享"的思想嘛。 這裡我們可以提出一個疑問,共享模式下,當前執行緒釋放了一定數量的資源,但這部分資源滿足不了下一個等待結點的需要的話,那麼會怎麼樣? 按照正常的思維,共享模式是可以多個執行緒同時執行的才對,所以,多個執行緒的情況下,如果老大釋放完資源,但這部分資源滿足不了老二,但能滿足老三,那麼老三就可以拿到資源。可事實是,從原始碼設計中可以看出,如果真的發生了這種情況,老三是拿不到資源的,因為等待佇列是按順序排列的,老二的資源需求量大,會把後面量小的老三以及老四、老五等都給卡住。**從這一個角度來看,雖然AQS嚴格保證了順序,但也降低了併發能力** 接著往下說吧,喚醒下一個鄰居執行緒的邏輯在`doReleaseShared()`中,我們放到下面的釋放鎖來解析。 ### 釋放鎖 共享模式釋放鎖的頂層方法是`releaseShared`,它會釋放指定量的資源,如果成功釋放且允許喚醒等待執行緒,它會喚醒等待佇列裡的其他執行緒來獲取資源。下面是releaseShared()的原始碼: ``` public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } ``` 該方法同樣包含兩部分的邏輯: tryReleaseShared:釋放資源。 doAcquireShared:喚醒後繼結點。 跟`tryAcquireShared`方法一樣,`tryReleaseShared`在AQS中沒有具體的實現,由子同步器自己去定義,但功能都一樣,就是釋放一定數量的資源。 釋放完資源後,執行緒不會馬上就收工,而是喚醒等待佇列裡最前排的等待結點。 #### doAcquireShared 喚醒後繼結點的工作在`doReleaseShared()`方法中完成,我們可以看下它的原始碼: ``` private void doReleaseShared() { for (;;) { // 獲取等待佇列中的head結點 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // head結點waitStatus = -1,喚醒下一個結點對應的執行緒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒後繼結點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } ``` 程式碼沒什麼特別的,就是如果等待佇列head結點的waitStatus為-1的話,就直接喚醒後繼結點,喚醒的方法`unparkSuccessor()`在上面已經講過了,這裡也沒必要再複述。 總的來看,AQS共享模式的運作流程和獨佔模式很相似,只要掌握了獨佔模式的流程運轉,共享模式什麼的不就那樣嗎,沒難度。這也是我為什麼共享模式講解中不畫流程圖的原因,沒必要嘛。 ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000353583-281804091.jpg) ### Condition 介紹完了AQS的核心功能,我們再擴充套件一個知識點,在AQS中,除了提供獨佔/共享模式的加鎖/解鎖功能,它還對外提供了關於**Condition**的一些操作方法。 **Condition**是個介面,在jdk1.5版本後設計的,基本的方法就是`await()`和`signal()`方法,功能大概就對應**Object**的`wait()`和`notify()`,Condition必須要配合鎖一起使用,因為對共享狀態變數的訪問發生在多執行緒環境下。一個Condition的例項必須與一個Lock繫結,因此Condition一般都是作為Lock的內部實現 ,AQS中就定義了一個類**ConditionObject**來實現了這個介面, ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000403999-1174369617.png) 那麼它應該怎麼用呢?我們可以簡單寫個demo來看下效果 ``` public class ConditionDemo { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Thread tA = new Thread(() -> { lock.lock(); try { System.out.println("執行緒A加鎖成功"); System.out.println("執行緒A執行await被掛起"); condition.await(); System.out.println("執行緒A被喚醒成功"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); System.out.println("執行緒A釋放鎖成功"); } }); Thread tB = new Thread(() -> { lock.lock(); try { System.out.println("執行緒B加鎖成功"); condition.signal(); System.out.println("執行緒B喚醒執行緒A"); } finally { lock.unlock(); System.out.println("執行緒B釋放鎖成功"); } }); tA.start(); tB.start(); } } ``` 執行main函式後結果輸出為: ``` 執行緒A加鎖成功 執行緒A執行await被掛起 執行緒B加鎖成功 執行緒B喚醒執行緒A 執行緒B釋放鎖成功 執行緒A被喚醒成功 執行緒A釋放鎖成功 ``` 程式碼執行的結果很容易理解,執行緒A先獲取鎖,然後呼叫`await()`方法掛起當前執行緒並釋放鎖,執行緒B這時候拿到鎖,然後呼叫`signal`喚醒執行緒A。 毫無疑問,這兩個方法讓執行緒的狀態發生了變化,我們仔細來研究一下, 翻看AQS的原始碼,我們會發現Condition中定義了兩個屬性`firstWaiter`和`lastWaiter`,前面說了,AQS中包含了一個FIFO的CLH等待佇列,每個Conditon物件就包含這樣一個等待佇列,而這兩個屬性分別表示的是等待佇列中的首尾結點, ``` /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; ``` **注意:Condition當中的等待佇列和AQS主體的同步等待佇列是分開的,兩個佇列雖然結構體相同,但是作用域是分開的** #### await 先看`await()`的原始碼: ``` public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 將當前執行緒加入到等待佇列中 Node node = addConditionWaiter(); // 完全釋放佔有的資源,並返回資源數 int savedState = fullyRelease(node); int interruptMode = 0; // 迴圈判斷當前結點是不是在Condition的佇列中,是的話掛起 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } ``` 當一個執行緒呼叫**Condition.await()**方法,將會以當前執行緒構造結點,這個結點的`waitStatus`賦值為**Node.CONDITION**,也就是-2,並將結點從尾部加入等待佇列,然後尾部結點就會指向這個新增的結點, ``` private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } ``` 我們依然用上面的demo來演示,此時,執行緒A獲取鎖並呼叫**Condition.await()**方法後,AQS內部的資料結構會變成這樣: ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000429247-966732196.png) 在Condition佇列中插入對應的結點後,執行緒A會釋放所持有的資源,走到while迴圈那層邏輯, ``` while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } ``` `isOnSyncQueue`方法的會判斷當前的執行緒節點是不是在同步佇列中,這個時候此結點還在Condition佇列中,所以該方法返回false,這樣的話迴圈會一直持續下去,執行緒被掛起,等待被喚醒,此時,執行緒A的流程暫時停止了。 當執行緒A呼叫`await()`方法掛起的時候,執行緒B獲取到了執行緒A釋放的資源,然後執行`signal()`方法: #### signal ``` public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } ``` 先判斷當前執行緒是否為獲取鎖的執行緒,如果不是則直接丟擲異常。 接著呼叫`doSignal()`方法來喚醒執行緒。 ``` private void doSignal(Node first) { // 迴圈,從佇列一直往後找不為空的首結點 do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // CAS迴圈,將結點的waitStatus改為0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 上面已經分析過,此方法會把當前結點加入到等待佇列中,並返回前驅結點 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } ``` 從`doSignal`的程式碼中可以看出,這時候程式尋找的是Condition等待佇列中首結點firstWaiter的結點,此時該結點指向的是執行緒A的結點,所以之後的流程作用的都是執行緒A的結點。 這裡分析下`transferForSignal`方法,先通過CAS自旋將結點**waitStatus**改為0,然後就把結點放入到同步佇列 (此佇列不是Condition的等待佇列) 中,然後再用CAS將同步佇列中該結點的前驅結點waitStatus改為Node.SIGNAL,也就是-1,此時AQS的資料結構大概如下 (額.....少畫了個箭頭,大家就當head結點是執行緒A結點的前驅結點就好): ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000439924-1026104128.png) 回到`await()`方法,當執行緒A的結點被加入同步佇列中時,`isOnSyncQueue()`會返回true,跳出迴圈, ``` while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); ``` 接著執行`acquireQueued()`方法,這裡就不用多說了吧,嘗試重新獲取鎖,如果獲取鎖失敗繼續會被掛起,直到另外執行緒釋放鎖才被喚醒。 所以,當執行緒B釋放完鎖後,執行緒A被喚醒,繼續嘗試獲取鎖,至此流程結束。 對於這整個通訊過程,我們可以畫一張流程圖展示下: ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000448055-1723143797.png) #### 總結 說完了Condition的使用和底層執行機制,我們再來總結下它跟普通 wait/notify 的比較,一般這也是問的比較多的,Condition大概有以下兩點優勢: - Condition 需要結合 Lock 進行控制,使用的時候要注意一定要對應的unlock(),可以對多個不同條件進行控制,只要new 多個 Condition物件就可以為多個執行緒控制通訊,wait/notify 只能和 synchronized 關鍵字一起使用,並且只能喚醒一個或者全部的等待佇列; - Condition 有類似於 await 的機制,因此不會產生加鎖方式而產生的死鎖出現,同時底層實現的是 park/unpark 的機制,因此也不會產生先喚醒再掛起的死鎖,一句話就是不會產生死鎖,但是 wait/notify 會產生先喚醒再掛起的死鎖。 ## 最後 對AQS的原始碼分析到這裡就全部結束了,雖然還有很多知識點沒講解,比如公平鎖/非公平鎖下AQS是怎麼作用的,篇幅所限,部分知識點沒有擴充套件還請見諒,儘管如此,如果您能看完文章的話,相信對AQS也算是有足夠的瞭解了。 回顧本篇文章,我們不難發現,無論是獨佔還是共享模式,或者結合是Condition工具使用,AQS本質上的同步功能都是通過對鎖和佇列中結點的操作來實現的,從設計上講,AQS的組成結構並不算複雜,底層的運轉機制也不會很繞,所以,大家如果看原始碼的時候覺得有些困難的話也不用灰心,多看幾遍,順便畫個圖之類的,理清下流程還是沒什麼問題的。 當然,自己看得懂是一回事,寫出來讓別人看懂又是另一回事了,就像這篇文章,我花了好長的時間來準備,又是畫圖又是理流程的,期間還參考了不少網上大神的博文,肝了幾天才算是成文了。雖然我知道本文不算什麼高質文,但我也算是費盡心力了,寫技術文真是挺累的,大家看的覺得不錯的話還請幫忙轉發下或點個贊吧!這也是對我最好的鼓勵了 ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000504077-284241851.jpg) ------------------------- 作者:鄙人薛某,一個不拘於技術的網際網路人,技術三流,吹水一流,想看更多精彩文章可以關注我的公眾號哦~~~ ![](https://img2020.cnblogs.com/blog/1478697/202011/1478697-20201117000514932-3221764