【java併發程式設計實戰6】AQS之獨佔鎖ReentrantLock實現
前言
自從JDK1.5後,jdk新增一個併發工具包 java.util.concurrent
,提供了一系列的併發工具類。而今天我們需要學習的是 java.util.concurrent.lock
也就是它下面的lock包,其中有一個最為常見類 ReentrantLock
,
我們知道 ReentrantLock
的功能是實現程式碼段的併發訪問控制,也就是通常意義上所說的鎖。之前我們也學習過一種鎖的實現,也就是 synchronized
關鍵詞, synchronized
是在位元組碼層面,通過物件的監視器鎖實現的。那麼 ReentrantLock
又是怎麼實現的呢?
如果不看原始碼,可能會以為它的實現是通過類似於 synchronized
,通過物件的監視器鎖實現的。但事實上它僅僅是一個工具類!沒有使用更“高階”的機器指令,不是關鍵字,也不依靠JDK編譯時的特殊處理,僅僅作為一個普普通通的類就完成了程式碼塊的併發訪問控制,這就更讓人疑問它怎麼實現的程式碼塊的併發訪問控制的了。
我們檢視原始碼發現,它是通過繼承抽象類實現的 AbstractQueuedSynchronizer
,為了方便描述,接下來我將用AQS代替 AbstractQueuedSynchronizer
。
關於AQS
AQS,它是用來構建鎖或者其他同步組建的基礎框架,我們見過許多同步工具類都是基於它構建的。包括 ReentrantLock、CountDownLatch等
。在深入瞭解AQS瞭解之前,我們需要知道鎖跟AQS的區別。鎖,它是面向使用者的,它定義了使用者與鎖互動的介面,隱藏了實現的細節;而AQS面像的是鎖的實現者,它簡化了鎖的實現。鎖與AQS很好的隔離使用者與實現者所需要關注的領域。那麼我們今天就作為一個鎖的實現者,一步一步分析鎖的實現。
AQS又稱同步器,它的內部有一個int成員變數state表示同步狀態,還有一個內建的FIFO佇列來實現資源獲取執行緒的排隊工作。通過它們我們就能實現鎖。
在實現鎖之前,我們需要考慮做為鎖的使用者,鎖會有哪幾種?
通常來說,鎖分為兩種,一種是獨佔鎖(排它鎖,互斥鎖),另一種就是共享鎖了。根據這兩類,其實AQS也給我們提供了兩套API。而我們作為鎖的實現者,通常都是要麼全部實現它的獨佔api,要麼實現它的共享api,而不會出現一起實現的。即使juc內建的 ReentrantReadWriteLock
也是通過兩個子類分別來實現的。
鎖的實現
獨佔鎖
獨佔鎖又名互斥鎖,同一時間,只有一個執行緒能獲取到鎖,其餘的執行緒都會被阻塞等待。其中我們常用的 ReentrantLock
就是一種獨佔鎖,我們一起來 ReentrantLock
分析 ReentrantLock
的同時看一看AQS的實現,再推理出AQS獨特的設計思路和實現方式。最後,再看其共享控制功能的實現。
首先我們來看看獲取鎖的過程
加鎖
我們檢視 ReentrantLock
的原始碼。來分析它的lock方法
public void lock() { sync.lock(); }
與我們之前分析的一樣,鎖的具體實現由內部的代理類完成,lock只是暴露給鎖的使用者的一套api。使用過ReentrantLock的同學應該知道,ReentrantLock又分為公平鎖和非公平鎖,所以,ReentrantLock內部只有兩個sync的實現。
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync{..} /** * Sync object for fair locks */ static final class FairSync extends Sync{..}
- 公平鎖 :每個執行緒獲取鎖的順序是按照呼叫lock方法的先後順序來的。
- 非公平鎖:每個執行緒獲取鎖的順序是不會按照呼叫lock方法的先後順序來的。完全看運氣。
所以我們完全可以猜測到,這個公平與不公平的區別就體現在鎖的獲取過程。我們以公平鎖為例,來分析獲取鎖過程,最後對比非公平鎖的過程,尋找差異。
lock
檢視FairSync的lock方法
final void lock() { acquire(1); }
這裡它呼叫到了父類AQS的acquire方法,所以我們繼續檢視acquire方法的程式碼
acquire
/** * Acquires in exclusive mode, ignoring interrupts.Implemented * by invoking at least once {@link #tryAcquire}, * returning on success.Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success.This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument.This value is conveyed to *{@link #tryAcquire} but is otherwise uninterpreted and *can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
檢視方法方法的註釋我們可以知道這個方法的作用,這裡我簡單的翻譯一下.
Acquires方法是一個獨佔鎖模式的方法,它是不會響應中斷的。它至少執行一次tryAcquire去獲取鎖,如果返回true,則代表獲取鎖成功,否則它將會被加入等待佇列阻塞,直到重新嘗試獲取鎖成功。所以我們需要看看嘗試獲取鎖的方法tryAcquire的實現
tryAcruire
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
丟擲一個異常,沒有實現。所以我們需要檢視它的子類,在我們這裡就是FairSync的實現。
這裡也會大家會有疑惑,沒有實現為什麼不寫成抽象方法呢,前面我們提到過,我們不會同時在一個類中實現獨佔鎖跟共享鎖的api,那麼tryAcruire是屬於獨佔鎖,那麼如果我想一個共享鎖也要重新獨佔鎖的方法嗎?所以大師的設計是絕對沒有問題的。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();//獲取當前執行緒 int c = getState();//獲取父類AQS中的標誌位 if (c == 0) { if (!hasQueuedPredecessors() && //如果佇列中沒有其他執行緒說明沒有執行緒正在佔有鎖! compareAndSetState(0, acquires)) { //修改一下狀態位,注意:這裡的acquires是在lock的時候傳遞來的,從上面的圖中可以知道,這個值是寫死的1 setExclusiveOwnerThread(current); //如果通過CAS操作將狀態為更新成功則代表當前執行緒獲取鎖,因此,將當前執行緒設定到AQS的一個變數中,說明這個執行緒拿走了鎖。 return true; } } else if (current == getExclusiveOwnerThread()) { //如果不為0 意味著,鎖已經被拿走了,但是,因為ReentrantLock是重入鎖, //是可以重複lock,unlock的,只要成對出現行。一次。這裡還要再判斷一次 獲取鎖的執行緒是不是當前請求鎖的執行緒。 int nextc = c + acquires;//如果是的,累加在state欄位上就可以了。 if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
目前為止,如果獲取鎖成功,則返回true,獲取鎖的過程結束,如果獲取失敗,則返回false
按照之前的邏輯,如果執行緒獲取鎖失敗,則會被放入到佇列中,但是在放入之前,需要給執行緒包裝一下。
那麼這個addWaiter就是包裝執行緒並且放入到佇列的過程實現的方法。
addWaiter
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
註釋: 把當前執行緒作為一個節點新增到佇列中,並且為這個節點設定模式
模式: 也就是獨佔模式/共享模式,在這裡模式是形參,所以我們看看起調方
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
Node.EXCLUSIVE 就代表這是獨佔鎖模式。
建立好節點後,將節點加入到佇列尾部,此處,在佇列不為空的時候,先嚐試通過cas方式修改尾節點為最新的節點,如果修改失敗,意味著有併發,這個時候才會進入enq中死迴圈,“自旋”方式修改。
將執行緒的節點接入到隊裡中後,當然還需要做一件事:將當前執行緒掛起!這個事,由acquireQueued來做。
在解釋acquireQueued之前,我們需要先看下AQS中佇列的記憶體結構,我們知道,佇列由Node型別的節點組成,其中至少有兩個變數,一個封裝執行緒,一個封裝節點型別。
而實際上,它的記憶體結構是這樣的(第一次節點插入時,第一個節點是一個空節點,代表有一個執行緒已經獲取鎖,事實上,佇列的第一個節點就是代表持有鎖的節點):

0730009.png
黃色節點為佇列預設的頭節點,每次有執行緒競爭失敗,進入佇列後其實都是插入到佇列的尾節點(tail後面)後面。這個從enq方法可以看出來,上文中有提到enq方法為將節點插入佇列的方法:
enq
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 一個空的節點,通常代表獲取鎖的執行緒 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued
接著我們來看看當節點被放入到佇列中,如何將執行緒掛起,也就是看看acquireQueued方法的實現。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取當前節點前驅結點 final Node p = node.predecessor(); // 如果前驅節點是head,那麼它就是等待佇列中的第一個執行緒 // 因為我們知道head就是獲取執行緒的節點,那麼它就有機會再次獲取鎖 if (p == head && tryAcquire(arg)) { //成功後,將上圖中的黃色節點移除,Node1變成頭節點。 也證實了head就是獲取鎖的執行緒的節點。 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 1、檢查前一個節點的狀態,判斷是否要掛起 // 2、如果需要掛起,則通過JUC下的LockSopport類的靜態方法park掛起當前執行緒,直到被喚醒。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 如果發生異常 if (failed) // 取消請求,也就是將當前節點重佇列中移除。 cancelAcquire(node); } }
這裡我還需要解釋的是:
1、Node節點除了儲存當前執行緒之外,節點型別,前驅後驅指標之後,還儲存一個叫waitStatus的變數,該變數用於描述節點的狀態。共有四種狀態。
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED =1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL= -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
分別表示:
- 1 = 取消狀態,該節點將會被佇列移除。
- -1 = 等待狀態,後驅節點處於等待狀態。
- -2 = 等待被通知,該節點將會阻塞至被該鎖的condition的await方法喚醒。
- -3 = 共享傳播狀態,代表該節點的狀態會向後傳播。
到此為止,一個執行緒對於鎖的一次競爭才告於段落,結果有兩種,要麼成功獲取到鎖(不用進入到AQS佇列中),要麼,獲取失敗,被掛起,等待下次喚醒後繼續迴圈嘗試獲取鎖,值得注意的是,AQS的佇列為FIFO佇列,所以,每次被CPU假喚醒,且當前執行緒不是出在頭節點的位置,也是會被掛起的。AQS通過這樣的方式,實現了競爭的排隊策略。
釋放鎖
看完了加鎖,再看釋放鎖。我們先不看程式碼也可以猜測到釋放鎖需要的步驟。
- 佇列的頭節點是當前獲取鎖的執行緒,所以我們需要移除頭節點
- 釋放鎖,喚醒頭節點後驅節點來競爭鎖
接下來我們檢視原始碼來驗證我們的猜想是否在正確。
unlock
public void unlock() { sync.release(1); }
unlock方法呼叫AQS的release方法,因為我們的acquire的時候傳入的是1,也就是同步狀態量+1,那麼對應的解鎖就要-1。
release
public final boolean release(int arg) { // 嘗試釋放鎖 if (tryRelease(arg)) { // 釋放鎖成功,獲取當前佇列的頭節點 Node h = head; if (h != null && h.waitStatus != 0) // 喚醒當前節點的下一個節點 unparkSuccessor(h); return true; } return false; }
tryRelease
同樣的它是交給子類實現的
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 當前執行緒不是獲取鎖的執行緒 丟擲異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 因為是重入的關係,不是每次釋放鎖c都等於0,直到最後一次釋放鎖時,才通知AQS不需要再記錄哪個執行緒正在獲取鎖。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
unparkSuccessor
釋放鎖成功之後,就喚醒頭節點後驅節點來競爭鎖
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
值得注意的是,尋找的順序是從佇列尾部開始往前去找的最前面的一個waitStatus小於0的節點。因為大於0 就是1狀態的節點是取消狀態。
公平鎖與非公平鎖
到此我們鎖獲取跟鎖的釋放已經分析的差不多。那麼公平鎖跟非公平鎖的區別在於加鎖的過程。對比程式碼
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } }
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock.Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } }
從程式碼中也可以看出來,非公平在公平鎖的加鎖的邏輯之前先直接cas修改一次state變數(嘗試獲取鎖),成功就返回,不成功再排隊,從而達到不排隊直接搶佔的目的。
最後歡迎大家關注一下我的個人公眾號。一起交流一起學習,有問必答。

公眾號