多執行緒學習筆記三之ReentrantLock與AQS實現分析
目錄
簡介
ReentrantLock是基於同步器AbstractQueuedSynchronizer(AQS)實現的獨佔式重入鎖,支援公平鎖、非公平鎖(預設是非公平鎖)、申請鎖可響應中斷以及限時獲取鎖等高階功能,分析ReentrantLock就離不開同步器AQS,關係圖如下:
在AQS中實現瞭如何獲取鎖和釋放鎖的模板方法,重入鎖ReentrantLock實現時通過內部類繼承Sync同步器AbstractQueuedSynchronizer。並呼叫同步器提供的模板方法,而這些模板方法將會呼叫ReentrantLock重寫的方法,這是典型的模板方法設計模式。AQS實現同步器功能離不開三大基礎元件:
- 對共享資源同步狀態進行原子性管理 ---> 利用CAS對同步狀態進行更新
- 執行緒的阻塞與喚醒 ---> 呼叫native方法
- 等待佇列的管理 ---> 維護FIFO佇列
AQS同步狀態
AQS中使用了一個int型的volatile變數來表示同步狀態,執行緒在嘗試獲取鎖的時候,就回去比較同步器同步狀態state是否為0,為0,那麼執行緒就拿到了鎖並改變同步狀態;不為0,說明有其他執行緒拿到了鎖。AQS中提供了以下三個方法來訪問或修改同步狀態:
//AQS成員變數,同步狀態 private volatile int state; //獲取當前同步狀態 protected final int getState() { return state; } //設定當前同步狀態 protected final void setState(int newState) { state = newState; } //使用CAS設定當前狀態,該方法能夠保證狀態設定的原子性 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS同步佇列
當有多個執行緒競爭獲取鎖時,只有一個執行緒能獲取到鎖,那麼這些沒有獲取到鎖的執行緒就需要等待,等到執行緒把鎖釋放了再喚醒等待執行緒去獲取鎖,為了實現等待-喚醒機制,AQS提供了基於CLH佇列(Craig, Landin,Hagersten)實現的等待佇列,是一個先入先出的雙向佇列。同步佇列是一個非阻塞的 FIFO 佇列。也就是說往裡面插入或移除一個節點的時候,在併發條件下不會阻塞,而是通過自旋鎖和CAS保證節點插入和移除的原子性。

