1. 程式人生 > >Java併發包下的阻塞佇列

Java併發包下的阻塞佇列

本文簡要介紹一下什麼是阻塞佇列,Java併發包給我們提供的阻塞佇列有哪些,以及怎麼去簡單使用

阻塞佇列 BlockingQueue

1. 簡單概念

 阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列:支援阻塞的插入和移除:

image

  • 支援阻塞的插入方法:當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列變為不滿

  • 支援阻塞的移除方法:當佇列為空時,獲取元素的執行緒會等待佇列變為非空

阻塞佇列的使用場景: 常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。阻塞佇列就是生產者用來存放元素、生產者用來獲取元素的容器

2. API介紹

BlockingQueue提供子類實現的幾個API如下:

  • add()和remove()
  • offer()和poll()
  • put()和take()
// 將指定元素插入佇列,成功返回true;不成功返回false
boolean add(E e);

// 從佇列中移出指定元素並返回該元素
boolean remove(Object o);

// 將指定元素插入佇列,在使用有容量限制的佇列時,該方法優於add()
boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

// 獲取佇列的頭元素並將其從佇列中移除,如果佇列為空返回null     
E poll(long timeout, TimeUnit unit) throws InterruptedException; // 將指定的元素插入佇列中 void put(E e) throws InterruptedException; // 獲取佇列的頭元素並將其從佇列中移除 E take() throws InterruptedException;

阻塞佇列的方法總結如下:

方法/處理方式 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove(e) poll() take() poll(time, unit)
  • 丟擲異常:當佇列滿時往佇列裡面插入元素,會丟擲 IllegalStateException 異常;當佇列為空時從佇列裡面獲取元素會丟擲 NoSuchElementException 異常

  • 返回特殊值:當往佇列裡插入元素時會返回元素是否插入成功,成功則返回true;移除方法,會獲取佇列的頭元素並將其從佇列中移除,如果佇列為空返回null

  • 一直阻塞:當佇列滿時,如果生產者執行緒往佇列裡面put元素,佇列會一直阻塞生產者執行緒,直到佇列可用或者響應中斷退出;當佇列為空時,如果消費者執行緒從佇列裡take元素,則佇列會阻塞住消費者執行緒,直到佇列不為空。

  • 超時退出:當阻塞佇列滿時,如果生產者執行緒往佇列裡面插入元素,佇列會阻塞生產者執行緒一段時間,超過指定時間後生產者執行緒將會退出

注意: 上面所說的佇列滿時插入元素的執行緒阻塞是針對有界佇列的,如果是無界佇列,佇列不會出現滿的情況,所以使用put或offer方法永遠不會被阻塞,而且使用offer()方法會永遠返回true。

BlockingQueue的實現類——七大阻塞佇列

下圖是BlockingQueue的繼承體系:

image

  • ArrayBlockingQueue:一個由陣列實現的有界阻塞佇列
  • LinkedBlockingQueue:一個由連結串列實現的有界阻塞佇列
  • LinkedBlockingDeque:一個由連結串列實現的雙向有界阻塞佇列
  • PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列
  • SynchronousQueue:一個不儲存元素的阻塞佇列
  • LinkedTransferQueue:一個由連結串列實現的無界阻塞佇列
  • DelayQueue:一個由優先順序佇列實現的無界阻塞佇列
1. ArrayBlockingQueue

 ArrayBlockingQueue是一個由陣列實現的有界阻塞佇列。按照FIFO的原則對元素進行排序。佇列使用可重入鎖(ReentrantLock)對同步資源加鎖,預設情況下不保證執行緒公平的訪問佇列(非公平鎖)。下面是ArrayBlockingQueue原始碼中的幾個成員變數:

// 儲存佇列元素的陣列
final Object[] items;

// 將要取出元素的索引
int takeIndex;

// 將要新增元素的索引
int putIndex;

// 佇列中已新增元素的數量
int count;

// 可重入鎖
final ReentrantLock lock;

// 取元素執行緒等待佇列:佇列非空可獲取
private final Condition notEmpty;

// 插入元素執行緒等待佇列:佇列非滿可插入
private final Condition notFull;

如下是ArrayBlockingQueue的構造器原始碼:

// 傳入一個容量
public ArrayBlockingQueue(int capacity) {
    // 預設是非公平的
    this(capacity, false);
}

// 可以傳入一個boolean值,true是公平鎖
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

分析ArrayBlockingQueue的put()和take()方法的原始碼:

  • put() 方法
// 插入元素
public void put(E e) throws InterruptedException {
    // 檢查摻入的元素是否為空,如果為空丟擲NullPointerException異常
    checkNotNull(e);
    // 獲取可重入鎖
    final ReentrantLock lock = this.lock;
    // 可響應中斷
    lock.lockInterruptibly();
    try {
        // 如果佇列已滿,則摻入元素的執行緒等待,直到佇列不滿時由獲取元素執行緒喚醒
        while (count == items.length)
            // 等待
            notFull.await();
        // 佇列不滿,插入元素
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

// 向佇列中插入元素
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 喚醒取元素執行緒,可以取元素了
    notEmpty.signal();
}
  • take() 方法
// 獲取元素
public E take() throws InterruptedException {
    // 獲得鎖
    final ReentrantLock lock = this.lock;
    // 響應中斷
    lock.lockInterruptibly();
    try {
        // 當佇列為空,獲取元素的執行緒等待,直到佇列不為空時由插入元素執行緒喚醒
        while (count == 0)
            // 等待
            notEmpty.await();
        // 返回取到的元素
        return dequeue();
    } finally {
        lock.unlock();
    }
}

// 從佇列中獲取元素
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    喚醒等待的插入元素執行緒,可以插入元素了
    notFull.signal();
    return x;
}

