1. 程式人生 > >深入理解Java併發框架AQS系列(三):獨佔鎖(Exclusive Lock)

深入理解Java併發框架AQS系列(三):獨佔鎖(Exclusive Lock)

[深入理解Java併發框架AQS系列(一):執行緒](https://www.cnblogs.com/xijiu/p/14396061.html) [深入理解Java併發框架AQS系列(二):AQS框架簡介及鎖概念](https://www.cnblogs.com/xijiu/p/14522224.html) [深入理解Java併發框架AQS系列(三):獨佔鎖(Exclusive Lock)](https://www.cnblogs.com/xijiu/p/14579262.html) # 一、前言 > 優秀的原始碼就在那裡 經過了前面兩章的鋪墊,終於要切入正題了,本章也是整個AQS的核心之一 從本章開始,我們要精讀AQS原始碼,在欣賞它的同時也要學會質疑它。當然本文不會帶著大家逐行過原始碼(會有“只在此山中,雲深不知處”的弊端),而是從功能入手,對其架構進行逐層剖析,在核心位置重點解讀,並提出質疑;雖然AQS原始碼讀起來比較“跳”,但我還是建議大家花時間及精力去好好讀它 本章我們採用經典併發類`ReentrantLock`來闡述獨佔鎖 # 二、整體回顧 獨佔鎖,顧名思義,即在同一時刻,僅允許一個執行緒執行同步塊程式碼。好比一夥兒人想要過河,但只有一根獨木橋,且只能承受一人的重量 ![](https://img2020.cnblogs.com/blog/2109301/202103/2109301-20210325204632411-29050628.png) 相信我們平時寫獨佔鎖的程式大抵是這樣的: ``` ReentrantLock lock = new ReentrantLock(); try { lock.lock(); doBusiness(); } finally { lock.unlock(); } ``` 上述程式碼分為三部分: * 加鎖 `lock.lock()` * 執行同步程式碼 `doBusiness()` * 解鎖 `lock.unlock()` 加鎖部分,一定是眾矢之的,兵家爭搶的要地,對於高併發的程式來說,同一時刻,大量的執行緒爭相湧入,而`lock()`則保證只能有一個執行緒進入`doBusiness()`邏輯,且在其執行完畢`unlock()`方法之前,不能有其他執行緒進入。所以相對而言,`unlock()`方法相對輕鬆,不用處理多執行緒的場景 ## 2.1、`waitStatus` 本章中,我們引入節點中一個關鍵的欄位`waitStatus`(後文簡寫為`ws`),在獨佔鎖模式中,可能會使用到的等待狀態如下: * 1、`0`
* 初始狀態,當一個節點新建時,其預設`ws`為0 * 2、`SIGNAL (-1)` * 如果某個節點的狀態為`SIGNAL`,即表明其後續節點處於(或即將處於)阻塞狀態。所以當前節點在執行完同步程式碼或被取消後,一定要記得喚醒其後續節點 * 3、`CANCELLED (1)` * 顧名思義,即取消操作的含義。當一個節點等待超時、或者被打斷、或者執行`tryAcquire`發生異常,都會導致當前節點取消。而當節點一旦取消,便永遠不會再變為`0`或者`SIGNAL`狀態了 # 三、加鎖(核心) 我們先上一張`ReentrantLock`加鎖功能(非公平)的整體流程圖,在併發或關鍵部分有註釋 ![](https://img2020.cnblogs.com/blog/2109301/202103/2109301-20210325204703110-854718401.png) 第一眼看上去,確實有點複雜,不過不用怕,我們逐一分析解讀後,它其實就是隻紙老虎 大體上可以分為三大部分 * a、加入阻塞佇列 * b、阻塞佇列排程 * c、異常處理 按照正常的理解,可能只會有a、b兩部分就夠了,為什麼會有c呢?什麼時候會發生異常? ## 3.1、加入阻塞佇列 當一個執行緒嘗試加鎖失敗後,便會放入阻塞佇列的隊尾;這節我們來討論一下這個動作的細節 在加入阻塞佇列之前,首先會檢視頭節點是否為null,如果是null的話,需要新建`ws`為0的頭結點,(為什麼在AQS初始化的時候,不直接新建頭結點呢?其實由此可見作者細節處理的嚴謹,因為如果當我們的獨佔鎖併發度不大,在嘗試加鎖的過程中,總能獲取到鎖,這時便不會向阻塞佇列新增內容,假如初始化便新建頭結點,會導致其白白佔用記憶體空間而得不到有效利用)然後將當前節點新增至阻塞佇列的尾部,當然頭結點初始化、向尾部節點追加新節點都是通過CAS操作的。而阻塞佇列呢,正如我們前文提及的是一個FIFO的佇列,且帶有`next`、`prev`兩個引用來標記前、後節點;我們在阻塞佇列中加入第一個節點後,阻塞佇列的樣子: ![](https://img2020.cnblogs.com/blog/2109301/202103/2109301-20210325204735010-1042358658.png) ## 3.2、阻塞佇列排程 這一節屬於獨佔鎖很核心的部分,裡面涉及`ws`更改、執行緒掛起與喚醒、更換頭結點等 我們接著3.1繼續,在節點進入排程後,首先檢查下當前節點的前節點是否為`head`節點,如果是的話,那麼有一次嘗試加鎖的機會,加鎖成功或失敗將導致2個分支 我們首先看加鎖加鎖成功的情況,一旦加鎖成功,當前節點便從阻塞佇列中“消失”(其實是當前節點變為了頭結點,而原頭結點記憶體不可達,等待垃圾回收),當所有節點都加鎖成功,阻塞佇列便為空了,但並不代表阻塞佇列的長度為0,因為有頭結點的存在,所以空阻塞佇列的長度是1 ![](https://img2020.cnblogs.com/blog/2109301/202103/2109301-20210325204756683-429471471.png) 而加鎖失敗或者當前節點的前節點不是`head`節點呢?是馬上將執行緒掛起嗎?答案是不確定的,要看前節點的`ws`狀態而定。而此步驟還有個隱藏任務:將當前節點之前的所有已取消節點從阻塞佇列中剔除。 ![](https://img2020.cnblogs.com/blog/2109301/202103/2109301-20210325204819274-1649271927.png) 從上圖中我們看到,一個節點如果想正常進入掛起狀態,那麼一定要將前節點的`ws`改為`SIGNAL (-1)`狀態,但如果前節點已經變為`CANCELLED (1)`狀態後,就要遞歸向前尋找第一個非`CANCELLED`的節點。 針對“執行緒掛起並等待其他執行緒喚醒”,我們提出2個問題 問題1
* 如果是普通節點,直接掛在隊尾,且將其執行緒掛起,這個沒啥問題;但如果是頭節點被喚醒,嘗試加鎖卻失敗了,又被再次掛起,會不會導致頭結點永遠處於掛起狀態? * 答:不會,因為頭結點之所以搶鎖失敗,一定是因為另外一個A執行緒搶鎖成功。雖然頭節點暫時處於掛起狀態,但當A執行緒執行完加鎖程式碼後,還會再次喚醒頭結點 問題2 * 假定當前節點判定需要被掛起,在執行掛起操作前,擁有鎖的執行緒執行完畢,並喚醒了當前執行緒,而當前執行緒又馬上要進行掛起操作,豈不是會導致無法成功將當前節點喚醒,從而永遠hang死? * 答:能考慮到這個問題,說明你已經帶著分身去思考問題了,不錯。不過此處是不會存在這個問題的,因為執行緒掛起、喚醒使用的api為`park/unpark`,即便是unpark發生在park之前,在執行park操作時,也會成功喚醒。這個特質區別於`wait/notify` 而針對阻塞佇列的排程,還有一些沒有解釋的問題: * a、為什麼阻塞佇列內有這麼多`CANCELLED`狀態的節點? * b、當前節點在掛起前,前節點為`SIGNAL`狀態,但經過一段時間執行,前節點變為了`CANCELLED`狀態,豈不是導致當前節點永遠無法被喚醒? 要回答這兩個問題,就要引出異常處理了 ## 3.3、異常處理 我們首先討論如果AQS不做異常處理可以嗎? 不可以,例如第一個節點被喚醒後,在加鎖階段發生了異常,如果沒有異常處理,這個異常節點將永遠處於阻塞佇列,成為“殭屍節點”,且後續節點也不會被喚起 官方標明可能會出現異常的部分,諸如“等待超時”、“打斷”等,那如果我們呼叫`acquire()`方法,而非`acquireInterruptibly()`、`tryAcquireNanos(time)`是不是就不會出現異常?不是的,因為還有AQS下放給我們自己實現的`tryRelease()`等方法。我們實現一個自己的AQS,並模擬`tryRelease()`報錯,看AQS能否正常應對 ``` public class FindBugAQS { public volatile static int FLAG = 0; private static ThreadLocal FLAG_STORE = new ThreadLocal<>(); private static ThreadLocal TIMES = ThreadLocal.withInitial(() -> 0); private Sync sync = new Sync(); private static class Sync extends AbstractQueuedSynchronizer { private Sync() { setState(1); } public void lock() { FLAG_STORE.set(++FLAG); int state = getState(); if (state == 1 && compareAndSetState(state, 0)) { return; } acquire(1); } @Override protected boolean tryAcquire(int acquires) { if (FLAG_STORE.get() == 2) { Integer time = TIMES.get(); if (time == 0) { TIMES.set(1); } else { // 模擬發生異常,第二個節點在第二次訪問tryAcquire方法時,將會扔出執行期異常 System.out.println("發生異常"); throw new RuntimeException("lkn aqs bug"); } } int state = getState(); if (state == 1 && compareAndSetState(state, 0)) { return true; } return false; } @Override protected final boolean tryRelease(int releases) { setState(1); return true; } public void unlock() { release(1); } } public void lock() { sync.lock(); } public void unlock() { sync.unlock(); } } // 測試用例如下: public class BugTest { private static volatile int number = 0; @Test public void test2() throws InterruptedException { List list = Lists.newArrayList(); FindBugAQS aqs = new FindBugAQS(); Thread thread1 = new Thread(() -> { aqs.lock(); PubTools.sleep(5000); number++; aqs.unlock(); }); thread1.start(); list.add(thread1); PubTools.sleep(500); for (int i = 0; i < 4; i++) { Thread thread2 = new Thread(() -> { aqs.lock(); PubTools.sleep(500); number++; aqs.unlock(); }); thread2.start(); list.add(thread2); } for (Thread thread : list) { thread.join(); } System.out.println("number is " + number); } } ``` 執行結果: ``` 發生異常 Exception in thread "Thread-1" java.lang.RuntimeException: lkn aqs bug at org.xijiu.share.aqs.bug.FindBugAQS$Sync.tryAcquire(FindBugAQS.java:42) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:863) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) at org.xijiu.share.aqs.bug.FindBugAQS$Sync.lock(FindBugAQS.java:31) at org.xijiu.share.aqs.bug.FindBugAQS.lock(FindBugAQS.java:64) at org.xijiu.share.aqs.bug.BugTest.lambda$test2$2(BugTest.java:61) at java.lang.Thread.run(Thread.java:748) number is 4 ``` 我們自定義了AQS實現類`FindBugAQS.java`,模擬第二個節點在第二次訪問`tryAcquire`會扔出異常;然後啟動5個執行緒,對`number`進行累加。可見,最後的結果符合預期,AQS處理的很完美。那程式發生異常後,阻塞佇列究竟如何應對? 舉例說明吧,假定現在除去頭結點外,阻塞佇列中還有3個節點,當第1個節點被喚醒執行時,發生了異常,那麼第1個節點會將`ws`置為`CANCELLED`,且將向後的鏈條打斷(指向自己),但向前鏈條保持不變,並喚醒下一個節點 ![](https://img2020.cnblogs.com/blog/2109301/202103/2109301-20210325205004061-1851650904.png) 由上圖可見,當某個節點響應中斷/發生異常後,其會主動打斷向後鏈條,但依舊保留向前的鏈條,這樣做的目的是為了後續節點在尋找前節點時,可以找到標記為`CANCELLED`狀態的節點,而不是找到`null`。至此便解答了3.2提出的兩個問題 a、為什麼阻塞佇列內有這麼多`CANCELLED`狀態的節點?
* 當被排程執行的節點發生了異常,狀態便會更改為`CANCELLED`狀態,但仍存在於阻塞佇列中,直到正常執行的節點將其剔除 b、當前節點在掛起前,前節點為`SIGNAL`狀態,但經過一段時間執行,前節點變為了`CANCELLED`狀態,豈不是導致當前節點永遠無法被喚醒? * 不會,節點發生異常後,會主動喚起後續節點,而後續節點負責將前節點從阻塞佇列中刪除 # 四、解鎖 本來想針對“解鎖邏輯”畫一張流程圖,但猛然發現解鎖部分僅僅10行左右的程式碼,那就索性把原始碼貼上,逐一論述下 * AQS解鎖原始碼 ``` public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ``` * `ReentrantLock`解鎖原始碼 ``` protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ``` 我們發現當`tryRelease()`方法返回`true`時,AQS便會負責喚醒後續節點,因為`ReentrantLock`支援了可重入的特性,所以當前執行緒的每次加鎖都會對`state`累加,而每次`tryRelease()`方法則會對`state`累減,直到`state`變為初始狀態0時,`tryRelease()`方法才會返回`true`,即喚醒下一個節點 解鎖邏輯相對簡潔,且不存在併發,本文不再贅述 # 五、後記 再次強調本文是通過`ReentrantLock`的視角來分析獨佔鎖,且主要分析的是`ReentrantLock.lock()/unlock()`方法,目的是讓大家對AQS整體的資料結構有個全面認識,方便後續在實現自己的併發框架時,明白api背後發生的事情,做到遊刃有餘 而像`ReentrantLock`的`lockInterruptibly()`、`tryLock(TimeUnit)`或者其他獨佔鎖的實現類,讀者可自行閱讀原始碼,原理類似,核心程式碼也是