AQS原始碼分析之獨佔鎖和共享鎖
簡介
AQS實現鎖機制並不是通過synchronized——給物件加鎖實現的,事實上它僅僅是一個工具類!它沒有使用更高階的機器指令,也不靠關鍵字,更不依靠JDK編譯時的特殊處理,僅僅作為一個普普通通的類就完成了程式碼塊的訪問控制。
AQS使用標記位+佇列的方式,記錄獲取鎖、競爭鎖、釋放鎖等一些類鎖操作。但更準確的說,AQS並不關心什麼是鎖,對於AQS來說它只是實現了一系列的用於判斷資源是否可以訪問的API,並且封裝了在訪問資源受限時,將請求訪問的執行緒加入佇列、掛起、喚醒等操作。AQS關心的問題如下:
- 資源不可訪問時,怎麼處理?
- 資源時可以被同時訪問,還是在同一時間只能被一個執行緒訪問?
- 如果有執行緒等不及資源了,怎麼從AQS佇列中退出?
至於資源能否被訪問的問題,則交給子類去實現。
站在使用者的角度,AQS的功能主要分為兩類:獨佔鎖和共享鎖。在它的所有子類中,要麼實現了它的獨佔功能的API,要麼實現了共享功能的API,但不會同時使用兩套API,即使是ReentrantReadWriteLock,也是通過兩個內部類:讀鎖和寫鎖,分別使用兩套API來實現的。
- 當AQS的子類實現獨佔功能時,如ReentrantLock,資源是否可以被訪問被定義為:只要AQS的state變數不為0,並且持有鎖的執行緒不是當前執行緒,那麼代表資源不可訪問。
- 當AQS的子類實現共享功能時,如CountDownLatch,資源是否可以被訪問
AQS類繼承結構圖
獨佔鎖
ReentrantLock是AQS獨佔功能的一個實現,通常的使用方式如下:
reentrantLock.lock();
// do something
reentrantLock.unlock();
ReentrantLock會保證執行do something
在同一時間有且只有一個執行緒獲取到鎖,其餘執行緒全部掛起,直到該擁有鎖的執行緒釋放鎖,被掛起的執行緒被喚醒重新開始競爭鎖。
ReentrantLock的加鎖全部委託給內部代理類完成,ReentrantLock只是封裝了統一的一套API而已,而ReentrantLock又分為公平鎖
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
- 公平鎖:每個執行緒搶佔鎖的順序為先後呼叫lock方法的順序,並依此順序獲得鎖,類似於排隊吃飯;
- 非公平鎖:每個執行緒搶佔鎖的順序不變,誰運氣好,誰就獲得鎖,和呼叫lock方法的先後順序無關,類似後插入。
換句話說,公平鎖和非公平鎖的唯一的區別是在獲取鎖的時候是直接去獲取鎖,還是進入佇列排隊的問題。
獲取獨佔鎖的流程
FairSync的tryAcquire()
方法分析
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();/*獲取當前執行緒*/
int c = getState(); /*獲取父類AQS中的標誌位*/
if (c == 0) {
if (!hasQueuedPredecessors()
/*說明佇列中沒有其他執行緒:沒有執行緒正在佔有鎖,那麼修改一下狀態位*/
//注意:這裡的acquires是在lock的時候傳遞來的,從上面的圖中可以知道,這個值是寫死的1
&& compareAndSetState(0, acquires)) {
// 如果通過CAS操作將狀態為更新成功則代表當前執行緒獲取鎖,
// 因此,將當前執行緒設定到AQS的一個變數中,說明這個執行緒拿走了鎖。
setExclusiveOwnerThread(current);
return true;/*返回true,說明已拿到鎖*/
}
} else if (current == getExclusiveOwnerThread()) {
/*如果當前state不為0,說明鎖已經被拿走,那麼判斷是否是當前執行緒拿走了鎖*/
/*因為鎖是可重入的,可以重複lock,unlock*/
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果當前執行緒獲取到鎖,tryAcquire()
返回true,否則返回false,這時會返回到AQS的acquire()
方法。
如果沒有獲取到鎖,那麼應該將當前執行緒放到佇列中去,只不過,在放之前,需要做些包裝。
AQS的addWaiter()
方法分析
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
/*將節點加入到佇列尾部*/
node.prev = pred;
/*嘗試使用CAS方式修改尾節點,但是在【併發情況】下,可能修改失敗*/
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
/*修改失敗,說明有併發,那麼進入enq,以自旋方式修改*/
enq(node);
return node;
}
用當前執行緒去構造一個Node物件,mode是一個表示Node型別的欄位(獨佔的還是共享的)。構造好節點後,在佇列不為空的時候,使用CAS的方式將新的節點加入佇列尾部,如果修改失敗,則進入enq()
方法,使用自旋的方式修改。
/**
* 將節點通過自旋的方式插入到佇列尾部
*
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (; ; ) {
Node pred = tail;
if (pred == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return pred;
}
}
}
}
將執行緒的節點接入到隊裡中後,當然還需要做一件事:將當前執行緒掛起!這個事,由acquireQueued()
來做。
AQS的acquireQueued()
方法分析
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
/*獲得當前節點pred節點*/
final Node p = node.predecessor();
/*如果當前節點正好是第二個節點,那麼則再次嘗試獲取鎖*/
if (p == head && tryAcquire(arg)) {
/*獲取鎖成功,*/
setHead(node); /*將當前節點設定為頭結點*/
p.next = null; // help GC
failed = false;
return interrupted;
}
/*當前節點不是第二個節點 或者 再次獲取鎖失敗*/
/*判斷是否需要掛起,在掛起後,判斷執行緒是否中斷*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
判斷此時是否能夠安全的掛起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 當前節點已經被設定為等待喚醒的狀態,可以安全的掛起了
*/
return true;
if (ws > 0) {
/*
* 當前節點node的前任節點被取消,那麼【跳過】這些取消的節點,
* 當跳過之後,重新嘗試獲取鎖
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 通過前面的判斷,waitStatus一定不是 SIGNAL 或 CANCELLED。
* 推斷出一定是 0 or PROPAGATE
* 呼叫者需要再次嘗試,在掛起之前能不能獲取到鎖,
* 因此,將當前pred的狀態設為SIGNAL,再次嘗試獲取鎖之後,如果還沒有得到鎖那麼掛起
*
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
掛起執行緒,並判斷此時執行緒是否被中斷。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
/*如果被中斷,則返回true,interrupted()方法返回後,中斷狀態被取消,變為false*/
return Thread.interrupted();
}
到此為止,一個執行緒對於鎖的一次競爭才告一段落,結果有兩種:
- 要麼成功獲取到鎖(不用進入到AQS佇列中);
- 要麼獲取失敗,被掛起,等待下次喚醒後繼續迴圈嘗試獲取鎖。
值得注意的是,AQS的佇列為FIFO佇列,所以,每次被CPU假喚醒,且當前執行緒不是處在頭節點的位置,也是會被掛起的。AQS通過這樣的方式,實現了競爭的排隊策略。
釋放獨佔鎖流程分析
AQS的release()
方法分析
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
/*將持有鎖的頭結點釋放成功後
* 喚醒其後繼節點
* */
unparkSuccessor(h);
return true;
}
return false;
}
嘗試釋放鎖,如果釋放成功,找到AQS的頭節點,呼叫unparkSuccessor()
喚醒FIFO佇列中第一個等待鎖的節點。
Sync的tryRelease()
方法分析
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//如果【釋放的執行緒】和【獲取鎖的執行緒】不是同一個,丟擲非法監視器狀態異常。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 因為是重入的關係,不是每次釋放鎖c都等於0,
// 直到最後一次釋放鎖時,才通知AQS不需要再記錄哪個執行緒正在獲取鎖。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
AQS的unparkSuccessor()
方法分析
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) {
/*如果狀態為負(SIGNAL、PROPAGATE),那麼清除其狀態
* 如果失敗,或者狀態被其他等待執行緒改變,也沒有關係
* */
compareAndSetWaitStatus(node, ws, 0);
}
/*
* 一般情況下喚醒的執行緒是【頭結點】的【下一個節點】
* 但是如果該節點被取消或者為null,
* 那麼需要【從後往前遍歷】尋找一個【最早的】並且【沒有被取消】的節點
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) LockSupport.unpark(s.thread);
}
當FIFO佇列中等待鎖的第一個節點被喚醒之後,會返回到到節點所線上程的acquireQueued()
方法中,繼續下一輪迴圈,這時當前節點正好時頭節點的第一個後繼節點,並且使用CAS修改狀態持有鎖成功,那麼當前節點則晉升為頭結點,並返回。
····
for (; ; ) {
/*獲得當前節點pred節點*/
final Node p = node.predecessor();
/*如果當前節點正好是第二個節點,那麼則再次嘗試獲取鎖*/
if (p == head && tryAcquire(arg)) {
/*獲取鎖成功,*/
setHead(node); /*將當前節點設定為頭結點*/
p.next = null; // help GC
failed = false;
return interrupted;
}
/*當前節點不是第二個節點 或者 再次獲取鎖失敗*/
/*判斷是否需要掛起,在掛起後,判斷執行緒是否中斷*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
····
以上的分析是基於ReentrantLock內部的公平鎖來分析的,並且其lcok和unlock已經基本分析完畢,唯獨剩下一個非公平鎖nonfairSync。其實,它和公平鎖的唯一區別就是獲取鎖的方式不同,一個是按前後順序一次獲取鎖,一個是搶佔式的獲取鎖。
- 非公平鎖的lock方法的處理方式:在lock的時候先直接CAS修改一次state變數(嘗試獲取鎖),成功就返回,不成功再排隊,從而達到不排隊直接搶佔的目的。
- 而對於公平鎖:則是老老實實的開始就走AQS的流程排隊獲取鎖。如果前面有人呼叫過其lock方法,則排在佇列中前面,也就更有機會更早的獲取鎖,從而達到“公平”的目的。
共享鎖
共享功能的主要實現為CountDownLatch,CountDownLatch是一種靈活的閉鎖實現,它可以使一個或多個執行緒等待一組事件發生。閉鎖狀態包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown遞減計數器,表示有一個事件已經發生了,而await方法等待計數器達到零,這表示所有需要等待的時間都已經發生。如果計數器值非零,那麼await會一直阻塞直到計數器為零,或者等待執行緒中斷,或者等待超時。
等待獲取共享鎖的流程分析
CountDownLatch的await()
方法分析
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
CountDownLatch的await()方法不會只在一個執行緒中呼叫,多個執行緒可以同時等待await()
方法返回,所以CountDownLatch被設計成實現tryAcquireShared()
方法,獲取的是一個共享鎖,鎖在所有呼叫await()
方法的執行緒間共享,所以叫做共享鎖。
AQS的acquireSharedInterruptibly()
方法分析
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
/*用於響應執行緒中斷,
* 在前兩行會檢查執行緒是否被打斷
* */
if (Thread.interrupted())
throw new InterruptedException();
/*返回了-1,說明state不為0,也就是CountDownLatch的計數器還不為0*/
if (tryAcquireShared(arg) < 0)/*嘗試獲取共享鎖,如果小於0,表示獲取失敗*/
doAcquireSharedInterruptibly(arg);
}
Sync的tryAcquireShared()
方法分析
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
AQS的doAcquireSharedInterruptibly()
方法分析
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
/*將當前執行緒包裝為一個共享節點*/
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (; ; ) {
final Node p = node.predecessor();
if (p == head) {
//如果新建節點的前一個節點,就是Head,
//說明當前節點是AQS佇列中等待獲取鎖的第一個節點,
//按照FIFO的原則,可以直接嘗試獲取鎖。
int r = tryAcquireShared(arg);
if (r >= 0) {
/*如果獲取成功,則需要將當前節點設定為AQS佇列中的第一個節點*/
/*佇列的頭節點表示正在獲取鎖的節點*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//檢查下是否需要將當前節點掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
獲得共享鎖的流程分析
AQS的releaseShared()
方法分析
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (; ; ) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
死迴圈更新state的值,實現state的減1操作,之所以用死迴圈是為了確保state值的更新成功。
從上文的分析中可知,如果state的值為0,在CountDownLatch中意味:所有的子執行緒已經執行完畢,這個時候可以喚醒呼叫await()
方法的執行緒了,而這些執行緒正在AQS的佇列中,並被掛起的,所以下一步應該去喚醒AQS佇列中的頭節點了(AQS的佇列為FIFO佇列),然後由頭節點去依次喚醒AQS佇列中的其他共享節點。
AQS的doReleaseShared()
方法分析
private void doReleaseShared() {
for (; ; ) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//如果當前節點是SIGNAL意味著,它正在等待一個訊號。或者說,它在等待被喚醒,因此做兩件事
/*重置waitStatus標誌位,如果失敗則重試*/
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
/*重置成功後,喚醒等待獲取共享鎖的第一個節點*/
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//如果本身頭節點的waitStatus是處於重置狀態(waitStatus==0)的,將其設定為“傳播”狀態。
//意味著需要將狀態向後一個節點傳播。
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
當FIFO佇列中等待共享鎖的第一個節點被頭結點喚醒之後,會返回到到節點對應執行緒的doAcquireSharedInterruptibly()
方法中,並繼續迴圈,這時候當前節點的前驅節點正好時頭結點,並且能夠獲得共享鎖,這時會執行setHeadAndPropagate()
方法,將當前節點設定為頭結點,並繼續喚醒下一個節點。
AQS的setHeadAndPropagate()
方法分析
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
首先,使用CAS更換了頭結點,然後,將當前節點的下一個節點取出來。如果下一個節點同樣是shared型別,再做一個releaseShared()
操作。這時可以回到第二步,依次喚醒AQS佇列中其他共享節點。
總而言之,AQS關於共享鎖方面的實現方式:如果獲取共享鎖失敗後,將請求共享鎖的執行緒封裝成Node物件放入AQS的佇列中,並掛起Node物件對應的執行緒,實現請求鎖執行緒的等待操作。待共享鎖可以被獲取後,從頭節點開始,依次喚醒頭節點及其以後的所有共享型別的節點。實現共享狀態的傳播。
共享鎖與獨佔鎖的對比
- 與AQS的獨佔功能一樣,共享鎖是否可以被獲取的判斷為空方法,交由子類去實現。
- 與AQS的獨佔功能不同,當鎖被頭節點獲取後,獨佔功能是隻有頭節點獲取鎖,其餘節點的執行緒繼續沉睡,等待鎖被釋放後,才會喚醒下一個節點的執行緒,而共享功能是隻要頭節點獲取鎖成功,就在喚醒自身節點對應的執行緒的同時,繼續喚醒AQS佇列中的下一個節點的執行緒,每個節點在喚醒自身的同時還會喚醒下一個節點對應的執行緒,以實現共享狀態的“向後傳播”,從而實現共享功能。