Java併發-locks包原始碼剖析2-ReentrantLock鎖機制
上一篇文章對ReentrantLock鎖進行了概述,相信看完了的話應該對ReentrantLock鎖的使用有了一定的瞭解,這篇文章分析下ReentrantLock鎖的實現機制。
首先需要了解ReentrantLock類裡面有三個靜態類: Sync
、 NonfairSync
和 FairSync
,ReentrantLock的鎖內部實現通過NonfairSync和FairSync實現,而NonfairSync和FairSync都是Sync的子類。類圖如下所示:

AbstractQueuedSynchronizer
,類如其名,抽象的佇列式的同步器,通常被稱之為AQS類,它是一個非常有用的父類,可用來定義鎖以及依賴於排隊
park
執行緒(這裡的park你可能不明白什麼意思,但是你肯定知道阻塞的意思,其實park類似阻塞,很多文章直接就說是阻塞,但是我感覺兩者還是不一樣的,park可以看做是一種輕量級的阻塞,至少我是這樣理解的,歡迎指正,unpark類似喚醒執行緒但是比喚醒執行緒高效)的其他同步器。AbstractQueuedLongSynchronizer 類提供相同的功能但擴充套件了對同步狀態的 64 位的支援。兩者都擴充套件了類 AbstractOwnableSynchronizer(一個幫助記錄當前保持獨佔同步的執行緒的簡單類)。AQS框架是整個JUC鎖的核心部分,提供了以下內容與功能:
- Node 節點, Sync Queue和Condition Queue的存放的元素,用來儲存park執行緒, 這些節點主要的區分在於 waitStatus 的值
- Condition Queue, 這個佇列是用於獨佔模式中, 只有用到 Condition.awaitXX 時才會將 node加到 tail 上(PS: 在使用 Condition的前提是已經獲取 Lock)
- Sync Queue, 獨佔和共享的模式中均會使用到的存放 Node 的 CLH queue(主要特點是, 佇列中總有一個 dummy 節點, 後繼節點獲取鎖的條件由前繼節點決定, 前繼節點在釋放 lock 時會喚醒sleep中的後繼節點),CLH佇列的CLH全稱是Craig, Landin, and Hagersten
- ConditionObject, 用於獨佔的模式, 主要是執行緒釋放lock, 加入 Condition Queue, 並進行相應的 signal 操作
- 獨佔的獲取lock (acquire, release), 例如 ReentrantLock 就是使用這種,
- 共享的獲取lock (acquireShared, releaseShared), 例如 ReentrantReadWriteLock、Semaphore、CountDownLatch
所以,理解了AbstractQueuedSynchronizer 機制就理解了ReentrantLock鎖機制。
AbstractQueuedSynchronizer
繼承自 AbstractOwnableSynchronizer
:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { /**所有的變數的都是transient的,為啥還要提供序列化ID呢?*/ private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } // The current owner of exclusive mode synchronization. private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
AbstractOwnableSynchronizer
類名能夠很清晰的表達它的意思:獨有執行緒同步器,提供了執行緒獨佔的兩個方法 setExclusiveOwnerThread(Thread thread)
和 getExclusiveOwnerThread()
內部類 Node
AQS有一個很重要的內部類Node,這個類會裝載執行緒,看下它的原始碼:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED =1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL= -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() {//Returns true if node is waiting in shared mode. return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException {// Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {// Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) {// Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
可以看到,Node類裡首先定義了6個static final的變數,用來區分是共享鎖還是獨佔鎖以及等待執行緒的五種狀態(1、-1、-2、-3再額外加一個0的狀態),接著定義了四個volatile變數和一個Node型別的普通變數nextWaiter: Node。我們需要著重分析下這四個volatile變數和nextWaiter變數。
- volatile int waitStatus:
- CANCELLED值為1表示當前節點裝載的執行緒因超時或者中斷而取消,Nodes never leave this state. In particular, a thread with cancelled node never again blocks.
- SIGNAL值為-1表示後繼節點被park了,如果當前節點釋放鎖了或者裝載的執行緒取消了,那麼需要unpark自己的後繼節點,To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then, on failure, block.
- CONDITION值為-2表示在條件佇列(condition queue)中,It will not be used as a sync queue node until transferred, at which time the status will be set to 0. (Use of this value here has nothing to do with the other uses of the field, but simplifies mechanics.)
- PROPAGATE值為-3表示後續的acquireShared操作能夠得以執行,A releaseShared should be propagated to other nodes. This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.
- 值為0表示當前節點在同步佇列(async queue)中等待著獲取鎖
- volatile Node prev:前驅節點
- volatile Node next:後繼節點
- volatile Thread thread:當前節點裝載的執行緒
- Node nextWaiter:這個後繼節點和後繼節點next是有區別的,nextWaiter後繼節點是指在條件佇列(condition queue)中的後繼節點或者是共享節點,因為條件鎖必須是獨佔模式,在共享模式中我們也可以使用這個成員變量表示後繼節點節點,這樣我們就節省了一個成員變數。注意只有nextWaiter而並沒有定義preWaiter。Link to next node waiting on condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, we just need a simple linked queue to hold nodes while they are waiting on conditions. They are then transferred to the queue to re-acquire. And because conditions can only be exclusive, we save a field by using special value to indicate shared mode.
從上面我們可以看出,同步佇列async queue是一個雙向佇列,而條件佇列condition queue是一個單向佇列只有nextWaiter而沒有preWaiter。
關於 waitStatus
這個變數,原始碼中的註釋寫的非常清楚,我不翻譯了,大家細心看下:
The values are arranged numerically to simplify use. Non-negative values mean that a node doesn't need to signal. So, most code doesn't need to check for particular values, just for sign. The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes. It is modified using CAS (or when possible, unconditional volatile writes).
內部類 ConditionObject
這個類是用來維護條件佇列(condition queue)的,這個類和LinkedList很像,就是一個單向的列表,裡面維護了一個頭結點firstWaiter和尾節點lastWaiter以及兩個十分重要的方法 await()
和 signal()
。先來看下這個類的一個很核心的 await
方法:
public final void await() throws InterruptedException { //加入條件等待佇列前需要進行中斷判斷 if (Thread.interrupted()) throw new InterruptedException(); //加入條件等待佇列 Node node = addConditionWaiter(); //釋放鎖 int savedState = fullyRelease(node); //中斷標記位 int interruptMode = 0; /*死迴圈阻塞,直到被通知或者被中斷: 1) 當被通知喚醒時還得判斷一下當前節點是否已經轉移到AQS同步隊列當中(其實主動通知的執行緒會確保其後繼等待節點轉移到同步佇列中,所以被通知後在下一次迴圈條件為false,繼續後續流程); 2) 當被中斷喚醒時需要確保節點被轉移到同步佇列中,然後根據中斷髮生在被通知前後位置設定中斷模式,並跳出迴圈 關於中斷模式: 1) 當在被通知前被中斷則將中斷模式設定為THROW_IE; 2) 當在被通知後則將中斷模式設定為REINTERRUPT(因為acquireQueued不會響應中斷)。 */ while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } /*死迴圈獲取同步狀態,並在同步狀態獲取成功或者取消獲取時設定中斷模式: 如果在被通知之後獲取鎖過程中發生中斷則將中斷模式設定為REINTERRUPT。 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //重新連結CONDITION節點。 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //根據中斷模式丟擲異常。 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
注意:被中斷的執行緒跳出while迴圈後,會呼叫acquireQueued方法自旋獲取鎖,嘗試獲取同步狀態,而不是立即響應中斷丟擲中斷異常。在最後根據中斷模式來決定是否丟擲異常。
再來看下 signal()
方法:
//通知頭結點到同步佇列中去競爭鎖,用於獨佔模式 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /* 從頭結點向後遍歷直到遇到一個非canceled或者null的節點 並將其移除條件等待佇列,並新增到同步佇列的尾部 */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } //當頭結點轉移失敗後,繼續重試並確保更新後的頭結點不為null while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //如果CAS失敗,說明節點在signal之前被cancel了,返回false //CAS嘗試將節點的waitStatus從CONDITION-2修改為0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //進入同步佇列,返回的p是這個節點在同步佇列中的前驅節點 Node p = enq(node); //獲取前驅節點的狀態 int ws = p.waitStatus; //1 如果前驅節點取消則直接喚醒當前節點執行緒 //2 或者前驅節點沒取消的話,將前驅節點狀態設定為SIGNAL(-1),保證能被前驅節點通知到,如果設定失敗,直接喚醒當前節點的執行緒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
可以看到,更新waitStatus的操作是依賴Unsafe的CAS機制進行的。
總結:
await signal
AQS的同步狀態
AQS的int型成員變數 state
表示同步狀態,它是用volatile修飾:
private volatile int state;
在互斥鎖中它表示著執行緒是否已經獲取了鎖,0未獲取,1已經獲取了,大於1表示重入數。同時AQS提供了getState()、setState()、compareAndSetState()方法來獲取和修改該值:
可重入鎖指的是在一個執行緒中可以多次獲取同一把鎖,比如:一個執行緒在執行一個帶鎖的方法,該方法中又呼叫了另一個需要相同鎖的方法,則該執行緒可以直接執行呼叫的方法,而無需重新獲得鎖。synchronized鎖可以看做重入鎖。但要注意,獲取多少次鎖就要釋放多麼次,這樣才能保證state是能回到零態的。
以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A執行緒lock()時,會呼叫tryAcquire()獨佔該鎖並將state+1。此後,其他執行緒再tryAcquire()時就會失敗,直到A執行緒unlock()到state=0(即釋放鎖)為止,其它執行緒才有機會獲取該鎖。當然,釋放鎖之前,A執行緒自己是可以重複獲取此鎖的(state會累加)。
所以可重入數大於1表示該執行緒可能呼叫了多個需要當前鎖的方法,或同一個執行緒呼叫了多次lock()方法。
再以CountDownLatch以例,任務分為N個子執行緒去執行,state也初始化為N(注意N要與執行緒個數一致)。這N個子執行緒是並行執行的,每個子執行緒執行完後countDown()一次,state會CAS減1。等到所有子執行緒都執行完後(即state=0),會unpark()主呼叫執行緒,然後主呼叫執行緒就會從await()函式返回,繼續後餘動作。
protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
同步佇列async queue(CLH)
CLH是一個FIFO執行緒等待佇列,多執行緒爭用資源被阻塞時會進入此佇列,它的頭節點時虛擬節點。

CLH同步佇列(圖片來自引用4)
獨佔鎖acquire/release方法解析
對於獲取鎖的流程,這張圖比直接貼程式碼更加直觀,希望結合原始碼仔細看下,圖片來自文獻3:

acquire獲取鎖流程
tryAcquire()
對於鎖的釋放,原始碼如下:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
它呼叫tryRelease()來釋放資源,這個tryRelease()方法一般都由子類實現。
在release()中“當前執行緒”釋放鎖成功的話,會喚醒當前執行緒的後繼執行緒。根據CLH佇列的FIFO規則,“當前執行緒”(即已經獲取鎖的執行緒)肯定是head;如果CLH佇列非空的話,則喚醒鎖的下一個等待執行緒。看下unparkSuccessor()的原始碼,它在AQS中實現:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling.It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node.But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
用unpark()喚醒等待佇列中最前邊的那個未放棄執行緒,這裡我們也用s來表示吧。此時,再和acquireQueued()聯絡起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關係,它會再進入shouldParkAfterFailedAcquire()尋找一個離頭節點最近的一個等待位置。這裡既然s已經是等待佇列中最前邊的那個未放棄執行緒了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次迴圈判斷p==head就成立了),然後s把自己設定成head標杆結點,表示自己已經獲取到資源了,acquire()也返回了。
共享鎖acquireShared/releaseShared方法解析
acquireShared(int)
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared()方法依然需要自定義同步器去實現,但是AQS已經把其返回值的語義定義好了:負值代表獲取失敗;0代表獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其他執行緒還可以去獲取。所以這裡acquireShared()的流程就是:
- tryAcquireShared()嘗試獲取資源,成功則直接返回;
- 失敗則通過doAcquireShared()進入等待佇列,直到獲取到資源為止才返回。
看下doAcquireShared原始碼:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//加入佇列尾部 boolean failed = true;//是否成功標誌 try { boolean interrupted = false;//等待過程中是否被中斷過的標誌 for (;;) { final Node p = node.predecessor();//前驅 if (p == head) {//如果到head的下一個,因為head是拿到資源的執行緒,此時node被喚醒,很可能是head用完資源來喚醒自己的 int r = tryAcquireShared(arg);//嘗試獲取資源 if (r >= 0) {//成功 setHeadAndPropagate(node, r);//將head指向自己,還有剩餘資源可以再喚醒之後的執行緒 p.next = null; // help GC if (interrupted)//如果等待過程中被打斷過,此時將中斷補上。 selfInterrupt(); failed = false; return; } } //判斷狀態,尋找安全點,進入waiting狀態,等著被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
重點分析一下獲取鎖後的操作:setHeadAndPropagate:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: *Propagation was indicated by caller, *or was recorded (as h.waitStatus either before *or after setHead) by a previous operation *(note: this uses sign-check of waitStatus because *PROPAGATE status may transition to SIGNAL.) * and *The next node is waiting in shared mode, *or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ /** *如果讀鎖(共享鎖)獲取成功,或頭部節點為空,或頭節點取消,或剛獲取讀鎖的執行緒的下一個節點為空, *或在節點的下個節點也在申請讀鎖,則在CLH佇列中傳播下去喚醒執行緒,怎麼理解這個傳播呢, *就是隻要獲取成功到讀鎖,那就要傳播到下一個節點(如果一下個節點繼續是讀鎖的申請, *只要成功獲取,就再下一個節點,直到佇列尾部或為寫鎖的申請,停止傳播)。具體請看doReleaseShared方法。 * */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
其實跟acquire()的流程大同小異,只不過多了個自己拿到資源後,還會去喚醒後繼隊友的操作,這體現了共享鎖的定義。
releaseShared()
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
此方法的流程也比較簡單,一句話:釋放掉資源後,喚醒後繼。跟獨佔模式下的release()相似,但有一點稍微需要注意:獨佔模式下的tryRelease()在完全釋放掉資源(state=0)後,才會返回true去喚醒其他執行緒,這主要是基於可重入的考量;而共享模式下的releaseShared()則沒有這種要求,一是共享的實質--多執行緒可併發執行;二是共享模式基本也不會重入吧(至少我還沒見過),所以自定義同步器可以根據需要決定返回值。
doReleaseShared()原始碼:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) {// 從佇列的頭部開始遍歷每一個節點 int ws = h.waitStatus; // 如果節點狀態為 Node.SIGNAL,將狀態設定為0,設定成功,喚醒執行緒。 // 為什麼會設定不成功,可能改節點被取消;還有一種情況就是有多個執行緒在執行該程式碼段,這就是PROPAGATE的含義吧。 // 如果一個節點的狀態設定為 Node.SIGNAL,則說明它有後繼節點,並且處於阻塞狀態 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;// loop to recheck cases unparkSuccessor(h); } // 如果狀態為0,則設定為Node.PROPAGATE,設定為傳播,該值然後會在什麼時候變化呢? // 在判斷該節點的下一個節點是否需要阻塞時,會判斷,如果狀態不是Node.SIGNAL或取消狀態, // 為了保險起見,會將前置節點狀態設定為Node.SIGNAL,然後再次判斷,是否需要阻塞。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;// loop on failed CAS } /** * 如果處理過一次 unparkSuccessor 方法後,頭節點沒有發生變化,就退出該方法,那head在什麼時候會改變呢? * 當然在是搶佔鎖成功的時候,head節點代表獲取鎖的節點。一旦獲取鎖成功,則又會進入setHeadAndPropagate方法, * 當然又會觸發doReleaseShared方法,傳播特性應該就是表現在這裡吧。再想一下,同一時間,可以有多個多執行緒佔有鎖, * 那在鎖釋放時,寫鎖的釋放比較簡單,就是從頭部節點下的第一個非取消節點,喚醒執行緒即可, * 為了在釋放讀鎖的上下文環境中獲取代表讀鎖的執行緒,將資訊存入在 readHolds ThreadLocal變數中。 */ if (h == head)// loop if head changed break; } }
參考文獻
- ofollow,noindex">AbstractQueuedSynchronizer 原始碼分析 (基於Java 8)
- AQS的ConditionObject原始碼詳解
- 最不能忽略的AbstractQueuedSynchronizer類原始碼分析(必看)
- Java多執行緒:AQS原始碼分析