1. 程式人生 > >AQS原始碼深入分析之條件佇列-你知道Java中的阻塞佇列是如何實現的嗎?

AQS原始碼深入分析之條件佇列-你知道Java中的阻塞佇列是如何實現的嗎?

本文基於JDK-8u261原始碼分析 ------ # 1 簡介 ![img](https://img2020.cnblogs.com/other/1187061/202011/1187061-20201109170509374-960514910.png) 因為CLH佇列中的執行緒,什麼執行緒獲取到鎖,什麼執行緒進入佇列排隊,什麼執行緒釋放鎖,這些都是不受我們控制的。所以條件佇列的出現為我們提供了主動式地、只有滿足指定的條件後才能執行緒阻塞和喚醒的方式。對於條件佇列首先需要說明一些概念:條件佇列是AQS中除了CLH佇列之外的另一種佇列,每建立一個Condition實際上就是建立了一個條件佇列,而每呼叫一次await方法實際上就是往條件佇列中入隊,每呼叫一次signal方法實際上就是往條件佇列中出隊。不像CLH佇列上節點的狀態有多個,條件佇列上節點的狀態只有一個:CONDITION。所以如果條件佇列上一個節點不再是CONDITION狀態時,就意味著這個節點該出隊了。需要注意的是,**條件佇列只能執行在獨佔模式下**。 一般在使用條件佇列作為阻塞佇列來使用時都會建立兩個條件佇列:**notFull**和**notEmpty**。notFull表示當條件佇列已滿的時候,put方法會處於等待狀態,直到佇列沒滿;notEmpty表示當條件佇列為空的時候,take方法會處於等待狀態,直到佇列有資料了。 而notFull.signal方法和notEmpty.signal方法會將條件佇列上的節點移到CLH佇列中(每次只轉移一個)。也就是說,**存在一個節點從條件佇列被轉移到CLH佇列的情況發生**。同時也意味著,**條件佇列上不會發生鎖資源競爭,所有的鎖競爭都是發生在CLH佇列上的**。 其他一些條件佇列和CLH佇列之間的差異如下: - 條件佇列使用nextWaiter指標來指向下一個節點,是一個單向連結串列結構,不同於CLH佇列的雙向連結串列結構; - 條件佇列使用firstWaiter和lastWaiter來指向頭尾指標,不同於CLH佇列的head和tail; - 條件佇列中的第一個節點也不會像CLH佇列一樣,是一個特殊的空節點; - 不同於CLH佇列中會用很多的CAS操作來控制併發,條件佇列進佇列的前提是已經獲取到了獨佔鎖資源,所以很多地方不需要考慮併發。 下面就是具體的原始碼分析了。條件佇列以ArrayBlockingQueue來舉例: ------ # 2 構造器 ```java 1 /** 2 * ArrayBlockingQueue: 3 */ 4 public ArrayBlockingQueue(int capacity) { 5 this(capacity, false); 6} 7 8 public ArrayBlockingQueue(int capacity, boolean fair) { 9 if (capacity <= 0) 10 throw new IllegalArgumentException(); 11 //存放實際資料的陣列 12 this.items = new Object[capacity]; 13 //獨佔鎖使用ReentrantLock來實現(fair表示的就是公平鎖還是非公平鎖,預設為非公平鎖) 14 lock = new ReentrantLock(fair); 15 //notEmpty條件佇列 16 notEmpty = lock.newCondition(); 17 //notFull條件佇列 18 notFull = lock.newCondition(); 19 } ``` ------ # 3 put方法 ```java 1 /** 2 * ArrayBlockingQueue: 3 */ 4 public void put(E e) throws InterruptedException { 5 //非空校驗 6 checkNotNull(e); 7 final ReentrantLock lock = this.lock; 8 /* 9 獲取獨佔鎖資源,響應中斷模式。其實現程式碼和lock方法還有Semaphore的acquire方法是類似的 10 因為這裡分析的是條件佇列,於是就不再分析該方法的細節了 11 */ 12 lock.lockInterruptibly(); 13 try { 14 while (count == items.length) 15 //如果陣列中資料已經滿了的話,就在notFull中入隊一個新節點,並阻塞當前執行緒 16 notFull.await(); 17 //新增陣列元素並喚醒notEmpty 18 enqueue(e); 19 } finally { 20 //釋放鎖資源 21 lock.unlock(); 22 } 23 } ``` ------ # 4 await方法 如果在put的時候發現數組已滿,或者在take的時候發現數組是空的,就會呼叫await方法來將當前節點放入條件佇列中: ```java 1 /** 2 * AbstractQueuedSynchronizer: 3 */ 4 public final void await() throws InterruptedException { 5 //如果當前執行緒被中斷就丟擲異常 6 if (Thread.interrupted()) 7 throw new InterruptedException(); 8 //把當前節點加入到條件佇列中 9 Node node = addConditionWaiter(); 10 //釋放之前獲取到的鎖資源,因為後續會阻塞該執行緒,所以如果不釋放的話,其他執行緒將會等待該執行緒被喚醒 11 int savedState = fullyRelease(node); 12 int interruptMode = 0; 13 //如果當前節點不在CLH佇列中則阻塞住,等待unpark喚醒 14 while (!isOnSyncQueue(node)) { 15 LockSupport.park(this); 16 /* 17 這裡被喚醒可能是正常的signal操作也可能是被中斷了。但無論是哪種情況,都會將當前節點插入到CLH佇列尾, 18 並退出迴圈(注意,這裡被喚醒除了上面兩種情況之外,還有一種情況是作業系統級別的虛假喚醒(spurious wakeup), 19 也就是當前執行緒毫無理由就會被喚醒了,所以上面需要使用while來規避掉這種情況) 20 */ 21 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 22 break; 23 } 24 //走到這裡說明當前節點已經插入到了CLH佇列中(被signal所喚醒或者被中斷)。然後在CLH佇列中進行獲取鎖資源的操作 25 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 26 /* 27