1. 程式人生 > >J.U.C 之阻塞佇列:ArrayBlockingQueue

J.U.C 之阻塞佇列:ArrayBlockingQueue

1. 簡介

ArrayBlockingQueue,一個由陣列實現的有界阻塞佇列。該佇列採用 FIFO 的原則對元素進行排序新增的。

ArrayBlockingQueue 為有界且固定,其大小在構造時由建構函式來決定,確認之後就不能再改變了。

ArrayBlockingQueue 支援對等待的生產者執行緒和使用者執行緒進行排序的可選公平策略,但是在預設情況下不保證執行緒公平的訪問,在構造時可以選擇公平策略(fair = true)。公平性通常會降低吞吐量,但是減少了可變性和避免了“不平衡性”。

2. 構造方法

先看看 java.util.concurrent.ArrayBlockingQueue

的構造方法,程式碼如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {

    private static final long serialVersionUID = -817911632652898426L;
    
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    // 重入鎖
    final ReentrantLock lock;
    // notEmpty condition
    private final Condition notEmpty;
    // notFull condition
    private final Condition notFull;
    transient ArrayBlockingQueue.Itrs itrs;
    
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
}
  • 可以清楚地看到 ArrayBlockingQueue 繼承 java.util.AbstractQueue ,實現 java.util.concurrent.BlockingQueue 介面。看過 java.util 包原始碼的同學應該都認識AbstractQueue,該類在 java.util.Queue 介面中扮演著非常重要的作用,該類提供了對queue 操作的骨幹實現(具體內容移駕其原始碼)。
  • java.util.concurrent.BlockingQueue 繼承 java.util.Queue 介面,為阻塞佇列的核心介面,提供了在多執行緒環境下的出列、入列操作。作為使用者,則不需要關心佇列在什麼時候阻塞執行緒,什麼時候喚醒執行緒,所有一切均由 BlockingQueue 來完成。
  • ArrayBlockingQueue 內部使用可重入鎖 ReentrantLock + Condition 來完成多執行緒環境的併發操作。

    • items 變數,一個定長陣列,維護 ArrayBlockingQueue 的元素。
      • takeIndex 變數,int ,為 ArrayBlockingQueue 隊首位置。
      • putIndex 變數,int ,ArrayBlockingQueue 隊尾位置。
      • count 變數,元素個數。
    • lock 變數,ReentrantLock ,ArrayBlockingQueue 出列入列都必須獲取該鎖,兩個步驟共用一個鎖。
      • notEmpty 變數,非空,即出列條件。
      • notFull 變數,未滿,即入列條件。

3. 入隊

ArrayBlockingQueue 提供了諸多方法,可以將元素加入佇列尾部。

  • #add(E e) 方法:將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true ,如果此佇列已滿,則丟擲 IllegalStateException 異常。
  • #offer(E e) 方法:將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量),在成功時返回 true ,如果此佇列已滿,則返回 false 。
  • #offer(E e, long timeout, TimeUnit unit) 方法:將指定的元素插入此佇列的尾部,如果該佇列已滿,則在到達指定的等待時間之前等待可用的空間。
  • #put(E e) 方法:將指定的元素插入此佇列的尾部,如果該佇列已滿,則等待可用的空間。

3.1 add

// ArrayBlockingQueue.java
@Override
public boolean add(E e) {
    return super.add(e);
}

// AbstractQueue.java
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
  • #add(E e)方法,呼叫 #offer(E e) 方法,如果返回false,則直接丟擲 IllegalStateException 異常。

3.2 offer

@Override
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
  • 首先,檢查是否為 null
  • 然後,獲取 Lock 鎖。獲取鎖成功後,如果佇列已滿則,直接返回 false 。
  • 最後,呼叫 #enqueue(E e) 方法,它為入列的核心方法,所有入列的方法最終都將呼叫該方法,在佇列尾部插入元素。

3.2.1 enqueue

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    // 新增元素
    final Object[] items = this.items;
    items[putIndex] = x;
    // 到達隊尾,迴歸隊頭
    if (++putIndex == items.length)
        putIndex = 0;
    // 總數+1
    count++;
    // 通知阻塞在出列的執行緒
    notEmpty.signal();
}
  • 該方法就是在 putIndex(對尾)位置處,新增元素,最後呼叫 notEmpty#signal() 方法,通知阻塞在出列的執行緒(如果佇列為空,則進行出列操作是會阻塞)。

3.3 可超時的 offer

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 獲得鎖
    lock.lockInterruptibly();
    try {
        // <1> 若佇列已滿,迴圈等待被通知,再次檢查佇列是否非空
        while (count == items.length) {
            // 可等待的時間小於等於零,直接返回失敗
            if (nanos <= 0)
                return false;
            // 等待,直到超時
            nanos = notFull.awaitNanos(nanos); // 返回的為剩餘可等待時間,相當於每次等待,都會扣除相應已經等待的時間。
        }
        // 入隊
        enqueue(e);
        return true;
    } finally {
        // 解鎖
        lock.unlock();
    }
}
  • 相比 #offer(E e) 方法,增加了 <1> 處:
    • 若佇列已滿,呼叫 notFull#awaitNanos(long nanos) 方法,等待被通知(元素出列時,會呼叫 notFull#signal() 方法,進行通知阻塞等待的入列執行緒)或者超時
    • 被通知後,再次檢查佇列是否非空。若非空,繼續向下執行,否則繼續等待被通知。

