圖解AQS系列(上)--獨佔鎖
開場白
AQS在juc包中簡直是基石般的存在,筆者會通過juc包中的ReentrantLock來講解AQS的獨佔鎖實現,通過Semaphore來講解下AQS共享鎖的實現。
本文力求用直白的結構圖和詳細的描述,讓大家花最少的時間,便能夠比較詳細的瞭解AQS的流程。
一、AQS等待佇列
所有未獲取到鎖的執行緒,都會進入AQS的等待佇列,其實就是一個雙向連結串列,如下圖:
head節點是佇列初始化的時候一個節點,只表示位置,不代表實際的等待執行緒。head節點之後的節點就是獲取鎖失敗進入等待佇列的執行緒。接下來,我們開啟AQS原始碼,看下Node節點都有哪些關鍵內容:
static final class Node { /** 共享模式 */ static final Node SHARED = new Node(); /** 獨佔模式 */ static final Node EXCLUSIVE = null; /** 節點狀態值,表示節點已經取消 */ static final int CANCELLED =1; /** 節點狀態值,在當前節點釋放或者取消的時候,會喚醒下一個節點 */ static final int SIGNAL= -1; /** 此處可忽略,waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * 這個值是在共享鎖的時候會用到,喚醒了一個節點,會嘗試喚醒下一個節點, * 如果當前節點未阻塞(阻塞前就獲得了鎖),那麼當前節點的狀態會被設定成-3 */ static final int PROPAGATE = -3; //等待狀態 volatile int waitStatus; //前驅節點 volatile Node prev; //後繼節點 volatile Node next; //等待的執行緒 volatile Thread thread; //此處可忽略,主要是模式的判斷 Node nextWaiter; }複製程式碼
我們再看下AQS的另外兩個核心屬性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //同步狀態值(鎖的數量) private volatile int state; //ps:繼承自父類AbstractOwnableSynchronizer的屬性,此處為了顯示方便就拿過來了 //記錄獲得了鎖的執行緒 private transient Thread exclusiveOwnerThread //上文中的Node節點 static final class Node { //...... } }複製程式碼
接下來,我們通過ReentrantLock的加鎖和解鎖流程,來看看執行緒是如何加入等待佇列的,以及佇列中每個節點的狀態值是如何變化的。
二、獨佔鎖--加鎖(ReentrantLock.lock() )
我們先初略看下ReentrantLock的核心結構
public class ReentrantLock implements Lock, java.io.Serializable { //繼承自AQS abstract static class Sync extends AbstractQueuedSynchronizer { //..... } static final class NonfairSync extends Sync { // ..... 非公平鎖 } static final class FairSync extends Sync { // ..... 公平鎖 } }複製程式碼
可以看出, ReentrantLock主要是由內部類繼承自AQS,並實現了非公平鎖和公平鎖。 我們看下加鎖的流程(本處以非公平鎖為例子,下文會單獨提下公平鎖的區別):
接下來,我們按照流程圖的順序,看下原始碼的實現細節(以非公平鎖為例)
final void lock() { //嘗試快速獲取鎖 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread());//成功後記錄獲取鎖的執行緒 else acquire(1);//走獲取鎖的常規流程 }複製程式碼
acquire()是AQS的模板方法,其中 tryAcquire()由子類自己實現,而addWaiter()和acquireQueued()都是固定的。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }複製程式碼
這裡主要由3個步驟。1、呼叫ReentrantLock的tryAcquire()方法嘗試獲取鎖。 2、如果失敗,則呼叫addWaiter()方法,把當前執行緒加入AQS的等待佇列。 3、之後呼叫acquireQueued()方法來自旋獲取鎖或者把當前節點的執行緒掛起。我們逐步看下這3步的實現。
protected final boolean tryAcquire(int acquires) { //非公平鎖的實現 return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();//狀態值 if (c == 0) {//鎖空閒 if (compareAndSetState(0, acquires)) {//嘗試CAS快速搶佔狀態值 setExclusiveOwnerThread(current);//記錄當前執行緒獲取了鎖 return true; } } //當前執行緒重入(同一執行緒需要重複執行加鎖的方法,比如遞迴呼叫) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires;//狀態值增加 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc);//設定狀態值 return true; } return false; } 複製程式碼
如果第1步獲取鎖失敗了,那麼就需要加入等待佇列。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//新建一個節點(當前執行緒) // 首先會嘗試用CAS把當前新節點快速加入到尾節點 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);//如果CAS失敗了,則自旋+CAS新增到尾節點 return node; }複製程式碼
看下自旋加入佇列操作
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 如果尾節點為null,則必須要先初始化head,tail節點 if (compareAndSetHead(new Node()))//新建一個節點(佔位的作用) tail = head;//head和tail都指向第一個位置節點 } else { node.prev = t; if (compareAndSetTail(t, node)) {//通過CAS把當前執行緒節點加入到等待佇列尾部 t.next = node; return t; } } } }複製程式碼
最後,需要呼叫acquireQueued()方法來做最後的操作
final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//是否取消節點 try { boolean interrupted = false;//是否中斷執行緒 for (;;) { final Node p = node.predecessor();//獲取當前執行緒節點的前驅節點 if (p == head && tryAcquire(arg)) {//如果前驅節點是head節點,並且當前節點執行緒獲取鎖成功 setHead(node);//把當前節點設定為head節點(當前節點變成了位置標示的作用) p.next = null; // help GC,這裡去除原先的head節點的強引用,方便GC回收資源 failed = false; return interrupted; } //在自旋過程中,需要判斷當前執行緒是否需要阻塞(正常情況下最多迴圈3次,而不是無限迴圈。當然前驅節點一直被取消除外) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node);//取消節點,相見下文 } }複製程式碼
先來看下AQS如何判斷當前執行緒是否需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//當前節點 的 前驅節點 的等待狀態值 if (ws == Node.SIGNAL) //如果ws==-1,則當前節點等待前驅節點喚醒,自己可以放心的阻塞 return true; if (ws > 0) { //ws>0,那麼前驅節點已經被取消了,那麼從前驅節點往前找到waitStatus<=0的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;//設定為當前節點的前驅節點 } else { /* 到這裡,前驅節點的ws只有0或者-3(PROPAGATE)兩種情況,表明當前節點需要 * 等待訊號喚醒,但是這裡不是馬上掛起,而是再迴圈一次,如果下一次還不能獲取鎖, * 那麼就會掛起當前執行緒 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }複製程式碼
如果需要阻塞,那麼會執行下面方法
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//呼叫LockSupport阻塞執行緒(再往下是UNSAFE,不再深入了) return Thread.interrupted();//獲取當前執行緒是否需要中斷,同時清理中斷標誌 }複製程式碼
下面看下取消節點都做了什麼
private void cancelAcquire(Node node) { // 節點為null,啥都不做 if (node == null) return; node.thread = null;//釋放資源 // 前驅節點,如果已經取消的,則跳過 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 前驅節點的後繼節點 Node predNext = pred.next; // 當前節點ws設定為CANCELLED node.waitStatus = Node.CANCELLED; // 如果當前節點是尾節點,先把前驅節點設定為tail節點,然後在移除當前節點 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 如果後繼節點需要訊號喚醒,那麼就把後繼節點鏈到前驅節點的後面;否則直接喚醒後繼節點 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //喚醒當前節點的下一個阻塞節點(釋放鎖的流程詳細講一下) unparkSuccessor(node); } node.next = node; // help GC } }複製程式碼
到這裡,加鎖的過程已經結束了。我們先來階段性總結下等待佇列中的節點等待狀態ws的數值變化情況:
- 新建節點,ws==0
- 取消節點,ws==1 (CANCELLED)
- 進入等待佇列後,有機率把ws設定為-1 (SIGNAL)
還有一種狀態-3( PROPAGATE ),我們會在下篇講解共享鎖的時候提到。
上文我們是以非公平鎖為例子講解的,其實ReentrantLock的公平鎖和非公平鎖就一步的區別
static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //差別就是這個hasQueuedPredecessors()方法,如果前面有人排隊了, //他就不插隊了,乖乖進入等待佇列 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }複製程式碼
下面我們看下釋放鎖的流程。
三、獨佔鎖--釋放鎖(ReentrantLock.unlock() )
釋放鎖的過程,不管公平鎖和非公平鎖,都是一樣的。我們先來看下流程圖
釋放鎖的流程比較簡單,我們快速過一下原始碼
//ReentrantLock.unlock() public void unlock() { sync.release(1); } //AQS的方法 public final boolean release(int arg) { //先嚐試釋放鎖,成功則喚醒head節點的後繼節點 if (tryRelease(arg)) {Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }複製程式碼
看下釋放鎖的過程
protected final boolean tryRelease(int releases) { int c = getState() - releases;//減去狀態值 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//如果狀態值為0,那麼重置持有鎖的執行緒屬性(重入鎖釋放後c仍可能大於0) free = true; setExclusiveOwnerThread(null); } setState(c); return free; }複製程式碼
接著講下上文一筆帶過的喚醒後繼節點執行緒的操作
private void unparkSuccessor(Node node) { //提醒下,這個方法是喚醒節點的後繼節點 int ws = node.waitStatus; if (ws < 0)//當前節點如果小於0,設定為0 compareAndSetWaitStatus(node, ws, 0); /* * 嘗試喚醒後繼節點的阻塞執行緒,一般就是下一個節點 */ Node s = node.next;//獲取後繼節點 if (s == null || s.waitStatus > 0) {//如果下一個節點為null或者已經被取消 s = null; //那麼需要從尾部節點開始往前找,找到最靠近當前節點的且ws<=0的節點,然後喚醒節點執行緒 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒執行緒還是呼叫LockSupport來實現,底層是UNSAFE類 }複製程式碼
四、小結
AQS的獨佔鎖流程到這裡就講完了。其實只要大家先了解AQS的等待佇列結構(還有另外兩個核心屬性state和持有鎖的執行緒變數 exclusiveOwnerThread ),然後再把節點等待狀態值(ws)的數值變化場景搞清楚,那麼AQS就會變得簡單直白。
本文有提到一個模版模式,對設計模式不瞭解的同學,可以看下設計模式。後續會出圖解AQS系列(下),來講解下AQS的共享鎖實現。