1. 程式人生 > >Java併發程式設計(六)阻塞佇列

Java併發程式設計(六)阻塞佇列

前言

Android多執行緒(一)執行緒池這篇文章時,當我們要建立ThreadPoolExecutor的時候需要傳進來一個型別為BlockingQueue的引數,它就是阻塞佇列,在這一篇文章裡我們會介紹阻塞佇列的定義、種類、實現原理以及應用。

1.什麼是阻塞佇列

阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

BlockingQueue有兩個常見阻塞場景

  1. 當佇列中沒有資料的情況下,消費者端的所有執行緒都會被自動阻塞(掛起),直到有資料放入佇列。

    這裡寫圖片描述

  2. 當佇列中填滿資料的情況下,生產者端的所有執行緒都會被自動阻塞(掛起),直到佇列中有空的位置,執行緒被自動喚醒。

    這裡寫圖片描述

那麼支援以上兩種阻塞場景的佇列我們稱之為阻塞佇列。

BlockingQueue的核心方法

放入資料:

  • offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,
    則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)  
  • offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中
    加入BlockingQueue,則返回失敗。
  • put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.

獲取資料:

  • poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,
    取不到時返回null;
  • poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,
    佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
  • take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到
    BlockingQueue有新的資料被加入;
  • drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數),
    通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。

插入和移除操作的4種處理方式

  • 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue
    full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。

  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null

  • 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。

  • 超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。

2.Java中的阻塞佇列

JDK7提供了7個阻塞佇列,分別是:

  • ArrayBlockingQueue :由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue :由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue :支援優先順序排序的無界阻塞佇列。
  • DelayQueue:使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:不儲存元素的阻塞佇列。
  • LinkedTransferQueue:由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:由連結串列結構組成的雙向阻塞佇列。

ArrayBlockingQueue

用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。預設情況下不保證訪問者公平的訪問佇列,所謂公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下程式碼建立一個公平的阻塞佇列:

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

LinkedBlockingQueue

基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,此佇列按照先進先出(FIFO)的原則對元素進行排序,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞佇列,一般情況下,在處理多執行緒間的生產者消費者問題,使用這兩個類足以。

PriorityBlockingQueue

是一個支援優先順序的無界佇列。預設情況下元素採取自然順序升序排列。可以自定義實現compareTo()方法來指定元素進行排序規則,或者初始化PriorityBlockingQueue時,指定構造引數Comparator來對元素進行排序。需要注意哦的是不能保證同優先順序元素的順序。

DelayQueue

是一個支援延時獲取元素的無界阻塞佇列。佇列使用PriorityQueue來實現。佇列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。我們可以將DelayQueue運用在以下應用場景:

  • 快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。
  • 定時任務排程:使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。

SynchronousQueue

是一個不儲存元素的阻塞佇列。每一個put操作必須等待一個take操作,否則不能繼續新增元素。SynchronousQueue可以看成是一個傳球手,負責把生產者執行緒處理的資料直接傳遞給消費者執行緒。佇列本身並不儲存任何元素,非常適合於傳遞性場景,比如在一個執行緒中使用的資料,傳遞給另外一個執行緒使用,SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue

是一個由連結串列結構組成的無界阻塞TransferQueue佇列。相對於其他阻塞佇列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法。如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在佇列的tail節點,並等到該元素被消費者消費了才返回。transfer方法的關鍵程式碼如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行程式碼是試圖把存放當前元素的s節點作為tail節點。第二行程式碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數後使用Thread.yield()方法來暫停當前正在執行的執行緒,並執行其他執行緒。

tryTransfer方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回。而transfer方法是必須等到消費者消費了才返回。

對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。

LinkedBlockingDeque

是一個由連結串列結構組成的雙向阻塞佇列。所謂雙向佇列指的你可以從佇列的兩端插入和移出元素。雙端佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。相比其他的阻塞佇列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端佇列的第一個元素。以Last單詞結尾的方法,表示插入,獲取或移除雙端佇列的最後一個元素。另外插入方法add等同於addLast,移除方法remove等效於removeFirst。但是take方法卻等同於takeFirst,不知道是不是Jdk的bug,使用時還是用帶有First和Last字尾的方法更清楚。

在初始化LinkedBlockingDeque時可以設定容量防止其過渡膨脹。另外雙向阻塞佇列可以運用在“工作竊取”模式中。

3.阻塞佇列的實現原理(JDK1.7)

以ArrayBlockingQueue為例,我們先來看看程式碼:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -817911632652898426L;
    /** The queued items */
    final Object[] items;
    /** items index for next take, poll, peek or remove */
    int takeIndex;
    /** items index for next put, offer, or add */
    int putIndex;
    /** Number of elements in the queue */
    int count;
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;
 ...省略
 }

從上面程式碼可以看出ArrayBlockingQueue是維護一個Object型別的陣列,takeIndex和putIndex分別表示隊首元素和隊尾元素的下標,count表示佇列中元素的個數,lock則是一個可重入鎖,notEmpty和notFull是等待條件。接下來我們看看關鍵方法put:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

從put方法的實現可以看出,它先獲取了鎖,並且獲取的是可中斷鎖,然後判斷當前元素個數是否等於陣列的長度,如果相等,則呼叫notFull.await()進行等待,當被其他執行緒喚醒時,通過enqueue(e)方法插入元素,最後解鎖。

   /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }

插入成功後,通過notEmpty喚醒正在等待取元素的執行緒。再來看看take方法:

   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

跟put方法實現類似,put方法等待的是notFull訊號,而take方法等待的是notEmpty訊號。在take方法中,如果可以取元素,則通過dequeue方法取得元素,下面是dequeue方法的實現:

 private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

4.阻塞佇列的使用場景

除了執行緒池的實現使用阻塞佇列之外,我們可以在生產者-消費者模式來使用阻塞佇列,首先使用Object.wait()、Object.notify()和非阻塞佇列實現生產者-消費者模式:

public class Test {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);  
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();       
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread{         
        @Override
        public void run() {
            while(true){
                synchronized (queue) {
                    while(queue.size() == 0){
                        try {
                            System.out.println("佇列空,等待資料");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    queue.poll();          //每次移走隊首元素
                    queue.notify();
                }
            }
        }
    }

    class Producer extends Thread{       
        @Override
        public void run() {
            while(true){
                synchronized (queue) {
                    while(queue.size() == queueSize){
                        try {
                            System.out.println("佇列滿,等待有空餘空間");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    queue.offer(1);        //每次插入一個元素
                    queue.notify();
                }
            }
        }
    }       
}

下面是使用阻塞佇列實現的生產者-消費者模式:

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); 
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();         
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread{  
        @Override
        public void run() {
            while(true){
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }   
    }

    class Producer extends Thread{    
        @Override
        public void run() {         
            while(true){
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }     
    }
}

很顯然使用阻塞佇列實現不需要單獨考慮同步和執行緒間通訊的問題,實現起來很簡單。

參考資料:
《Java併發程式設計的藝術》