1. 程式人生 > >不忘初心,方能始終。

不忘初心,方能始終。

什麼是阻塞佇列

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。

  • 1)支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。

  • 2)支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。

阻塞佇列常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。阻塞佇列就是生產者用來存放元素、消費者用來獲取元素的容器。

方法 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
檢查方法 element() peek() 不可用 不可用
  • 丟擲異常:當佇列滿時,如果再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列空時,從佇列裡獲取元素會丟擲NoSuchElementException異常。

  • 返回特殊值:當往佇列插入元素時,會返回元素是否插入成功,成功返回true。如果是移除方法,則是從佇列裡取出一個元素,如果沒有則返回null。

  • 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到佇列可用或者響應中斷退出。當佇列空時,如果消費者執行緒從佇列裡take元素,佇列會阻塞住消費者執行緒,直到佇列不為空。

  • 超時退出:當阻塞佇列滿時,如果生產者執行緒往佇列裡插入元素,佇列會阻塞生產者執行緒一段時間,如果超過了指定的時間,生產者執行緒就會退出。這兩個附加操作的4種處理方式不方便記憶,所以我找了一下這幾個方法的規律。put和take分別尾首含有字母t,offer和poll都含有字母o。

注意:如果是無界阻塞佇列,佇列不可能會出現滿的情況,所以使用put或offer方法永遠不會被阻塞,而且使用offer方法時,該方法永遠返回true。

Java裡的阻塞佇列

  • ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。
  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

SynchronousQueue

SynchronousQueue 是一個不儲存元素的阻塞佇列。每一個put操作必須等待一個take操作,否則不能繼續新增元素。

它支援公平訪問佇列。預設情況下執行緒採用非公平性策略訪問佇列。使用以下構造方法可以建立公平性訪問的SynchronousQueue,如果設定為true,則等待的執行緒會採用先進先出的順序訪問佇列。

SynchronousQueue可以看成是一個傳球手,負責把生產者執行緒處理的資料直接傳遞給消費者執行緒。佇列本身並不儲存任何元素,非常適合傳遞性場景。

實現原理

使用通知模式實現。所謂通知模式,就是當生產者往滿的佇列裡新增元素時會阻塞住生產者,當消費者消費了一個佇列中的元素後,會通知生產者當前佇列可用。

以 JDk 的 ArrayBlockingQueue 為例:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    // ......
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }
        
}

當往佇列裡插入一個元素時,如果佇列不可用(已滿),那麼阻塞生產者主要通過 condition 的 await() 方法,最終是呼叫 LockSupport.park(this)來實現。

park這個方法會阻塞當前執行緒,只有以下4種情況中的一種發生時,該方法才會返回。

  • 與park對應的unpark執行或已經執行時。“已經執行”是指unpark先執行,然後再執行park 的情況。
  • 執行緒被中斷時。
  • 等待完time引數指定的毫秒數時。
  • 異常現象發生時,這個異常現象沒有任何原因。

當從佇列中取一個元素時,如果佇列為空,同樣會用 condition 的 await() 方法阻塞消費者執行緒。只是這個兩個不同的 Condition ,也就是兩個不同的等待佇列。

取的時候要求佇列不為空,一旦佇列為空,消費者就要加入內建等待佇列,然後就要阻塞等待佇列不為空的訊號。這就是通知模式。