3.4 put

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 獲得鎖
    lock.lockInterruptibly();
    try {
        // <1> 若佇列已滿,迴圈等待被通知,再次檢查佇列是否非空
        while (count == items.length)
            notFull.await();
        // 入隊
        enqueue(e);
    } finally {
        // 解鎖
        lock.unlock();
    }
}
  • 相比 #offer(E e) 方法,增加了 <1> 處:
    • 若佇列已滿,呼叫 notFull#await() 方法,等待被通知(元素出列時,會呼叫 notFull#await() 方法,進行通知阻塞等待的入列執行緒)。
    • 被通知後,再次檢查佇列是否非空。若非空,繼續向下執行,否則繼續等待被通知。

4. 出隊

ArrayBlockingQueue 提供的出隊方法如下:

  • #poll() 方法:獲取並移除此佇列的頭,如果此佇列為空,則返回 null
  • #poll(long timeout, TimeUnit unit) 方法:獲取並移除此佇列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。
  • #take() 方法:獲取並移除此佇列的頭部,在元素變得可用之前一直等待(如果有必要)。
  • #remove(Object o) 方法:從此佇列中移除指定元素的單個例項(如果存在)。

4.1 poll

public E poll() {
    // 獲得鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 獲得頭元素
        return (count == 0) ? null : dequeue();
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}
  • 如果佇列為空,則返回 null,否則,呼叫 #dequeue() 方法,獲取列頭元素。

3.1.1 dequeue

 private E dequeue() {
    final Object[] items = this.items;
    // 去除隊首元素
    E x = (E) items[takeIndex];
    items[takeIndex] = null; // 置空
    // 到達隊尾,迴歸隊頭
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 總數 - 1
    count--;
    // 維護下迭代器
    if (itrs != null)
        itrs.elementDequeued();
    // 通知阻塞在入列的執行緒
    notFull.signal();
    return x;
}
  • 該方法主要是從列頭(takeIndex 位置)取出元素,同時如果迭代器 itrs 不為 null ,則需要維護下該迭代器。最後,呼叫 notFull#signal() 方法,喚醒阻塞在入列執行緒。

4.2 可超時的 poll

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 獲得鎖
    lock.lockInterruptibly();
    try {
        // <1> 若佇列已空,迴圈等待被通知,再次檢查佇列是否非空
        while (count == 0) {
            // 可等待的時間小於等於零,直接返回 null
            if (nanos <= 0)
                return null;
            // 等待,直到超時
            nanos = notEmpty.awaitNanos(nanos); // 返回的為剩餘可等待時間,相當於每次等待,都會扣除相應已經等待的時間。
        }
        // 出隊
        return dequeue();
    } finally {
        // 解鎖
        lock.unlock();
    }
}
  • 相比 #poll() 方法,增加了 <1> 處:
    • 若佇列已空,呼叫 notEmpty#awaitNanos(long nanos) 方法,等待被通知(元素入列時,會呼叫 notEmpty#signal() 方法,進行通知阻塞等待的出列執行緒)或者超時返回 null
    • 被通知後,再次檢查佇列是否為空。若非空,繼續向下執行,否則繼續等待被通知。

4.3 take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 獲得鎖
    lock.lockInterruptibly();
    try {
        // <1> 若佇列已空,迴圈等待被通知,再次檢查佇列是否非空
        while (count == 0)
            notEmpty.await();
        // 出列
        return dequeue();
    } finally {
        // 解鎖
        lock.unlock();
    }
}
  • 相比 #poll() 方法,增加了 <1> 處:
    • 若佇列已空,呼叫 notEmpty#await() 方法,等待被通知(元素入列時,會呼叫 notEmpty#signal() 方法,進行通知阻塞等待的出列執行緒)。
    • 被通知後,再次檢查佇列是否為空。若非空,繼續向下執行,否則繼續等待被通知。

4.4 remove

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    // 獲得鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            // 迴圈向下查詢,若匹配,則進行移除。
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}
  • 詳細解析,見程式碼註釋。
  • #removeAt(int removeIndex) 方法,移除指定位置的元素。程式碼如下:

    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        // 移除的為隊頭,直接移除即可
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        // 移除非隊頭,移除的同時,需要向前複製,填補這個空缺。
        } else {
            // an "interior" remove
    
            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        // 通知
        notFull.signal();
    }
    

5. 補充說明

老艿艿:因為本文的重心在 ArrayBlockingQueue 的入隊和出隊,所以其他方法,例如迭代器等等,並未解析。所以,胖友,你懂的,自己研究哈。