本文講ArrayBlockingQueue

1. 介紹

一個基於陣列的有界阻塞佇列,FIFO順序。支援等待消費者和生產者執行緒的可選公平策略(預設是非公平的)。公平的話通常會降低吞吐量,但是可以減少可變性並避免之前被阻塞的執行緒飢餓。

1.1 類結構

  • ArrayBlockingQueue繼承關係

  • ArrayBlockingQueue類圖

構造器

    // 預設是非公平的
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
} public ArrayBlockingQueue(int capacity, boolean fair) {
...
} public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
...
}

比較重要的幾個引數


// 儲存元素的陣列
final Object[] items; /** items index for next take, poll, peek or remove */
// 與putIndex相互配合可以將陣列變成一個可迴圈利用的陣列,不需要擴容,後面會講到
// 每次出隊的索引
int takeIndex; /** items index for next put, offer, or add */
// 每次入隊的索引
int putIndex; /** Number of elements in the queue */
int count; /**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
// 迭代的時候會用到,在後面詳講
transient Itrs itrs = null;

保證執行緒安全的措施


/** Main lock guarding all access */
final ReentrantLock lock; /** Condition for waiting takes */
private final Condition notEmpty; /** Condition for waiting puts */
private final Condition notFull;

我們可以看到ArrayBlockingQueue使用的是單鎖控制執行緒安全,而LinkedBlockingQueue雙鎖控制的, 後者的細粒度更小。

2. 原始碼剖析

ArrayBlockingQueue也是繼承至BlockingQueue(可以去看看上面提到的那篇部落格有提到BlockingQueue),它對於不同的方法不能立即滿足要求的,作出的迴應是不一樣的。

我們分別介紹下面的方法的具體實現

  • offer(E e)
  • offer(E e, long timeout, TimeUnit unit)
  • put(E e)
  • poll()
  • remove(Object o)

2.1 offer(E e) & poll()

插入成功就返回true;若佇列滿了就直接返回false,不會阻塞自己

    public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

上面的程式碼比較簡單,我們來看看入隊的具體操作

    private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; // 為什麼putIndex+1 等於陣列長度時會變成0
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

為了解答上面註釋中的問題,我們先看看poll()的實現

    public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
    private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; // takeIndex + 1等於了陣列的長度也會將值置為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

結合上面的入隊、出隊原始碼,我們來分析一下:

  • 單執行緒下,首先執行
        ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(3);
array.offer("A");
array.offer("B");
array.offer("C");

此時佇列的狀態

  • 再執行
        array.poll();
array.offer("D");

最後佇列的狀態

大家可能會有點疑問,上面的佇列不是輸出是"D B C", 咋回事? 肯定不是啦,我們看看類重寫的toString就明白了。

 public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k == 0)
return "[]"; final Object[] items = this.items;
StringBuilder sb = new StringBuilder();
sb.append('['); // 主要程式碼
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}

思考一下,就會明白了。

通過上面的分析,我們看出了陣列就像一個迴圈陣列一樣,每個地址都被重複使用。我們也知道了基於陣列的佇列如何實現的

offer(E e, long timeout, TimeUnit unit)put(E e)實現都比較簡單,大家看看原始碼即可。

2.2 remove(Object o)

若o存在則移除,返回true;反之。這個操作會改變佇列的結構,但是該方法一般很少使用

 public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
// 主要刪除邏輯
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove // slide over all others up through putIndex.
// 此時removeIndex != takeIndex
// 為啥要執行下面的程式碼,大家可以按照上面圖片的最後狀態,
// 按照下面程式碼走一下,就明白了.主要是設定putIndex
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}

2.3 解釋解釋Itrs

    // 當前活動迭代器的共享狀態; 允許佇列操作更新迭代器的狀態;
transient Itrs itrs = null;

這個變數可以理解成,在一個執行緒使用迭代器時,其他的執行緒可以對佇列進行更新操作的一個保障。

原始碼註釋中對Itrs的描述,迭代器和它們的佇列之間共享資料,允許在刪除元素時修改佇列以更新迭代器。 我們可以看到對佇列進行了刪除操作時,佇列都會執行下面的語句

   if (itrs != null)
itrs.removedAt(removeIndex);

初始化該值是在使用迭代器時

    public Iterator<E> iterator() {
return new Itr();
} ... Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
...
itrs = new Itrs(this);
...
}
} finally {
lock.unlock();
}
}

3. 總結

ArrayBlockingQueue的實現整體不難,使用ReetrantLock保證了執行緒安全,putIndextakeIndex分別維護入隊與出隊的位置,一起構成一個迴圈陣列