Java concurrent AQS 源碼詳解
一、引言
AQS(同步阻塞隊列)是concurrent包下鎖機制實現的基礎,相信大家在讀完本篇博客後會對AQS框架有一個較為清晰的認識
這篇博客主要針對AbstractQueuedSynchronizer的源碼進行分析,大致分為三個部分:
- 靜態內部類Node的解析
- 重要常量以及字段的解析
- 重要方法的源碼詳解。
所有的分析僅基於個人的理解,若有不正之處,請諒解和批評指正,不勝感激!!!
二、Node解析
AQS在內部維護了一個同步阻塞隊列,下面簡稱sync queue,該隊列的元素即靜態內部類Node的實例
首先來看Node中涉及的常量定義,源碼如下
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ staticfinal int CANCELLED = 1; /** waitStatus value to indicate successor‘s thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate*/ static final int PROPAGATE = -3;
以下兩個均為Node#nextWaiter字段的可取值
SHARED:若Node#nextWaiter為SHARED,那麽表明該Node節點處於共享模式
EXCLUSIVE:若Node#nextWaiter為EXCLUSIVE,那麽表明該Node節點處於獨占模式
以下五個均為Node#waitStatus字段的可取值
CANCELLED:用於標記一個已被取消的節點,一旦Node#waitStatus的值被設為CANCELLED,那麽waitStatus的值便不再被改變
SIGNAL:標記一個節點(記為node)處於這樣一種狀態:當node釋放資源(unlock/release)時,node節點必須喚醒其後繼節點
CONDITION:用於標記一個節點位於條件變量的阻塞隊列中(我稱這個阻塞隊列為Condition list),本篇暫不介紹Condition相關源碼,因此讀者可以暫時忽略
PROPAGATE:僅用於標記sync queue頭節點,且為一種暫時狀態,表明共享狀態正在傳遞中(如果有剩余資源且sync queue中仍有等待的節點,那麽這些節點會依次獲取資源,直至資源消耗殆盡或者隊列為空),僅在共享模式中出現
其次,再看Node中重要字段,源碼如下
/** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn‘t need to * signal. So, most code doesn‘t need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev‘s from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter;
waitStatus:節點的狀態,可取值有五種,分別是SIGNAL、CANCEL、CONDITION、PROPAGATE、0。其中獨占模式僅涉及到SIGNAL、CANCEL、0三種狀態,共享模式僅涉及到SIGNAL、CANCEL、PROPAGATE、0四種狀態。CONDITION狀態不會出現在sync queue中,而是位於條件變量的Condition list中,本篇博客暫不討論ConditoinObject
pre:前繼節點,該字段通過CAS操作進行賦值,保證可靠(現在不理解沒關系,後面的方法解析會多次提到)
next:後繼節點,該字段的賦值操作是非線程安全的,即next是不可靠的(Node#next為null並不代表節點不存在後繼)。但是,一旦next不為null,那麽next也是可靠的(現在不理解沒關系,後面的方法解析會多次提到)
thread:該節點關聯的線程
nextWaiter:獨占模式中就是null,共享模式中就是SHARED。在ConditionObject的Condition list中指向下一個節點
註意:Condition list用nextWaiter來連接單向鏈表(pre與next是無用的),sync queue利用pre和next來連接雙向鏈表(nextWaiter僅用於標記獨占或者共享模式而已),不要搞混了!!!
三、AQS字段解析
AQS字段僅有三個,源碼如下
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state;
head:sync queue隊列的頭節點
tail:sync queue隊列的尾節點
state:資源狀態
四、重要方法解析
4.1 acquire
該方法是獨占模式下的入口方法,可以相應interrupt,但是不會拋出InterruptedException異常
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { //首先執行tryAcquire(arg)嘗試獲取資源,如果成功則直接返回 //如果tryAcquire(arg)獲取資源失敗,則講當前線程封裝成Node節點加入到sync queue隊列中,並通過acquireQueued進行獲取資源直至成功(如果尚未有資源可獲取,那麽acquireQueued會阻塞當前線程) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
其中tryAcquire方法如下,該方法的具體含義交給AQS的子類去完成,註意,該方法的實現不可有任何耗時操作,更不可阻塞線程,僅實現是否可獲取資源(換言之,是否可獲取鎖)的邏輯即可,源碼如下
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
addWaiter的作用是:將當前線程封裝成一個Node節點,並且添加到sync queue中
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { //生成指定模式的Node節點 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //以下幾行進行入隊操作,如果失敗,交給enq進行入隊處理。其實,我認為可以直接調用enq,不知道作者設置如下幾行的意圖 if (pred != null) { node.prev = pred; //通過CAS操作串行化並發入隊操作,僅有一個線程會成功,由於node節點的prev字段是在CAS操作之前進行的,一旦CAS操作成功,node節點的prev字段就是指向了其前繼節點,因此說prev字段是安全的 if (compareAndSetTail(pred, node)) { //這裏直接通過賦值操作賦值next字段,註意,可能有別的線程會在next字段賦值之前訪問到next字段,因此next字段是非可靠的(一個節點的next字段為null並不代表該節點沒有後繼) pred.next = node; //一旦next字段賦值成功,那麽next字段又變為可靠的了 return node; } } //通過enq入隊 enq(node); return node; }
enq入隊,源碼如下
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node‘s predecessor */ private Node enq(final Node node) { //死循環進行入隊操作,CAS操作常規模式 for (;;) { Node t = tail; //此時隊列為空,需要初始化 if (t == null) { // Must initialize //此時可能多個線程都在執行該方法,因此只有一個線程才能初始化sync queue,此處添加的節點我稱之為Dummy Node,該節點沒有關聯線程 if (compareAndSetHead(new Node())) tail = head; } else { //以下四行與addWaiter類似,不再贅述 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這裏拋出一個問題:在初始化sync queue中,將一個new Node()設置為了sync queue的頭結點,該節點沒有關聯任何線程,我稱之為"Dummy Node",這個頭結點"Dummy Node"待會可能會被設置為SIGNAL狀態,那麽它是如何喚醒後繼節點的呢?我會在在講到release時進行解釋
到這裏,線程已被封裝成節點,並且成功添加到sync queue中去了,接下來,來看最重要的acquireQueued方法
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { //用於記錄是否獲取成功,我現在還不清楚何時會失敗= = boolean failed = true; try { //記錄是否被中斷過,如果被中斷過,則需要在acquire方法中恢復中斷現場 boolean interrupted = false; //同樣的套路,CAS配合死循環 for (;;) { //獲取node節點的前繼節點p final Node p = node.predecessor(); //當p為sync queue頭結點時,才有資格嘗試獲取資源,換言之,當且僅當一個節點是sync queue中第二個節點時,它才有資格獲取資源 if (p == head && tryAcquire(arg)) { //一旦獲取成功,以下語句都是線程安全的,所有字段直接賦值即可,不需要CAS或者加鎖 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //否則,找到前繼節點,並將其設置為SIGNAL狀態後阻塞自己 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //如果失敗了 cancelAcquire(node); } }
該方法的主要邏輯就是:不斷地通過死循環執行獲取資源(當且僅當節點是sync queue中第二個節點時才有資格獲取資源)或者阻塞自己的操作,只有成果獲取資源後才能夠返回
接下來,來看shouldParkAfterFailedAcquire方法以及parkAndCheckInterrupt方法
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node‘s predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //一旦發現前繼節點是SIGNAL狀態,就返回true,在acquireQueued方法中會阻塞當前線程 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ //這裏給出兩個問題: //1.如果在當前線程阻塞之前,前繼節點就喚醒了當前線程,那麽當前線程不就永遠阻塞下去了嗎? //2.萬一有別的線程更改了前繼節點的狀態,導致前繼節點不喚醒當前線程,那麽當前線程不就永遠阻塞下去了嗎? return true; //如果前繼節點處於CANCELL狀態(僅有CANCELL狀態大於0) if (ws > 0) { //那麽跳過那些被CANCELL的節點,先前找到第一個有效節點 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //前繼節點狀態要麽是0,要麽是PROPAGATE,將其通過CAS操作設為SIGNAL,不用管是否成功,退回到上層函數acquireQueued進行再次判斷 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don‘t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
該方法的主要邏輯就是:將前繼節點設置為SIGNAL
關於上面提到的兩個問題
1. 如果在當前線程阻塞之前,前繼節點就喚醒了當前線程,那麽當前線程不就永遠阻塞下去了嗎?--->AQS采用的是Unsafe#park以及Unsafe#unpark,這對方法能夠很好的處理這類問題,可以先unpark獲取一枚許可,然後執行park不會阻塞當前線程,而是消耗這個提前獲取的許可,註意,多次unpark僅能獲取一枚許可
2.萬一有別的線程更改了前繼節點的狀態,導致前繼節點不喚醒當前線程,那麽當前線程不就永遠阻塞下去了嗎?--->一旦一個節點被設為SIGNAL狀態,AQS框架保證,任何改變其SIGNAL狀態的操作都會喚醒其後繼節點,因此,只要節點看到其前繼節點為SIGNAL狀態,便可放心阻塞自己
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //返回是否被中斷過 return Thread.interrupted(); }
至此,獨占模式的acquire調用鏈分析完畢,總結一下:首先嘗試獲取鎖(tryAcquire),若成功則直接返回。若失敗,將當前線程封裝成Node節點加入到sync queue隊列中,當該節點位於第二個節點時,會重新嘗試獲取鎖,成功則返回,失敗則阻塞自己,直至前繼節點喚醒自己
AQS通過死循環以及CAS操作來串行化並發操作,並且通過這種適當的自旋加阻塞,來減少頻繁的加鎖解鎖操作
4.2 release
release方法是獨占模式下釋放資源(即解鎖)的入口,源碼如下
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { //調用tryRelease嘗試釋放資源 if (tryRelease(arg)) { Node h = head; //只要頭節點不為空且狀態不為0,就喚醒後繼節點,對於獨占模式也就只有SIGNAL狀態一種,頭結點在任何情況下都不可能為CANCELL狀態 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
在此,解釋一下enq方法中提到的問題,即那個"Dummy Node"如何喚醒後繼:由於"Dummy Node"不關聯任何線程,因此真正的喚醒操作實際上是由外部的線程來完成的,這裏的外部線程是指從未進入sync queue的線程,因此,"Dummy Node"節點設置為SIGNAL狀態,也能夠正常喚醒後繼
同理,tryRelease也是交給AQS子類實現的方法,只需要定義釋放資源的邏輯即可,該方法的實現不應該有耗時的操作,更不該阻塞
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
通過unparkSuccessor方法喚醒指定節點的後繼節點
/** * Wakes up node‘s successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; //若節點狀態小於0,將其通過CAS操作改為0,表明本次SIGNAL的任務已經完成,至於CAS是否成功,或者是否再次被其他線程修改,都與本次無關unparkSuccessor無關,只是該節點被賦予了新的任務而已。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //這裏通過非可靠的next字段直接獲取後繼,如果非空,那麽說明該字段可靠,如果為空,那麽利用可靠的prev字段從tail向前找到當前node節點的後繼節點 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //喚醒後繼節點 if (s != null) LockSupport.unpark(s.thread); }
4.3 acquireShared
待續
4.4 releaseShared
待續
Java concurrent AQS 源碼詳解