1. 程式人生 > >Java併發包原始碼學習系列:阻塞佇列實現之ArrayBlockingQueue原始碼解析

Java併發包原始碼學習系列:阻塞佇列實現之ArrayBlockingQueue原始碼解析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112301359) - [Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838) - [Java併發包原始碼學習系列:ReentrantLock可重入獨佔鎖詳解](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112454874) - [Java併發包原始碼學習系列:ReentrantReadWriteLock讀寫鎖解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112689635) - [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669) - [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098) - [Java併發包原始碼學習系列:JDK1.8的ConcurrentHashMap原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113059783) - [Java併發包原始碼學習系列:阻塞佇列BlockingQueue及實現原理分析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113186979) ## ArrayBlockingQueue概述 ArrayBlockingQueue是由**陣列**構成的有界阻塞佇列,支援FIFO的次序對元素進行排序。 這是一個典型的有界緩衝結構,可指定大小儲存元素,供生產執行緒插入,供消費執行緒獲取,但注意,容量一旦指定,便不可修改。 佇列空時嘗試take操作和佇列滿時嘗試put操作都會阻塞執行操作的執行緒。 該類還支援可供選擇的**公平性策略**,`ReentrantLock`可重入鎖實現,預設採用非公平策略,當佇列可用時,阻塞的執行緒都可以爭奪訪問佇列的資格。 > 阻塞佇列通過`ReentrantLock` + `Condition`實現併發環境下的等待通知機制:讀操作和寫操作都需要獲取到AQS獨佔鎖才能進行操作,如果佇列為空,則讀操作執行緒將會被包裝為條件節點扔到讀執行緒等待條件佇列中,等待寫執行緒寫入新的元素,同時讀執行緒將會被喚醒,反之亦然。 ## 類圖結構及重要欄位 ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210127162626840-260834696.png) ```java public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { // 序列號, 用於序列化 private static final long serialVersionUID = -817911632652898426L; // 底層儲存資料的定長陣列 final Object[] items; // 移除操作的index,可以理解為隊頭位置 int takeIndex; // 新增操作的index,可以理解為隊尾位置 int putIndex; // 元素個數 int count; // 獨佔重入鎖 final ReentrantLock lock; // 等待takes的條件物件 private final Condition notEmpty; // 等待puts的條件物件 private final Condition notFull; // Itrs表示佇列和迭代器之間的共享資料,其實用來儲存多個迭代器例項的 transient Itrs itrs = null; } ``` ## 構造器 使用ArrayBlockingQueue的時候,必須指定一個capacity阻塞佇列的容量。可以傳入可選的fair值,以採取不同公平性策略,預設使用非公平的策略。另外,可以傳入集合物件,直接構造阻塞佇列。 ```java // 必須指定容量, 預設採用非公平策略 public ArrayBlockingQueue(int capacity) { this(capacity, false); } // 另外,可指定公平性策略 public ArrayBlockingQueue(int capacity, boolean fair) { // 對容量進行簡單校驗 if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; // 初始化底層陣列 lock = new ReentrantLock(fair); // 初始化lock notEmpty = lock.newCondition(); // 初始化條件變數notEmpty notFull = lock.newCondition(); // 初始化條件變數notFull } // 另外,可指定傳入集合直接構造 public ArrayBlockingQueue(int capacity, boolean fair, Collection c) { this(capacity, fair); // 加鎖只是為了可見性, 而不是為了互斥特性 final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { // 遍歷賦值 checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } ``` ## 出隊和入隊操作 佇列的操作最核心的部分莫過於入隊和出隊了,後面分析的方法基本上都基於這兩個工具方法。 ### 入隊enqueue ```java private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 把元素x放入陣列 items[putIndex] = x; // 下一個元素應該存放的下標位置 if (++putIndex == items.length) putIndex = 0; count++; // 啟用notEmpty的條件佇列因呼叫take操作而被阻塞的一個執行緒 notEmpty.signal(); } ``` 1. 將元素x置入陣列中。 2. 計算下一個元素應該存放的下標位置。 3. 元素個數器遞增,這裡count前加了鎖,值都是從主記憶體中獲取,不會存在記憶體不可見問題,並且更新也會直接重新整理回主記憶體中。 4. 最後啟用notEmpty的條件佇列因呼叫take操作而被阻塞的一個執行緒。 ### 出隊dequeue ```java private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 獲取元素 E x = (E) items[takeIndex]; // 置null items[takeIndex] = null; // 重新設定對頭下標 if (++takeIndex == items.length) takeIndex = 0; // 更新元素計數器 count--; // 更新迭代器中的元素資料,itrs只用在使用迭代器的時候才例項化哦 if (itrs != null) itrs.elementDequeued(); // 啟用notFull的條件佇列因呼叫put操作而被阻塞的一個執行緒 notFull.signal(); return x; } ``` 1. 獲取元素,並將當前位置置null。 2. 重新設定隊頭下標。 3. 元素計數器遞減。 4. 更新迭代器中的元素資料,itrs預設情況下都是為null的,只有使用迭代器的時候才會例項化Itrs。 5. 啟用notFull的條件佇列因呼叫put操作而被阻塞的一個執行緒。 ## 阻塞式操作 ### E take() 阻塞式獲取 take操作將會獲取當前佇列頭部元素並移除,如果佇列為空則阻塞當前執行緒直到佇列不為空,退出阻塞時返回獲取的元素。 > 那,執行緒阻塞至何時如何知道呢,其實當前執行緒將會因`notEmpty.await()`被包裝成等待節點置入notEmpty的條件佇列中,一旦enqueue操作成功觸發,也就是入隊成功,將會執行`notEmpty.signal()`喚醒條件佇列中等待的執行緒,被轉移到AQS佇列中參與鎖的爭奪。 如果執行緒在阻塞時被其他執行緒設定了中斷標誌,則丟擲InterruptedException異常並返回。 ```java public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 可響應中斷式地獲取鎖 lock.lockInterruptibly(); try { // 如果佇列為空,則將當前執行緒包裝為等待節點置入notEmpty的條件佇列中 while (count == 0) notEmpty.await(); // 非空,則執行入隊操作,入隊時喚醒notFull的條件佇列中的第一個執行緒 return dequeue(); } finally { lock.unlock(); } } ``` ### void put(E e) 阻塞式插入 put操作將向隊尾插入元素,如果佇列未滿則插入,如果佇列已滿,則阻塞當前執行緒直到佇列不滿。 如果執行緒在阻塞時被其他執行緒設定了中斷標誌,則丟擲InterruptedException異常並返回。 ```java public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果佇列滿,則將當前執行緒包裝為等待節點置入notFull的條件佇列中 while (count == items.length) notFull.await(); // 非滿,則執行入隊操作,入隊時喚醒notEmpty的條件佇列中的第一個執行緒 enqueue(e); } finally { lock.unlock(); } } ``` ### E poll(timeout, unit) 阻塞式超時獲取 在take阻塞式獲取方法的基礎上額外增加超時功能,傳入一個timeout,獲取不到而阻塞的時候,如果時間到了,即使還獲取不到,也只能立即返回null。 ```java public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { // 佇列仍為空,但是時間到了,必須返回了 if (nanos <= 0) return null; // 在條件佇列裡等著,但是需要更新時間 nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } } ``` ### boolean offer(e, timeout, unit) 阻塞式超時插入 在put阻塞式插入方法的基礎上額外增加超時功能,傳入一個timeout,獲取不到而阻塞的時候,如果時間到了,即使還獲取不到,也只能立即返回null。 ```java public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } } ``` ## 其他常規操作 ### boolean offer(E e) offer(E e)是非阻塞的方法,向隊尾插入一個元素,如果佇列未滿,則插入成功並返回true;如果佇列已滿則丟棄當前元素,並返回false。 ```java public boolean offer(E e) { checkNotNull(e); // 如果插入元素為null,則丟擲NullPointerException異常 // 獲取獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { // 如果佇列滿, 則返回false if (count == items.length) return false; else { // 否則則入隊 enqueue(e); return true; } } finally { lock.unlock(); } } ``` ### E poll() 從佇列頭部獲取並移除第一個元素,如果佇列為空則返回null。 ```java public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 如果為空,返回null, 否則執行出隊操作 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } ``` ### Boolean remove(Object o) 移除佇列中與元素o相等【指的是equals方法判定相同】的元素,移除成功返回true,如果佇列為空或沒有匹配元素,則返回false。 ```java public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count >
0) { // 獲取當前隊尾位置 final int putIndex = this.putIndex; // 從隊頭開始遍歷 int i = takeIndex; do { // 找到了對應的元素的位置,removeAt刪除該位置的元素 if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } // 移除removeIndex位置的元素 void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >
= 0 && removeIndex < items.length; final Object[] items = this.items; // 如果要移除元素的位置正好就是 隊頭位置,和之前出隊操作一樣 if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex; // 移除的不是隊頭,那就要對應將後面的元素補充上來,並更新putIndex的位置 for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; // 移除的不是隊尾,後面的元素補充上來 if (next != putIndex) { items[i] = items[next]; i = next; } else { // 移除的是隊尾元素 items[i] = null; this.putIndex = i; break; } } count--; if (itrs != null) itrs.removedAt(removeIndex); } notFull.signal(); } ``` ## 總結 - ArrayBlockingQueue基於陣列的有界阻塞佇列,必須指定容量大小,及佇列中最多允許的元素個數。 - 提供了take和put兩個阻塞式的操作,還提供了阻塞式+超時機制的操作。 - 阻塞佇列通過`ReentrantLock` + `Condition`實現併發環境下的**等待通知**機制:讀操作和寫操作都需要獲取到AQS獨佔鎖才能進行操作,如果佇列為空,則讀操作執行緒將會被包裝為條件節點扔到讀執行緒等待條件佇列中阻塞,等待寫執行緒寫入新的元素,並喚醒等待中的讀執行緒,反之亦然。 本篇重點看了出隊入隊相關方法,其餘部分如迭代器相關不是本文重點,如果想了解學習可以參看:[ArrayBlockingQueue 迭代器](https://blog.csdn.net/qfanmingyiq/article/details/109367230) ## 參考閱讀 - 《Java併發程式設計之美》 - 《Java併發程式設計的