1. 程式人生 > >Java併發包原始碼學習系列:阻塞佇列BlockingQueue及實現原理分析

Java併發包原始碼學習系列:阻塞佇列BlockingQueue及實現原理分析

[toc] 系列傳送門: - [Java併發包原始碼學習系列:AbstractQueuedSynchronizer](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112254373) - [Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112301359) - [Java併發包原始碼學習系列:AQS共享式與獨佔式獲取與釋放資源的區別](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838) - [Java併發包原始碼學習系列:ReentrantLock可重入獨佔鎖詳解](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112454874) - [Java併發包原始碼學習系列:ReentrantReadWriteLock讀寫鎖解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112689635) - [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669) - [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098) - [Java併發包原始碼學習系列:JDK1.8的ConcurrentHashMap原始碼解析](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/113059783) ## 本篇要點 - 介紹阻塞佇列的概述:支援阻塞式插入和移除的佇列結構。 - 介紹阻塞佇列提供的方法。 - 介紹BlockingQueue介面的幾大實現類及主要特點。 - 以ArrayBlockingQueue為例介紹等待通知實現阻塞佇列的過程。 > 不會涉及到太多原始碼部分,意在對阻塞佇列章節的全域性概覽進行總結,具體的每種具體實現,之後會一一分析學習。 ## 什麼是阻塞佇列 > 阻塞佇列 = 阻塞 + 佇列。 - 佇列:一種**先進先出**的資料結構,支援尾部新增、首部移除或檢視等基礎操作。 - 阻塞:除了佇列提供的基本操作之外,還提供了支援**阻塞式插入和移除**的方式。 下面這些對BlockingQueue的介紹基本翻譯自JavaDoc,非常詳細。 1. 阻塞佇列的頂級介面是`java.util.concurrent.BlockingQueue`,它繼承了Queue,Queue又繼承自Collection介面。 2. BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用於不同的場景中使用:1、丟擲異常;2、返回特殊值(null 或 true/false,取決於具體的操作);3、阻塞等待此操作,直到這個操作成功;4、阻塞等待此操作,直到成功或者超時指定時間,第二節會有詳細介紹。 3. BlockingQueue不接受null的插入,否則將丟擲空指標異常,因為poll失敗了會返回null,如果允許插入null值,就無法判斷poll是否成功了。 4. BlockingQueue可能是有界的,如果在插入的時候發現佇列滿了,將會阻塞,而無界佇列則有`Integer.MAX_VALUE`大的容量,並不是真的無界。 5. BlockingQueue通常用來作為生產者-消費者的佇列的,但是它也支援Collection介面提供的方法,比如使用remove(x)來刪除一個元素,但是這類操作並不是很高效,因此儘量在少數情況下使用,如:當一條入隊的訊息需要被取消的時候。 6. BlockingQueue的實現都是執行緒安全的,所有佇列的操作或使用內建鎖或是其他形式的併發控制來保證原子。但是一些批量操作如:`addAll`,`containsAll`, `retainAll`和`removeAll`不一定是原子的。如 addAll(c) 有可能在添加了一些元素後中途丟擲異常,此時 BlockingQueue 中已經添加了部分元素。 7. BlockingQueue不支援類似close或shutdown等關閉操作。 > 下面這一段是併發大師 DougLea 寫的一段demo,使用BlockingQueue 來保證多生產者和消費者時的執行緒安全 ```java // Doug Lea: BlockingQueue 可以用來保證多生產者和消費者時的執行緒安全 class Producer implements Runnable{ private final BlockingQueue queue; Producer(BlockingQueue q){ queue = q; } public void run(){ try{ while(true) { queue.put(produce()); // 阻塞式插入 } }catch(InterruptedException ex){ ...handle... } } Object produce() { ... } } class Consumer implements Runnable{ private final BlockingQueue queue; Consumer(BlockingQueue q){ queue = q; } public void run(){ try{ while(true) { consume(queue.take())); // 阻塞式獲取 } }catch(InterruptedException ex){ ...handle... } } void consume(Object x) { ... } } class Setup{ void main(){ BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } } ``` ## 阻塞佇列提供的方法 BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用於不同的場景中使用: | 方法類別 | 丟擲異常 | 返回特殊值 | 一直阻塞 | 超時退出 | | -------- | --------- | ---------- | -------- | ---------------------- | | 插入 | add(e) | offer(e) | `put(e)` | `offer(e, time, unit)` | | 移除 | remove() | poll() | `take()` | `poll(time, unit)` | | 瞅一瞅 | element() | peek() | | | 博主在這邊大概解釋一下,如果佇列可用時,上面的幾種方法其實效果都差不多,但是當佇列空或滿時,會表現出部分差異: 1. 丟擲異常:當佇列滿時,如果再往佇列裡add插入元素e時,會丟擲`IllegalStateException: Queue full`的異常,如果隊空時,往佇列中取出元素【移除或瞅一瞅】會丟擲`NoSuchElementException`異常。 2. 返回特殊值:佇列滿時,offer插入失敗返回false。佇列空時,poll取出元素失敗返回null,而不是丟擲異常。 3. 一直阻塞:當佇列滿時,put試圖插入元素,將會一直阻塞插入的生產者執行緒,同理,佇列為空時,如果消費者執行緒從佇列裡take獲取元素,也會阻塞,知道佇列不為空。 4. 超時退出:可以理解為一直阻塞情況的超時版本,執行緒阻塞一段時間,會自動退出阻塞。 我們本篇的重點是阻塞佇列,那麼【一直阻塞】和【超時退出】相關的方法是我們分析的重頭啦。 ## 阻塞佇列的七種實現 ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210126185155703-42543537.png) - ArrayBlockingQueue:由**陣列**構成的有界阻塞佇列。 - LinkedBlockingQueue:由連結串列構成的**界限可選**的阻塞佇列,如不指定邊界,則為`Integer.MAX_VALUE`。 - PriorityBlockingQueue:支援**優先順序排序**【類似於PriorityQueue的排序規則】的無界阻塞佇列。 - DelayQueue:支援**延遲獲取元素**的無界阻塞佇列。 - SynchronousQueue:**不儲存元素**的阻塞佇列,**每個插入的操作必須等待另一個執行緒進行相應的刪除操作**,反之亦然。 另外BlockingQueue有兩個繼承子介面,分別是:`TransferQueue`和`BlockingDeque`,他們有各自的實現類: - LinkedTransferQueue:由連結串列組成的無界**TransferQueue**。 - LinkedBlockingDeque:由連結串列構成的界限可選的**雙端阻塞佇列**,如不指定邊界,則為`Integer.MAX_VALUE`。 BlockingDeque比較好理解一些,支援雙端操作嘛,TransferQueue又是個啥玩意呢? ### TransferQueue和BlockingQueue的區別 BlockingQueue:當生產者向佇列新增元素但佇列已滿時,生產者會被阻塞;當消費者從佇列移除元素但佇列為空時,消費者會被阻塞。 TransferQueue則更進一步,生產者會一直阻塞直到所新增到佇列的元素被某一個消費者所消費(不僅僅是新增到佇列裡就完事)。
新新增的transfer方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個執行緒transfer到另一個執行緒的過程中,它有效地實現了元素線上程之間的傳遞(以建立Java記憶體模型中的happens-before關係的方式)。 > [併發程式設計網: Java 7中的TransferQueue](http://ifeve.com/java-transfer-queue/) ## 1、ArrayBlockingQueue ArrayBlockingQueue是由**陣列**構成的有界阻塞佇列,支援FIFO的次序對元素進行排序。 這是一個典型的有界緩衝結構,可指定大小儲存元素,供生產執行緒插入,供消費執行緒獲取,但注意,容量一旦指定,便不可修改。 佇列空時嘗試take操作和佇列滿時嘗試put操作都會阻塞執行操作的執行緒。 該類還支援可供選擇的**公平性策略**,`ReentrantLock`可重入鎖實現,預設採用非公平策略,當佇列可用時,阻塞的執行緒都可以爭奪訪問佇列的資格。 ```java // 建立採取公平策略且規定容量為10 的ArrayBlockingQueue ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10, true); ``` ## 2、LinkedBlockingQueue LinkedBlockingQueue是由連結串列構成的**界限可選**的阻塞佇列,如不指定邊界,則為`Integer.MAX_VALUE`,因此如不指定邊界,一般來說,插入的時候都會成功。 LinkedBlockingQueue支援FIFO先進先出的次序對元素進行排序。 ```java public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); } ``` ## 3、PriorityBlockingQueue PriorityBlockingQueue是一個**支援優先順序的無界阻塞佇列**,基於陣列的二叉堆,其實就是執行緒安全的`PriorityQueue`。 預設情況下元素採取自然順序升序排列,也可以自定義類實現`compareTo()`方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造引數Comparator來對元素進行排序。 需要注意的是如果兩個物件的優先順序相同(`compare` 方法返回 0),此佇列並不保證它們之間的順序。 PriorityBlocking可以傳入一個初始容量,其實也就是底層陣列的最小容量,之後會使用grow擴容。 ```java // 這裡傳入10是初始容量,之後會擴容啊,無界的~ , 後面引數可以傳入比較規則,可以用lambda表示式哦 PriorityQueue priorityQueue = new PriorityQueue<>(10, new Comparator() { @Override public int compare (Integer o1, Integer o2) { return 0; } }); ``` ## 4、DelayQueue DelayQueue是一個**支援延時獲取元素**的無界阻塞佇列,使用PriorityQueue來儲存元素。 隊中的元素必須實現`Delayed`介面【Delay介面又繼承了Comparable,需要實現compareTo方法】,每個元素都需要指明過期時間,通過`getDelay(unit)`獲取元素剩餘時間【剩餘時間 = 到期時間 - 當前時間】。 當從佇列獲取元素時,只有過期的元素才會出佇列。 ```java static class DelayedElement implements Delayed { private final long delayTime; // 延遲時間 private final long expire; // 到期時間 private final String taskName; // 任務名稱 public DelayedElement (long delayTime, String taskName) { this.delayTime = delayTime; this.taskName = taskName; expire = now() + delayTime; } // 獲取當前時間 final long now () { return System.currentTimeMillis(); } // 剩餘時間 = 到期時間 - 當前時間 @Override public long getDelay (TimeUnit unit) { return unit.convert(expire - now(), TimeUnit.MILLISECONDS); } // 靠前的元素是最快過期的元素 @Override public int compareTo (Delayed o) { return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } } ``` ## 5、SynchronousQueue SynchronousQueue是一個**不儲存元素**的阻塞佇列,**每個插入的操作必須等待另一個執行緒進行相應的刪除操作**,反之亦然,因此這裡的Synchronous指的是讀執行緒和寫執行緒需要同步,一個讀執行緒匹配一個寫執行緒
。 你不能在該佇列中使用peek方法,因為peek是隻讀取不移除,不符合該佇列特性,該佇列不儲存任何元素,資料必須從某個寫執行緒交給某個讀執行緒,而不是在佇列中等待倍消費,非常適合傳遞性場景。 SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue。 該類還支援可供選擇的**公平性策略**,預設採用非公平策略,當佇列可用時,阻塞的執行緒都可以爭奪訪問佇列的資格。 ```java public SynchronousQueue() { this(false); } // 公平策略使用TransferQueue 實現, 非公平策略使用TransferStack 實現 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); } ``` ## 6、LinkedTransferQueue LinkedTransferQueue是由連結串列組成的無界**TransferQueue**,相對於其他阻塞佇列,多了tryTransfer和transfer方法。 TransferQueue:生產者會一直阻塞直到所新增到佇列的元素被某一個消費者所消費(不僅僅是新增到佇列裡就完事)。
新新增的transfer方法用來實現這種約束。顧名思義,阻塞就是發生在元素從一個執行緒transfer到另一個執行緒的過程中,它有效地實現了元素線上程之間的傳遞(以建立Java記憶體模型中的happens-before關係的方式)。 ## 7、LinkedBlockingDeque LinkedBlockingDeque是由連結串列構成的界限可選的**雙端阻塞佇列**,支援從兩端插入和移除元素,如不指定邊界,則為`Integer.MAX_VALUE`。 ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210126185207610-1913906488.png) ## 阻塞佇列的實現機制 > 本文不會過於詳盡地解析每個阻塞佇列原始碼實現,但會總結通用的阻塞佇列的實現機制。 以阻塞佇列介面BlockingQueue為例,我們以其中新增的阻塞相關的兩個方法為主要解析物件,put和take方法。 - put:如果佇列已滿,生產者執行緒便一直阻塞,直到佇列不滿。 - take:如果佇列已空,消費者執行緒便開始阻塞,直到佇列非空。 其實我們之前在學習Condition的時候已經透露過一些內容,這裡利用ReentrantLock實現鎖語義,通過鎖關聯的condition條件佇列來靈活地實現**等待通知**機制。 之前已經詳細地學習過:[Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112727669) ```java public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; // 初始化ReentrantLock lock = new ReentrantLock(fair); // 建立條件物件 notEmpty = lock.newCondition(); notFull = lock.newCondition(); } ``` ### put方法 ```java public void put(E e) throws InterruptedException { // 不能加null 啊 checkNotNull(e); final ReentrantLock lock = this.lock; // 可響應中斷地獲取鎖 lock.lockInterruptibly(); try { // 如果佇列滿了 notFull陷入阻塞,直到signal while (count == items.length) notFull.await(); // 如果佇列沒滿,執行入隊操作 enqueue(e); } finally { // 解鎖 lock.unlock(); } } // 入隊操作 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 注意這裡, 入隊操作成功之後,此時佇列非空, 則喚醒notEmpty佇列中的節點 notEmpty.signal(); } ``` ### take方法 ```java public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 可響應中斷地獲取鎖 lock.lockInterruptibly(); try { // 如果佇列為空, notWait陷入阻塞,直到被signal while (count == 0) notEmpty.await(); // 出隊操作 return dequeue(); } finally { // 解鎖 lock.unlock(); } } // 出隊操作 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 注意這裡, 出隊成功之後, 佇列非滿, 則喚醒notFull中的節點 notFull.signal(); return x; } ``` Condition的await()方法會**將執行緒包裝為等待節點,加入等待佇列中,並將AQS同步佇列中的節點移除**,接著不斷檢查`isOnSyncQueue(Node node)`,如果在等待佇列中,就一直等著,如果signal將它移到AQS佇列中,則退出迴圈。 Condition的signal()方法則是先檢查當前執行緒是否獲取了鎖,接著將等待佇列中的節點通過Node的操作**直接**加入AQS佇列。執行緒並不會立即獲取到資源,從while迴圈退出後,會通過acquireQueued方法加入獲取同步狀態的競爭中。 而上述描述的執行緒等待or阻塞則是通過`LockSupport`的park和unpark方法具體實現,具體可以參考AQS和LockSupport相關內容: - [Java併發包原始碼學習系列:掛起與喚醒執行緒LockSupport工具類](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112757098) ## 參考閱讀 - 方騰飛《Java併發程式設計的藝術》 - 《Java併發程式設計之美》 - [併發程式設計網: Java 7中的TransferQueue](http://ifeve.com/java-transfer-queue/) - [javadoop : 解讀 java 併發佇列 BlockingQueue](https://javadoop.com/post/java-concurrent-queue) - [Java併發包原始碼學習系列:詳解Condition條件佇列、signal和await](https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/11