AQS原始碼深入分析之條件佇列-你知道Java中的阻塞佇列是如何實現的嗎?
阿新 • • 發佈:2020-11-09
本文基於JDK-8u261原始碼分析
------
# 1 簡介
![img](https://img2020.cnblogs.com/other/1187061/202011/1187061-20201109170509374-960514910.png)
因為CLH佇列中的執行緒,什麼執行緒獲取到鎖,什麼執行緒進入佇列排隊,什麼執行緒釋放鎖,這些都是不受我們控制的。所以條件佇列的出現為我們提供了主動式地、只有滿足指定的條件後才能執行緒阻塞和喚醒的方式。對於條件佇列首先需要說明一些概念:條件佇列是AQS中除了CLH佇列之外的另一種佇列,每建立一個Condition實際上就是建立了一個條件佇列,而每呼叫一次await方法實際上就是往條件佇列中入隊,每呼叫一次signal方法實際上就是往條件佇列中出隊。不像CLH佇列上節點的狀態有多個,條件佇列上節點的狀態只有一個:CONDITION。所以如果條件佇列上一個節點不再是CONDITION狀態時,就意味著這個節點該出隊了。需要注意的是,**條件佇列只能執行在獨佔模式下**。
一般在使用條件佇列作為阻塞佇列來使用時都會建立兩個條件佇列:**notFull**和**notEmpty**。notFull表示當條件佇列已滿的時候,put方法會處於等待狀態,直到佇列沒滿;notEmpty表示當條件佇列為空的時候,take方法會處於等待狀態,直到佇列有資料了。
而notFull.signal方法和notEmpty.signal方法會將條件佇列上的節點移到CLH佇列中(每次只轉移一個)。也就是說,**存在一個節點從條件佇列被轉移到CLH佇列的情況發生**。同時也意味著,**條件佇列上不會發生鎖資源競爭,所有的鎖競爭都是發生在CLH佇列上的**。
其他一些條件佇列和CLH佇列之間的差異如下:
- 條件佇列使用nextWaiter指標來指向下一個節點,是一個單向連結串列結構,不同於CLH佇列的雙向連結串列結構;
- 條件佇列使用firstWaiter和lastWaiter來指向頭尾指標,不同於CLH佇列的head和tail;
- 條件佇列中的第一個節點也不會像CLH佇列一樣,是一個特殊的空節點;
- 不同於CLH佇列中會用很多的CAS操作來控制併發,條件佇列進佇列的前提是已經獲取到了獨佔鎖資源,所以很多地方不需要考慮併發。
下面就是具體的原始碼分析了。條件佇列以ArrayBlockingQueue來舉例:
------
# 2 構造器
```java
1 /**
2 * ArrayBlockingQueue:
3 */
4 public ArrayBlockingQueue(int capacity) {
5 this(capacity, false);
6}
7
8 public ArrayBlockingQueue(int capacity, boolean fair) {
9 if (capacity <= 0)
10 throw new IllegalArgumentException();
11 //存放實際資料的陣列
12 this.items = new Object[capacity];
13 //獨佔鎖使用ReentrantLock來實現(fair表示的就是公平鎖還是非公平鎖,預設為非公平鎖)
14 lock = new ReentrantLock(fair);
15 //notEmpty條件佇列
16 notEmpty = lock.newCondition();
17 //notFull條件佇列
18 notFull = lock.newCondition();
19 }
```
------
# 3 put方法
```java
1 /**
2 * ArrayBlockingQueue:
3 */
4 public void put(E e) throws InterruptedException {
5 //非空校驗
6 checkNotNull(e);
7 final ReentrantLock lock = this.lock;
8 /*
9 獲取獨佔鎖資源,響應中斷模式。其實現程式碼和lock方法還有Semaphore的acquire方法是類似的
10 因為這裡分析的是條件佇列,於是就不再分析該方法的細節了
11 */
12 lock.lockInterruptibly();
13 try {
14 while (count == items.length)
15 //如果陣列中資料已經滿了的話,就在notFull中入隊一個新節點,並阻塞當前執行緒
16 notFull.await();
17 //新增陣列元素並喚醒notEmpty
18 enqueue(e);
19 } finally {
20 //釋放鎖資源
21 lock.unlock();
22 }
23 }
```
------
# 4 await方法
如果在put的時候發現數組已滿,或者在take的時候發現數組是空的,就會呼叫await方法來將當前節點放入條件佇列中:
```java
1 /**
2 * AbstractQueuedSynchronizer:
3 */
4 public final void await() throws InterruptedException {
5 //如果當前執行緒被中斷就丟擲異常
6 if (Thread.interrupted())
7 throw new InterruptedException();
8 //把當前節點加入到條件佇列中
9 Node node = addConditionWaiter();
10 //釋放之前獲取到的鎖資源,因為後續會阻塞該執行緒,所以如果不釋放的話,其他執行緒將會等待該執行緒被喚醒
11 int savedState = fullyRelease(node);
12 int interruptMode = 0;
13 //如果當前節點不在CLH佇列中則阻塞住,等待unpark喚醒
14 while (!isOnSyncQueue(node)) {
15 LockSupport.park(this);
16 /*
17 這裡被喚醒可能是正常的signal操作也可能是被中斷了。但無論是哪種情況,都會將當前節點插入到CLH佇列尾,
18 並退出迴圈(注意,這裡被喚醒除了上面兩種情況之外,還有一種情況是作業系統級別的虛假喚醒(spurious wakeup),
19 也就是當前執行緒毫無理由就會被喚醒了,所以上面需要使用while來規避掉這種情況)
20 */
21 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
22 break;
23 }
24 //走到這裡說明當前節點已經插入到了CLH佇列中(被signal所喚醒或者被中斷)。然後在CLH佇列中進行獲取鎖資源的操作
25 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
26 /*
27