AQS中的內部類Node是構建同步佇列和等待佇列(後面介紹Condition再介紹)的基礎節點類,Node類部分原始碼如下:
static final class Node { //等待狀態 volatile int waitStatus; //前驅結點 volatile Node prev; //後繼節點 volatile Node next; //等待獲取鎖的執行緒 volatile Thread thread; //condition佇列的後繼節點 Node nextWaiter; }
關於節點Node的waitStatus,它反映的是節點中執行緒的等待狀態,有如下取值:
- CANCELLED,值為1,因為超時或中斷,該執行緒已經被取消
- SIGNAL,值為-1,執行緒的後繼執行緒正/已被阻塞,當該執行緒release或cancel時要重新這個後繼執行緒(unpark)
- CONDITION,值為-2,表明該執行緒被處於條件佇列,就是因為呼叫了Condition.await而被阻塞
- PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行
- 等待狀態的初始值為0,表示當前節點在sync佇列中,等待著獲取鎖。
ReentrantLock資料結構
從關係圖可以看出,ReentrantLock實現了Lock介面,內部類Sync是AQS的子類,Sync有兩個子類FairSync(公平鎖)和NonFairSync(非公平鎖)。ReentrantLock只有一個成員變數sync,通過建構函式初始化,可以看到通過預設的建構函式構造的ReentrantLock是非公平鎖。
private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平鎖的獲取
ReentrantLock獲取鎖方法如下:
public void lock() { sync.lock(); }
公平鎖呼叫的是FairSync的lock方法:
final void lock() { acquire(1); }
acquire方法是AQS實現的方法,介紹一下引數的1的意思:AQS規定同步狀態state,想要獲得鎖就去改變同步狀態,就是把同步狀態加1。acquire方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
獲取鎖的過程:
- 嘗試獲取鎖。
- 嘗試獲取失敗,將當前執行緒構成Node加入Sync佇列。
- 再次嘗試獲取,若獲取失敗執行緒進入等待態,等待喚醒。
tryAcquire(arg)
公平鎖嘗試獲取,在FairSync裡實現,獲取同步狀態成功返回true,否則返回false
protected final boolean tryAcquire(int acquires) { //獲取當前執行緒 final Thread current = Thread.currentThread(); //獲取同步狀態 int c = getState(); //同步狀態為0,沒有其他執行緒佔據鎖 if (c == 0) { //檢測同步佇列沒有其他執行緒等待(確保公平性),如果沒有獲取鎖就以CAS方式嘗試改變同步狀態 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //設定鎖的擁有者為當前執行緒 setExclusiveOwnerThread(current); return true; } } //同步狀態不為0,檢測是否是當前執行緒擁有鎖 else if (current == getExclusiveOwnerThread()) { //當前執行緒擁有鎖,直接更新同步狀態,重入鎖 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
-
hasQueuedPredecessors()
hasQueuedPredecessors是AQS中的方法,檢測同步佇列有沒有等待獲取鎖的執行緒,保證公平性。
public final boolean hasQueuedPredecessors() { //同步佇列尾節點 Node t = tail; //同步佇列頭節點 Node h = head; Node s; //h!=t 頭節點和尾節點不同,說明同步佇列不為空 //同步佇列不為空,檢測下一個等待獲取鎖的執行緒(h.next.thread)是不是當前執行緒 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
-
compareAndSetState(int expect, int update)
compareAndSetState()在AQS中實現。compareAndSwapInt() 是sun.misc.Unsafe類中的一個native方法,如果當前狀態值等於預期值,則以原子方式將同步狀態設定為給定的更新值。
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
-
setExclusiveOwnerThread(Thread thread) & getExclusiveOwnerThread()
setExclusiveOwnerThread和getExclusiveOwnerThread都是AQS父類AbstractOwnableSynchronizer的方法,setExclusiveOwnerThread用於設定執行緒t為當前擁有獨佔鎖的執行緒。getExclusiveOwnerThread用於獲得當前佔據獨佔鎖的執行緒
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; }
addWaiter(Node mode)
addWaiter在AQS中實現,以當前執行緒構成節點加入到同步佇列末尾,並返回這個節點Node。
private Node addWaiter(Node mode) { //以當前執行緒和給定模式構成節點Node Node node = new Node(Thread.currentThread(), mode); // 同步佇列不為空,以CAS方式把當前執行緒加入到佇列末尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //佇列為空,建立同步佇列,再把當前執行緒加入同步佇列 enq(node); return node; }
-
compareAndSetTail(Node expect, Node update)
compareAndSetTail是AQS中的方法,呼叫本地native方法,如果同步佇列隊尾是expect節點,就把update節點新增到佇列末尾,這是一個原子操作。
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
- enq(final Node node)
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued(final Node node, int arg)
如果當前執行緒的節點的前驅結點,就去嘗試獲取同步狀態,如果不是或者獲取失敗根據waitStatus對同步佇列進行清理:把waitStatus為CANCELLED從同步佇列清除,修改錯誤的waitStatus,然後把執行緒堵塞,返回當前執行緒是否被中斷。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //當前節點的前驅結點 final Node p = node.predecessor(); //前驅結點是head頭節點,嘗試獲取同步狀態 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
-
shouldParkAfterFailedAcquire(Node pred, Node node)
前驅結點不是head頭節點或嘗試獲取同步狀態失敗以後,並不是馬上把當前執行緒執行緒堵塞,還要檢測同步佇列前驅結點的狀態,檢查規則如下:
- 如果前驅節點狀態為SIGNAL,表明當前節點需要被堵塞,此時則返回true。
- 如果前驅節點狀態為CANCELLED(ws>0),說明前繼節點已經被取消,則從後往前找到一個有效(非CANCELLED狀態)的節點,並返回false;之後無限迴圈直到步驟1返回true,執行緒阻塞。
- 如果前驅節點狀態為非SIGNAL、非CANCELLED,則CAS設定前驅節點的狀態為SIGNAL,並返回false;之後無限迴圈直到步驟1返回true,執行緒阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
-
parkAndCheckInterrupt()
把當前執行緒堵塞並檢查是否有中斷。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
鎖的釋放
ReentrantLock公平鎖與非公平鎖的釋放機制是一樣的,釋放鎖方法如下:
public void unlock() { sync.release(1); }
unlock方法呼叫的release方法是在AQS中實現的,這裡的1類似於acquire(1),適用於用來設定同步狀態的,釋放鎖時會把同步狀態減1。release方法會先呼叫tryRelease來嘗試釋放當前執行緒鎖持有的鎖。成功的話,則喚醒後繼等待執行緒,並返回true。否則,直接返回false
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease(int releases)
tryRelease嘗試獲取鎖,當同步狀態為0時清空佔據鎖的執行緒,返回true;如果同步狀態不為0返回false,因為ReentrantLock是重入鎖,只有徹底釋放tryRelease才會返回true。
protected final boolean tryRelease(int releases) { // c是本次釋放鎖之後的同步狀態 int c = getState() - releases; //當前執行緒不是鎖的擁有者,丟擲IllegalMonitorStateException異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果“鎖”已經被當前執行緒徹底釋放,則設定“鎖”的持有者為null,即鎖是可獲取狀態。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
unparkSuccessor(Node node)
當前執行緒釋放鎖成功的話,會喚醒當前執行緒的後繼執行緒。從aquireQueued方法可以看出,一旦頭結點的後繼結點被喚醒,那麼後繼結點就嘗試去獲取鎖,如果獲取成功就將頭結點設定為自身,並將前一個頭節點清空。
private void unparkSuccessor(Node node) { // 獲取當前執行緒(要釋放鎖)的等待狀態 int ws = node.waitStatus; if (ws < 0) //設定為初始狀態 compareAndSetWaitStatus(node, ws, 0); //同步佇列頭節點的下一個等待節點 Node s = node.next; //等待節點無效,從同步節點尾部開始遍歷找到有效的等待節點 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //喚醒等待節點的執行緒 if (s != null) LockSupport.unpark(s.thread); }
非公平鎖的獲取
NonfairSync類中lock()實現,首先嚐試用CAS更改同步狀態,如果成功,把當前執行緒設定為獨佔鎖的擁有者;然後呼叫acquire(1)方法。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
acquire方法除了tryAcquire是由AQS的子類實現的,其他方法都是在AQS類實現的,tryAcquire的實現機制不同體現了公平鎖與非公平鎖的不同。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
ReentrantLock中的NonfairSync的tryAcquire方法,呼叫了nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
nonfairTryAcquire(int acquires)
非公平鎖的嘗試獲取鎖時,如果同步狀態為0,即沒有其他執行緒獲取到鎖,當前執行緒直接以CAS方式改變同步狀態,不會去同步佇列找是否有其他執行緒早於當前執行緒等在同步佇列中,效率較高。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //同步狀態為0,嘗試以CAS方式改變同步狀態 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //重入鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
總結
本文介紹了ReentrantLock基於AQS同步器實現的公平鎖和非公平鎖的獲取和釋放,基於CAS改變同步狀態是獲得獨佔鎖的基礎,為了避免多個執行緒同時對進行競爭,在AQS中維護了FIFO的同步佇列,當獨佔鎖釋放時,AQS同步器排程同步佇列隊首等待節點的執行緒去獲取鎖,有效避免了海量競爭獨佔鎖造成資源的浪費,是一個非常巧妙的方法。