使用ArrayBlockingQueue實現一個簡單的生產消費場景:

public class TestArrayBlockingQueue {

    public static void main(String[] args) {
        // 容量為3的阻塞佇列
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

        // 兩個生產者執行緒生產資料
        for (int i = 0; i < 2; i++) {
            new Thread() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                            System.out.println(Thread.currentThread().getName() + " 準備插入資料 " +
                                    (queue.size() == 3 ? "...佇列已滿,正在等待..." : "..."));
                            queue.put(1);
                            System.out.println(Thread.currentThread().getName() + " 插入資料," +
                                    "佇列目前有 " + queue.size() + " 個數據.");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }
        
        // 一個消費者執行緒消費資料
        new Thread() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + " 準備取出資料 " +
                            (queue.size() == 0 ? "...佇列已空,正在等待..." : "..."));
                    queue.take();
                    System.out.println(Thread.currentThread().getName() + " 取出資料," +
                            "佇列目前有 " + queue.size() + " 個數據.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}

結果如下:
Thread-2 準備取出資料 ...佇列已空,正在等待...
Thread-0 準備插入資料 ...
Thread-1 準備插入資料 ...
Thread-1 插入資料,佇列目前有 2 個數據.
Thread-0 插入資料,佇列目前有 2 個數據.
Thread-2 取出資料,佇列目前有 1 個數據.
Thread-0 準備插入資料 ...
Thread-1 準備插入資料 ...
Thread-0 插入資料,佇列目前有 2 個數據.
Thread-1 插入資料,佇列目前有 3 個數據.
Thread-0 準備插入資料 ...佇列已滿,正在等待...
Thread-1 準備插入資料 ...佇列已滿,正在等待...
2. LinkedBlockingQueue

 LinkedBlockingQueue 是一個由連結串列實現的有界阻塞佇列。佇列按照FIFO的原則對元素進行排序,佇列預設的最大長度是:Integer.MAX_VALUE。該佇列也是使用 ReentrantLock 對同步資源加鎖。線上程池中,LinkedBlockingQueue是FixedThreadPool和SingleThreadExecutor的工作佇列。下面是它的構造方法原始碼:

// 預設構造器
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

// 可設定容量的構造器
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
3. LinkedBlockingDeque

 LinkedBlockingDeque 是一個由連結串列結構組成的雙向有界阻塞佇列:可以從佇列的兩端插入和移除元素。雙向佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,減少了一半的競爭。該佇列也是使用 ReentrantLock 對同步資源加鎖。LinkedBlockingDeque 與其他阻塞佇列相比,多了 addFist、addLast、offerFist、offerLast、peekFirst、peekLast等方法:以First結尾的方法表示插入、獲取或移除雙向佇列的第一個元素;以Last結尾的方法表示插入、獲取或移除雙向佇列的最後一個元素。下面是其構造器原始碼:

// 預設構造器
public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}

// 可設定容量的構造器
public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}
4. PriorityBlockingQueue

 PriorityBlockingQueue 是一個支援優先順序的無界阻塞佇列:預設情況下元素採取升序排列,也可以通過自定義類實現compareTo()方法來制定元素的排序規則,或者在初始化時制定構造引數Comparator來對元素進行排序。該佇列也是使用 ReentrantLock 對同步資源加鎖。下面是其構造器原始碼:

// 預設構造器
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

// 定製化排序
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}
5. SynchronousQueue

 SynchronousQueue 是一個不儲存元素的阻塞佇列:每一個put操作必須等待一個take操作,否則不能繼續新增元素。該佇列也是使用 ReentrantLock 對同步資源加鎖,預設情況下執行緒採用非公平策略訪問佇列。SynchronousQueue 的身份類似於一箇中轉者,負責把生產者執行緒處理的資料直接傳遞給消費者執行緒,佇列本身並不儲存任何元素,非常適合傳遞性場景。

6. LinkedTransferQueue

 LinkedTransferQueue 是一個由連結串列組成的無界阻塞佇列。該佇列也是使用 ReentrantLock 對同步資源加鎖。相比於其他佇列,該佇列多了tryTransfer和transfer方法:

  • transfer方法:如果當前消費者正在等待接收元素,transfer方法可以把生產者傳入的元素立刻傳輸給消費者。如果沒有消費者等待接收元素,transfer方法會將元素存放在佇列的tail節點(尾節點),並等到該元素被消費者消費了才返回。

  • tryTransfer方法:用來試探生產者傳入的元素是否能直接傳遞給消費者,如果沒有消費者等待接收,則返回false。與transfer方法的區別在於:無論消費者是否接收都會立即返回,而transfer方法是必須等到消費者消費了才返回。

7. DelayQueue

 DelayQueue 是一個支援支援延時獲取元素的無界阻塞佇列:佇列使用PriorityQueue來實現。該佇列也是使用 ReentrantLock 對同步資源加鎖。佇列中的元素必須實現 Delayed 介面,在建立元素時可以指定多久才能從佇列中獲取當前元素,只有在延時期滿時才能從佇列中提取元素。

DelayQueue的使用場景:

  • 快取系統的設計:使用 DelayQueue 儲存快取元素的有效期,使用一個執行緒迴圈查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示快取有效期已到。

  • 定時任務排程:使用 DelayQueue 儲存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行。

參考

《Java併發程式設計的藝術》

</font