1. 程式人生 > >JUC之ArrayBlockingQueue和LinkedBlockingQueue

JUC之ArrayBlockingQueue和LinkedBlockingQueue

部分摘抄自

阻塞佇列

在JDK中,LinkedList或ArrayList就是佇列。但是實際使用者並不多。
阻塞佇列與我們平常接觸的普通佇列(LinkedList或ArrayList等)的最大不同點,在於阻塞佇列支出阻塞新增和阻塞刪除方法。

  • 阻塞新增
    所謂的阻塞新增是指當阻塞佇列元素已滿時,佇列會阻塞加入元素的執行緒,直佇列元素不滿時才重新喚醒執行緒執行元素加入操作。

  • 阻塞刪除
    阻塞刪除是指在佇列元素為空時,刪除佇列元素的執行緒將被阻塞,直到佇列不為空再執行刪除操作(一般都會返回被刪除的元素)

這裡寫圖片描述

ArrayBlockingQueue

首先我們看一下如何使用

private final static ArrayBlockingQueue<Apple> mAbq= new ArrayBlockingQueue<>(1);
//生產者程式碼
Apple apple = new Apple();
mAbq.put(apple);
//消費者程式碼
Apple apple = mAbq.take();
System.out.println("消費Apple="+apple);

在構造方法中

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if
(capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //首先初始化本物件用的鎖。如果設定為公平鎖,則先lock的執行緒先獲取到鎖。 //公平鎖會帶來效能損失 //synchronized是非公平鎖,ReentrantLock在預設的情況下也是非公平鎖 lock = new ReentrantLock(fair); //Conditon中的await()對應Object的wait();
//Condition中的signal()對應Object的notify(); //Condition中的signalAll()對應Object的notifyAll()。 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

首先,我們先看看阻塞的put和take方法。首先,是對

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//加鎖,可以被別的執行緒打斷
        try {
            while (count == items.length)
                notFull.await();//如果佇列已經滿了,則等待(呼叫await就等於釋放了鎖)
            enqueue(e);
        } finally {
            lock.unlock();//多說一句,lock一定要記得在finally中unlock。和JVM支援的synchronized關鍵字不同,一旦發生異常而你沒有unlock,則鎖一直會被鎖死。

        }
    }

   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)//如果佇列為空,則等待
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

出對入隊都是將陣列當成迴圈連結串列一樣操作

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

LinkedBlockingQueue

唯一的區別是出兌入隊換成了對連結串列的操作。

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

二者異同

1.佇列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE),對於後者而言,當新增速度大於移除速度時,在無界的情況下,可能會造成記憶體溢位等問題。

2.資料儲存容器不同,ArrayBlockingQueue採用的是陣列作為資料儲存容器,而LinkedBlockingQueue採用的則是以Node節點作為連線物件的連結串列。

3.由於ArrayBlockingQueue採用的是陣列的儲存容器,因此在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而LinkedBlockingQueue則會生成一個額外的Node物件。這可能在長時間內需要高效併發地處理大批量資料的時,對於GC可能存在較大影響。

4.兩者的實現佇列新增或移除的鎖不一樣,ArrayBlockingQueue實現的佇列中的鎖是沒有分離的,即新增操作和移除操作採用的同一個ReenterLock鎖,而LinkedBlockingQueue實現的佇列中的鎖是分離的,其新增採用的是putLock,移除採用的則是takeLock,這樣能大大提高佇列的吞吐量,也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。