相比於獨占鎖跟共享鎖,AbstractQueuedSynchronizer中的條件隊列可能被關註的並不是很多,但它在阻塞隊列的實現裏起著至關重要的作用,同時如果想全面了解AQS,條件隊列也是必須要學習的。
原文地址:http://www.jianshu.com/p/3f8b08ca21cd
這篇文章會涉及到AQS中獨占鎖跟共享鎖的一些知識,如果你已經對這兩塊內容很了解了,那就直接往下看。否則在讀本文之前還是建議讀者先去看看我之前寫的兩篇文章溫習一下。
深入淺出AQS之獨占鎖模式
深入淺出AQS之共享鎖模式
一、使用場景介紹
區別於前面兩篇文章,可能之前很多人都沒有太在意AQS中的這塊內容,所以這篇文章我們先來看下條件隊列的使用場景:
//首先創建一個可重入鎖,它本質是獨占鎖
private final ReentrantLock takeLock = new ReentrantLock();
//創建該鎖上的條件隊列
private final Condition notEmpty = takeLock.newCondition();
//使用過程
public E take() throws InterruptedException {
//首先進行加鎖
takeLock.lockInterruptibly();
try {
//如果隊列是空的,則進行等待
notEmpty.await();
//取元素的操作...
//如果有剩余,則喚醒等待元素的線程
notEmpty.signal();
} finally {
//釋放鎖
takeLock.unlock();
}
//取完元素以後喚醒等待放入元素的線程
}
上面的代碼片段截取自LinkedBlockingQueue,是Java常用的阻塞隊列之一。
從上面的代碼可以看出,條件隊列是建立在鎖基礎上的,而且必須是獨占鎖(原因後面會通過源碼分析)。
二、執行過程概述
等待條件的過程:
- 在操作條件隊列之前首先需要成功獲取獨占鎖,不然直接在獲取獨占鎖的時候已經被掛起了。
- 成功獲取獨占鎖以後,如果當前條件還不滿足,則在當前鎖的條件隊列上掛起,與此同時釋放掉當前獲取的鎖資源。這裏可以考慮一下如果不釋放鎖資源會發生什麽?
- 如果被喚醒,則檢查是否可以獲取獨占鎖,否則繼續掛起。
條件滿足後的喚醒過程(以喚醒一個節點為例,也可以喚醒多個):
- 把當前等待隊列中的第一個有效節點(如果被取消就無效了)加入同步隊列等待被前置節點喚醒,如果此時前置節點被取消,則直接喚醒該節點讓它重新在同步隊列裏適當的嘗試獲取鎖或者掛起。
註:說到這裏必須要解釋一個知識點,整個AQS分為兩個隊列,一個同步隊列,一個條件隊列。只有同步隊列中的節點才能獲取鎖。前面兩篇獨占鎖共享鎖文章中提到的加入隊列就是同步隊列。條件隊列中所謂的喚醒是把節點從條件隊列移到同步隊列,讓節點有機會去獲取鎖。
二、源碼深入分析
下面的代碼稍微復雜一點,因為它考慮了中斷的處理情況。我由於想跟文章開頭的代碼片段保持一致,所以選取了該方法進行說明。如果只想看核心邏輯的話,那推薦讀者看看awaitUninterruptibly()方法的源碼。
//條件隊列入口,參考上面的代碼片段
public final void await() throws InterruptedException {
//如果當前線程被中斷則直接拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//把當前節點加入條件隊列
Node node = addConditionWaiter();
//釋放掉已經獲取的獨占鎖資源
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果不在同步隊列中則不斷掛起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//中斷處理,另一種跳出循環的方式
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//走到這裏說明節點已經條件滿足被加入到了同步隊列中或者中斷了
//這個方法很熟悉吧?就跟獨占鎖調用同樣的獲取鎖方法,從這裏可以看出條件隊列只能用於獨占鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//走到這裏說明已經成功獲取到了獨占鎖,接下來就做些收尾工作
//刪除條件隊列中被取消的節點
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//根據不同模式處理中斷
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
流程比較復雜,一步一步來分析,首先看下加入條件隊列的代碼:
//註:1.與同步隊列不同,條件隊列頭尾指針是firstWaiter跟lastWaiter
//註:2.條件隊列是在獲取鎖之後,也就是臨界區進行操作,因此很多地方不用考慮並發
private Node addConditionWaiter() {
Node t = lastWaiter;
//如果最後一個節點被取消,則刪除隊列中被取消的節點
//至於為啥是最後一個節點後面會分析
if (t != null && t.waitStatus != Node.CONDITION) {
//刪除所有被取消的節點
unlinkCancelledWaiters();
t = lastWaiter;
}
//創建一個類型為CONDITION的節點並加入隊列,由於在臨界區,所以這裏不用並發控制
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//刪除取消節點的邏輯雖然長,但比較簡單,就不單獨說了,就是鏈表刪除
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
把節點加入到條件隊列中以後,接下來要做的就是釋放鎖資源:
//入參就是新創建的節點,即當前節點
final int fullyRelease(Node node) {
boolean failed = true;
try {
//這裏這個取值要註意,獲取當前的state並釋放,這從另一個角度說明必須是獨占鎖
//可以考慮下這個邏輯放在共享鎖下面會發生什麽?
int savedState = getState();
//跟獨占鎖釋放鎖資源一樣,不贅述
if (release(savedState)) {
failed = false;
return savedState;
} else {
//如果這裏釋放失敗,則拋出異常
throw new IllegalMonitorStateException();
}
} finally {
//如果釋放鎖失敗,則把節點取消,由這裏就能看出來上面添加節點的邏輯中只需要判斷最後一個節點是否被取消就可以了
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
走到這一步,節點也加入條件隊列中了,鎖資源也釋放了,接下來就該掛起了(先忽略中斷處理,單看掛起邏輯):
//如果不在同步隊列就繼續掛起(signal操作會把節點加入同步隊列)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//中斷處理後面再分析
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//判斷節點是否在同步隊列中
final boolean isOnSyncQueue(Node node) {
//快速判斷1:節點狀態或者節點沒有前置節點
//註:同步隊列是有頭節點的,而條件隊列沒有
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//快速判斷2:next字段只有同步隊列才會使用,條件隊列中使用的是nextWaiter字段
if (node.next != null)
return true;
//上面如果無法判斷則進入復雜判斷
return findNodeFromTail(node);
}
//註意這裏用的是tail,這是因為條件隊列中的節點是被加入到同步隊列尾部,這樣查找更快
//從同步隊列尾節點開始向前查找當前節點,如果找到則說明在,否則不在
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
如果被喚醒且已經被轉移到了同步隊列,則會執行與獨占鎖一樣的方法acquireQueued()進行同步隊列獨占獲取。
最後我們來梳理一下裏面的中斷邏輯以及收尾工作的代碼:
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//這裏被喚醒可能是正常的signal操作也可能是中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//這裏的判斷邏輯是:
//1.如果現在不是中斷的,即正常被signal喚醒則返回0
//2.如果節點由中斷加入同步隊列則返回THROW_IE,由signal加入同步隊列則返回REINTERRUPT
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
//修改節點狀態並加入同步隊列
//該方法返回true表示節點由中斷加入同步隊列,返回false表示由signal加入同步隊列
final boolean transferAfterCancelledWait(Node node) {
//這裏設置節點狀態為0,如果成功則加入同步隊列
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//與獨占鎖同樣的加入隊列邏輯,不贅述
enq(node);
return true;
}
//如果上面設置失敗,說明節點已經被signal喚醒,由於signal操作會將節點加入同步隊列,我們只需自旋等待即可
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
在把喚醒後的中斷判斷做好以後,看await()中最後一段邏輯:
//在處理中斷之前首先要做的是從同步隊列中成功獲取鎖資源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//由於當前節點可能是由於中斷修改了節點狀態,所以如果有後繼節點則執行刪除已取消節點的操作
//如果沒有後繼節點,根據上面的分析在後繼節點加入的時候會進行刪除
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//根據中斷時機選擇拋出異常或者設置線程中斷狀態
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
//實現代碼為:Thread.currentThread().interrupt();
selfInterrupt();
}
至此條件隊列await操作全部分析完畢。signal()方法相對容易一些,一起看源碼分析下:
//條件隊列喚醒入口
public final void signal() {
//如果不是獨占鎖則拋出異常,再次說明條件隊列只適用於獨占鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//如果條件隊列不為空,則進行喚醒操作
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//該方法就是把一個有效節點從條件隊列中刪除並加入同步隊列
//如果失敗則會查找條件隊列上等待的下一個節點直到隊列為空
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
//將節點加入同步隊列
final boolean transferForSignal(Node node) {
//修改節點狀態,這裏如果修改失敗只有一種可能就是該節點被取消,具體看上面await過程分析
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//該方法很熟悉了,跟獨占鎖入隊方法一樣,不贅述
Node p = enq(node);
//註:這裏的p節點是當前節點的前置節點
int ws = p.waitStatus;
//如果前置節點被取消或者修改狀態失敗則直接喚醒當前節點
//此時當前節點已經處於同步隊列中,喚醒會進行鎖獲取或者正確的掛起操作
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
三、總結
相比於獨占鎖跟共享鎖,條件隊列可能是最不受關註的了,但由於它是阻塞隊列實現的關鍵組件,還是有必要了解一下其中的原理。其實我認為關鍵點有兩條,第一是條件隊列是建立在某個具體的鎖上面的,第二是條件隊列跟同步隊列是兩個隊列,前者依賴條件喚醒後者依賴鎖釋放喚醒,了解了這兩點以後搞清楚條件隊列就不是什麽難事了。
***
至此,Java同步器AQS中三大鎖模式就都分析完了。雖然已經盡力思考,盡量寫的清楚,但鑒於水平有限,如果有紕漏的地方,歡迎廣大讀者指正。
明天就是國慶長假了,我自己也計劃出國玩一趟,散散心。
提前祝廣大朋友國慶快樂。
Tags: 隊列 獨占 條件 深入淺出 takeLock 元素
文章來源: