J.U.C 之阻塞佇列:ArrayBlockingQueue
阿新 • • 發佈:2018-12-14
1. 簡介
ArrayBlockingQueue,一個由陣列實現的有界阻塞佇列。該佇列採用 FIFO 的原則對元素進行排序新增的。
ArrayBlockingQueue 為有界且固定,其大小在構造時由建構函式來決定,確認之後就不能再改變了。
ArrayBlockingQueue 支援對等待的生產者執行緒和使用者執行緒進行排序的可選公平策略,但是在預設情況下不保證執行緒公平的訪問,在構造時可以選擇公平策略(fair = true
)。公平性通常會降低吞吐量,但是減少了可變性和避免了“不平衡性”。
2. 構造方法
先看看 java.util.concurrent.ArrayBlockingQueue
|
- 可以清楚地看到 ArrayBlockingQueue 繼承
java.util.AbstractQueue
,實現java.util.concurrent.BlockingQueue
介面。看過java.util
包原始碼的同學應該都認識AbstractQueue,該類在java.util.Queue
介面中扮演著非常重要的作用,該類提供了對queue 操作的骨幹實現(具體內容移駕其原始碼)。 java.util.concurrent.BlockingQueue
繼承java.util.Queue
介面,為阻塞佇列的核心介面,提供了在多執行緒環境下的出列、入列操作。作為使用者,則不需要關心佇列在什麼時候阻塞執行緒,什麼時候喚醒執行緒,所有一切均由 BlockingQueue 來完成。-
ArrayBlockingQueue 內部使用可重入鎖 ReentrantLock + Condition 來完成多執行緒環境的併發操作。
items
變數,一個定長陣列,維護 ArrayBlockingQueue 的元素。takeIndex
變數,int
,為 ArrayBlockingQueue 隊首位置。putIndex
變數,int
,ArrayBlockingQueue 隊尾位置。count
變數,元素個數。
lock
變數,ReentrantLock ,ArrayBlockingQueue 出列入列都必須獲取該鎖,兩個步驟共用一個鎖。notEmpty
變數,非空,即出列條件。notFull
變數,未滿,即入列條件。
3. 入隊
ArrayBlockingQueue 提供了諸多方法,可以將元素加入佇列尾部。
#add(E e)
方法:將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true ,如果此佇列已滿,則丟擲 IllegalStateException 異常。#offer(E e)
方法:將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true ,如果此佇列已滿,則返回 false 。#offer(E e, long timeout, TimeUnit unit)
方法:將指定的元素插入此佇列的尾部,如果該佇列已滿,則在到達指定的等待時間之前等待可用的空間。#put(E e)
方法:將指定的元素插入此佇列的尾部,如果該佇列已滿,則等待可用的空間。
3.1 add
|
#add(E e)
方法,呼叫#offer(E e)
方法,如果返回false,則直接丟擲 IllegalStateException 異常。
3.2 offer
|
- 首先,檢查是否為
null
。 - 然後,獲取 Lock 鎖。獲取鎖成功後,如果佇列已滿則,直接返回 false 。
- 最後,呼叫
#enqueue(E e)
方法,它為入列的核心方法,所有入列的方法最終都將呼叫該方法,在佇列尾部插入元素。
3.2.1 enqueue
|
- 該方法就是在
putIndex
(對尾)位置處,新增元素,最後呼叫notEmpty
的#signal()
方法,通知阻塞在出列的執行緒(如果佇列為空,則進行出列操作是會阻塞)。
3.3 可超時的 offer
|
- 相比
#offer(E e)
方法,增加了<1>
處:- 若佇列已滿,呼叫
notFull
的#awaitNanos(long nanos)
方法,等待被通知(元素出列時,會呼叫notFull
的#signal()
方法,進行通知阻塞等待的入列執行緒)或者超時。 - 被通知後,再次檢查佇列是否非空。若非空,繼續向下執行,否則繼續等待被通知。
- 若佇列已滿,呼叫
3.4 put
|
- 相比
#offer(E e)
方法,增加了<1>
處:- 若佇列已滿,呼叫
notFull
的#await()
方法,等待被通知(元素出列時,會呼叫notFull
的#await()
方法,進行通知阻塞等待的入列執行緒)。 - 被通知後,再次檢查佇列是否非空。若非空,繼續向下執行,否則繼續等待被通知。
- 若佇列已滿,呼叫
4. 出隊
ArrayBlockingQueue 提供的出隊方法如下:
#poll()
方法:獲取並移除此佇列的頭,如果此佇列為空,則返回null
。#poll(long timeout, TimeUnit unit)
方法:獲取並移除此佇列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。#take()
方法:獲取並移除此佇列的頭部,在元素變得可用之前一直等待(如果有必要)。#remove(Object o)
方法:從此佇列中移除指定元素的單個例項(如果存在)。
4.1 poll
|
- 如果佇列為空,則返回
null
,否則,呼叫#dequeue()
方法,獲取列頭元素。
3.1.1 dequeue
|
- 該方法主要是從列頭(
takeIndex
位置)取出元素,同時如果迭代器itrs
不為null
,則需要維護下該迭代器。最後,呼叫notFull
的#signal()
方法,喚醒阻塞在入列執行緒。
4.2 可超時的 poll
|
- 相比
#poll()
方法,增加了<1>
處:- 若佇列已空,呼叫
notEmpty
的#awaitNanos(long nanos)
方法,等待被通知(元素入列時,會呼叫notEmpty
的#signal()
方法,進行通知阻塞等待的出列執行緒)或者超時返回 null。 - 被通知後,再次檢查佇列是否為空。若非空,繼續向下執行,否則繼續等待被通知。
- 若佇列已空,呼叫
4.3 take
|
- 相比
#poll()
方法,增加了<1>
處:- 若佇列已空,呼叫
notEmpty
的#await()
方法,等待被通知(元素入列時,會呼叫notEmpty
的#signal()
方法,進行通知阻塞等待的出列執行緒)。 - 被通知後,再次檢查佇列是否為空。若非空,繼續向下執行,否則繼續等待被通知。
- 若佇列已空,呼叫
4.4 remove
|
- 詳細解析,見程式碼註釋。
-
#removeAt(int removeIndex)
方法,移除指定位置的元素。程式碼如下:void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; // 移除的為隊頭,直接移除即可 if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // 移除非隊頭,移除的同時,需要向前複製,填補這個空缺。 } else { // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null; this.putIndex = i; break; } } count--; if (itrs != null) itrs.removedAt(removeIndex); } // 通知 notFull.signal(); }
5. 補充說明
老艿艿:因為本文的重心在 ArrayBlockingQueue 的入隊和出隊,所以其他方法,例如迭代器等等,並未解析。所以,胖友,你懂的,自己研究哈。