多執行緒之美8一 AbstractQueuedSynchronizer原始碼分析<二>
目錄
AQS的原始碼分析 <二>
該篇主要分析AQS的ConditionObject,是AQS的內部類,實現等待通知機制。
1、條件佇列
條件佇列與AQS中的同步佇列有所不同,結構圖如下:
兩者區別:
- 1、連結串列結構不同,條件佇列是單向連結串列,同步佇列是雙向連結串列。
- 2、兩個佇列中等待條件不同,條件佇列中執行緒是已經獲取到鎖,主動呼叫await方法釋放鎖,掛起當前執行緒,等待某個條件(如IO,mq訊息等),同步佇列中的執行緒是等待獲取鎖,在獲取鎖失敗後掛起等待鎖可用。
兩者聯絡:
當等待的某個條件完成,其他執行緒呼叫signal方法,通知掛起在條件佇列中的執行緒,會將條件佇列中該node移出,加入到同步佇列中,node的ws狀態由Node.CONDITION改為0 ,開始等待鎖。
2、ConditionObject
ConditionObject 和 Node一樣,都是AQS的內部類, ConditionObject實現Condition介面,主要實現執行緒呼叫 await和signal ,實現執行緒條件阻塞和通知機制,Condition物件通過 Lock子類呼叫newConditon方法獲取,以
ReentrantLock為例,程式碼如下:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
可見排他鎖的newCondition方法返回的是ConditionObject物件
final ConditionObject newCondition() {
return new ConditionObject();
}
簡單生產者消費示例程式碼:
package AQS; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @author zdd * 2019/12/30 下午 * Description: 利用ReentrantLock和Condition實現生產者消費者 */ public class ConditionTest { static ReentrantLock lock = new ReentrantLock(); static Condition condition = lock.newCondition(); public static void main(String[] args) { //資源類 Apple apple = new Apple(); //1.開啟生產者執行緒 new Thread(()-> { for (;;) { lock.lock(); try { //蘋果沒有被消費,吃完通知我,我再生產哦 if (apple.getNumber() > 0) { condition.await(); } TimeUnit.SECONDS.sleep(1); System.out.println("生產一個蘋果"); apple.addNumber(); //通知消費執行緒消費 condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } },"producer").start(); //2.開啟消費者執行緒 new Thread(()-> { for (;;) { lock.lock(); try { //蘋果數量為0,掛起等待生產蘋果,有蘋果了會通知 if(apple.getNumber() == 0) { condition.await(); } TimeUnit.SECONDS.sleep(1); System.out.println("消費一個蘋果"); apple.decreNumber(); //通知生產執行緒生產 condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } },"consumer").start(); } //定義蘋果內部類 static class Apple { //記錄蘋果數量 private Integer number =0; public void addNumber() { number++; System.out.println(Thread.currentThread().getName() +"當前蘋果數量:"+number ); } public void decreNumber() { number--; System.out.println(Thread.currentThread().getName() +"當前蘋果數量:"+number); } public Integer getNumber() { return number; } } }
執行結果如下圖:
2.1、 await() 方法
當前執行緒是在已經獲取鎖的情況下,呼叫await方法主動釋放鎖,掛起當前執行緒,等待某個條件(IO,mq訊息等)喚醒,再去競爭獲取鎖的過程。該方法會將當前執行緒封裝到node節點中,新增到Condition條件佇列中,釋放鎖資源,並掛起當前執行緒。
具體執行步驟如下:
1、執行緒封裝到node中,並新增到Condition條件佇列中,ws =-2 即為Node.CONDITION。
2、釋放鎖。
3、將自己阻塞掛起,如果執行緒被喚醒,首先檢查自己是被中斷喚醒的不。如果是被中斷喚醒,跳出while迴圈;如果是被其他執行緒signal喚醒,則判斷當前執行緒所在node是否被加入到同步等待佇列,已在同步佇列中也跳出while迴圈,否則繼續掛起,signal喚醒邏輯會將condition條件佇列node 移出,加入到同步佇列中,去等待獲取鎖。
4,執行緒被喚醒,執行acquireQueued方法,執行緒會嘗試獲取鎖,若失敗則在同步佇列中找到安全位置阻塞,成功則從呼叫await()方法處繼續向下執行,返回值表示當前執行緒是否被中斷過。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//掛起當前執行緒
LockSupport.park(this);
// 被喚醒: 1,被其他執行緒喚醒,2,中斷喚醒,
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//1,如果被signal正常喚醒執行acquireQueued,返回false,如果獲取到鎖就繼續執行呼叫await後面的程式碼了,未獲取到鎖就在同步佇列中繼續掛起等待鎖執行了
//2,如果被中斷喚醒的,acquireQueued 返回true
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//執行緒在被signal後,再被中斷的
interruptMode = REINTERRUPT;
// 後面程式碼處理的是被中斷喚醒的情況
if (node.nextWaiter != null) // clean up if cancelled
//如果nextWaiter!=null,則表示還在條件佇列中,清理一下所有被取消node
//什麼情況下會進入該if判斷中,如果是正常被signal的,會將該node從條件佇列移出加入到同步佇列中的, nextWaiter 一定為null,那就是被異常中斷情況,
unlinkCancelledWaiters();
if (interruptMode != 0)
//響應中斷模式
reportInterruptAfterWait(interruptMode);
}
第1步,執行addConditionWaiter方法,主要邏輯是將執行緒封裝為Node,並新增到條件佇列中
private Node addConditionWaiter() {
//1.獲取佇列中最後一個節點
Node t = lastWaiter;
//2.如果最後一個節點被取消,清除出隊
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//3. t 指向最新有效的節點,也可能條件佇列為空,t==null
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
第2步,完全釋放鎖 fullyRelease,將同步狀態state 設定為初始值0,這裡考慮到有多次重入獲取鎖情況,state >1,這時需完全釋放鎖。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//1,釋放鎖
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//2,釋放鎖失敗,將條件佇列中的節點標記為取消
node.waitStatus = Node.CANCELLED;
}
}
isOnSyncQueue 判斷node是否在同步佇列中
final boolean isOnSyncQueue(Node node) {
//1,這2種情況肯定沒有在同步佇列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
//3.從同步佇列尾節點開始對比,看是否在同步佇列中
return findNodeFromTail(node);
}
findNodeFromTail 從後向前尋找
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
線上程被喚醒後,檢查掛起期間是否被中斷
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
如果執行緒被中斷了,那就需要將在條件佇列中等待的該節點執行 transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
// 判斷是否是被signal通知喚醒的,會更新為0,更新成功,執行入隊操作(加入同步佇列)
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
//未在同步佇列中,讓出處理器,執行緒回到就緒態,等待下一次分配cpu排程
Thread.yield();
return false;
}
最後根據不同的中斷值做出相應處理
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
//1,直接丟擲中斷異常
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
//2,中斷標誌
selfInterrupt();
}
2.2、signal方法
就是將條件佇列中的node移出,加入到同步佇列等待獲取鎖的過程。
流程圖如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 1、將first節點執行出隊操作
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//2,如果條件佇列中有ws =-2的節點,肯定會移出一個到同步佇列中
}
final boolean transferForSignal(Node node) {
//1,將node ws更新為0 ,如果node 狀態不等於CONDITION,一定是被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//2,加入到同步佇列中,返回的p是node的pre
Node p = enq(node);
int ws = p.waitStatus;
//3,如果前置節點被取消,或者更新p的 ws =-1 失敗,直接喚醒執行緒,否則等待前置節點喚醒自己
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//喚醒執行緒
LockSupport.unpark(node.thread);
return true;
}
3、總結
1、Condition提供的阻塞通知機制與Object類兩者對比:
- 方法不同,Condition提供方法有 await(), signal(),signalAll(), Object類提供的是wait(),notify() , notifyAll()
- 配合使用物件不同,Condition條件需要和Lock配合使用,Object類需和Synchronized關鍵字配合。
- 多條件, Condition可實現多個條件,即建立多個Condition物件,可以每個Condition物件對應一種條件,從而有選擇的實現喚醒通知,Object類的要喚醒一個阻塞執行緒,只能在一個條件佇列中,喚醒是隨機的,沒有Condition使用靈活。
2、注意區別Condition條件佇列與同步佇列兩者的區別,2個佇列中執行緒等待條件不