1. 程式人生 > >Java併發(十八):阻塞佇列BlockingQueue BlockingQueue(阻塞佇列)詳解 二叉堆(一)之 圖文解析 和 C語言的實現 多執行緒程式設計:阻塞、併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue(阻塞佇列)詳解

Java併發(十八):阻塞佇列BlockingQueue BlockingQueue(阻塞佇列)詳解 二叉堆(一)之 圖文解析 和 C語言的實現 多執行緒程式設計:阻塞、併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue(阻塞佇列)詳解

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。

這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。

阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

阻塞佇列提供了四種處理方法:

方法\處理方式 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用


 

 

 

 

 

對於 BlockingQueue,我們的關注點應該在 put(e) 和 take() 這兩個方法,因為這兩個方法是帶阻塞的。

  • 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。
  • 超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。
  • ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。
  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

一、應用

先使用Object.wait()和Object.notify()、非阻塞佇列實現生產者-消費者模式:

public class Test {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);

    public static void main(String[] args) {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();

        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                synchronized (queue) {
                    while (queue.size() == 0) {
                        try {
                            System.out.println("佇列空,等待資料");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    queue.poll(); // 每次移走隊首元素
                    queue.notify();
                    System.out.println("從佇列取走一個元素,佇列剩餘" + queue.size() + "個元素");
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                synchronized (queue) {
                    while (queue.size() == queueSize) {
                        try {
                            System.out.println("佇列滿,等待有空餘空間");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    queue.offer(1); // 每次插入一個元素
                    queue.notify();
                    System.out.println("向佇列取中插入一個元素,佇列剩餘空間:"
                            + (queueSize - queue.size()));
                }
            }
        }
    }
}

使用阻塞佇列實現的生產者-消費者模式:

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
     
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
    }
     
    class Consumer extends Thread{
         
        @Override
        public void run() {
            consume();
        }
         
        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("從佇列取走一個元素,佇列剩餘"+queue.size()+"個元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
     
    class Producer extends Thread{
         
        @Override
        public void run() {
            produce();
        }
         
        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向佇列取中插入一個元素,佇列剩餘空間:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Java執行緒(十三):BlockingQueue-執行緒的阻塞佇列  BlockingQueue(阻塞佇列)詳解 中都有應用舉例可以參考

二、ArrayBlockingQueue 

ArrayBlockingQueue,一個由陣列實現的有界阻塞佇列。該佇列採用FIFO的原則對元素進行排序新增的。

ArrayBlockingQueue 實現併發同步的原理:

讀操作和寫操作都需要獲取到同一個 AQS 獨佔鎖才能進行操作。

如果佇列為空,這個時候讀操作的執行緒進入到讀執行緒佇列排隊,等待寫執行緒寫入新的元素,然後喚醒讀執行緒佇列的第一個等待執行緒。

如果佇列已滿,這個時候寫操作的執行緒進入到寫執行緒佇列排隊,等待讀執行緒將佇列元素移除騰出空間,然後喚醒寫執行緒佇列的第一個等待執行緒。

原始碼分析:

// 屬性
// 用於存放元素的陣列
final Object[] items;
// 下一次讀取操作的位置
int takeIndex;
// 下一次寫入操作的位置
int putIndex;
// 佇列中的元素數量
int count;

// 以下幾個就是控制併發用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

put:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) // 自旋 佇列滿時,掛起寫執行緒
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();// 成功插入元素後,喚醒讀執行緒
}

take:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) // 自旋 佇列為空,掛起讀執行緒
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();// 成功讀出一個元素之後,喚醒寫執行緒
    return x;
}

三、LinkedBlockingQueue

LinkedBlockingQueue底層基於單向連結串列實現的阻塞佇列,可以當做無界佇列也可以當做有界佇列來使用。

    // 無界佇列
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    // 有界佇列 
   //注意,這裡會初始化一個空的頭結點,那麼第一個元素入隊的時候,佇列中就會有兩個元素。讀取元素時,也總是獲取頭節點後面的一個節點。count 的計數值不包括這個頭節點。
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } // 佇列容量 private final int capacity; // 佇列中的元素數量 private final AtomicInteger count = new AtomicInteger(0); // 隊頭 private transient Node<E> head; // 隊尾 private transient Node<E> last; // take, poll, peek 等讀操作的方法需要獲取到這個鎖 private final ReentrantLock takeLock = new ReentrantLock(); // 如果讀操作的時候佇列是空的,那麼等待 notEmpty 條件 private final Condition notEmpty = takeLock.newCondition(); // put, offer 等寫操作的方法需要獲取到這個鎖 private final ReentrantLock putLock = new ReentrantLock(); // 如果寫操作的時候佇列是滿的,那麼等待 notFull 條件 private final Condition notFull = putLock.newCondition();

原理:

這裡用了兩個鎖,兩個 Condition

takeLock 和 notEmpty 怎麼搭配:如果要獲取(take)一個元素,需要獲取 takeLock 鎖,但是獲取了鎖還不夠,如果佇列此時為空,還需要佇列不為空(notEmpty)這個條件(Condition)。

putLock 需要和 notFull 搭配:如果要插入(put)一個元素,需要獲取 putLock 鎖,但是獲取了鎖還不夠,如果佇列此時已滿,還需要佇列不是滿的(notFull)這個條件(Condition)。

put():

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // 如果你糾結這裡為什麼是 -1,可以看看 offer 方法。這就是個標識成功、失敗的標誌而已。
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 必須要獲取到 putLock 才可以進行插入操作
    putLock.lockInterruptibly();
    try {
        // 如果佇列滿,等待 notFull 的條件滿足。
        while (count.get() == capacity) {
            notFull.await();
        }
        // 入隊
        enqueue(node);
        // count 原子加 1,c 還是加 1 前的值
        c = count.getAndIncrement();
        // 如果這個元素入隊後,還有至少一個槽可以使用,呼叫 notFull.signal() 喚醒等待執行緒。
        // 哪些執行緒會等待在 notFull 這個 Condition 上呢?
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 入隊後,釋放掉 putLock
        putLock.unlock();
    }
    // 如果 c == 0,那麼代表隊列在這個元素入隊前是空的(不包括head空節點),
    // 那麼所有的讀執行緒都在等待 notEmpty 這個條件,等待喚醒,這裡做一次喚醒操作
    if (c == 0)
        signalNotEmpty();
}

// 入隊的程式碼非常簡單,就是將 last 屬性指向這個新元素,並且讓原隊尾的 next 指向這個元素
// 這裡入隊沒有併發問題,因為只有獲取到 putLock 獨佔鎖以後,才可以進行此操作
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

// 元素入隊後,如果需要,呼叫這個方法喚醒讀執行緒來讀
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

take():

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 首先,需要獲取到 takeLock 才能進行出隊操作
    takeLock.lockInterruptibly();
    try {
        // 如果佇列為空,等待 notEmpty 這個條件滿足再繼續執行
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 出隊
        x = dequeue();
        // count 進行原子減 1
        c = count.getAndDecrement();
        // 如果這次出隊後,佇列中至少還有一個元素,那麼呼叫 notEmpty.signal() 喚醒其他的讀執行緒
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 出隊後釋放掉 takeLock
        takeLock.unlock();
    }
    // 如果 c == capacity,那麼說明在這個 take 方法發生的時候,佇列是滿的
    // 既然出隊了一個,那麼意味著佇列不滿了,喚醒寫執行緒去寫
    if (c == capacity)
        signalNotFull();
    return x;
}
// 取隊頭,出隊
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // 之前說了,頭結點是空的
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    // 設定這個為新的頭結點
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
// 元素出隊後,如果需要,呼叫這個方法喚醒寫執行緒來寫
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

四、PriorityBlockingQueue

PriorityBlockingQueue是一個支援優先順序的無界阻塞佇列。預設情況下元素採用自然順序升序排序,當然我們也可以通過建構函式來指定Comparator來對元素進行排序。需要注意的是PriorityBlockingQueue不能保證同優先順序元素的順序。

PriorityBlockingQueue為無界佇列(ArrayBlockingQueue 是有界佇列,LinkedBlockingQueue 也可以通過在建構函式中傳入 capacity 指定佇列最大的容量,但是 PriorityBlockingQueue 只能指定初始的佇列大小,後面插入元素的時候,如果空間不夠的話會自動擴容)。

需要注意的是PriorityBlockingQueue並不會阻塞資料生產者,而只會在沒有可消費的資料時,阻塞資料的消費者。因此使用的時候要特別注意,生產者生產資料的速度絕對不能快於消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。

PriorityBlockingQueue底層採用二叉堆來實現。

  關於二叉堆: 二叉堆的實現    二叉堆(一)之 圖文解析 和 C語言的實現

屬性:

// 構造方法中,如果不指定大小的話,預設大小為 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 陣列的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 這個就是存放資料的陣列
private transient Object[] queue;

// 隊列當前大小
private transient int size;

// 大小比較器,如果按照自然序排序,那麼此屬性可設定為 null
private transient Comparator<? super E> comparator;

// 併發控制所用的鎖,所有的 public 且涉及到執行緒安全的方法,都必須先獲取到這個鎖
private final ReentrantLock lock;

// 這個很好理解,其例項由上面的 lock 屬性建立
private final Condition notEmpty;

// 這個也是用於鎖,用於陣列擴容的時候,需要先獲取到這個鎖,才能進行擴容操作
// 其使用 CAS 操作
private transient volatile int allocationSpinLock;

// 用於序列化和反序列化的時候用,對於 PriorityBlockingQueue 我們應該比較少使用到序列化
private PriorityQueue q;

put():

    public void put(E e) {
        offer(e); // never need to block
    }

  public boolean offer(E e) {
        // 不能為null
        if (e == null)
            throw new NullPointerException();
        // 獲取鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // 擴容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // 根據比較器是否為null,做不同的處理
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            // 喚醒正在等待的消費者執行緒
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

take():

   public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        // 沒有元素 返回null
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 出對元素
            E result = (E) array[0];
            // 最後一個元素(也就是插入到空穴中的元素)
            E x = (E) array[n];
            array[n] = null;
            // 根據比較器釋放為null,來執行不同的處理
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

五、DelayQueue

DelayQueue是一個支援延時獲取元素的無界阻塞佇列。

佇列使用PriorityQueue來實現。

佇列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。

裡面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果佇列裡面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時才能夠從佇列中取元素。

DelayQueue應用場景:

  • 快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。
  • 定時任務排程。使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。

佇列中的Delayed必須實現compareTo來指定元素的順序。比如讓延時時間最長的放在佇列的末尾。實現程式碼如下:

    public int compareTo(Delayed other) {
           if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask x = (ScheduledFutureTask)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
       else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }

六、SynchronousQueue

SynchronousQueue是一個不儲存元素的阻塞佇列。每一個put操作必須等待一個take操作,否則不能繼續新增元素。

SynchronousQueue 的佇列其實是虛的,其不提供任何空間(一個都沒有)來儲存元素。資料必須從某個寫執行緒交給某個讀執行緒,而不是寫到某個佇列中等待被消費。

當一個執行緒往佇列中寫入一個元素時,寫入操作不會立即返回,需要等待另一個執行緒來將這個元素拿走;同理,當一個讀執行緒做讀操作的時候,同樣需要一個相匹配的寫執行緒的寫操作。這裡的 Synchronous 指的就是讀執行緒和寫執行緒需要同步,一個讀執行緒匹配一個寫執行緒。

SynchronousQueue可以看成是一個傳球手,負責把生產者執行緒處理的資料直接傳遞給消費者執行緒。佇列本身並不儲存任何元素,非常適合於傳遞性場景,比如在一個執行緒中使用的資料,傳遞給另外一個執行緒使用。

// 構造時,我們可以指定公平模式還是非公平模式,區別之後再說
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue() : new TransferStack();
}
abstract static class Transferer {
    // 從方法名上大概就知道,這個方法用於轉移元素,從生產者手上轉到消費者手上
    // 也可以被動地,消費者呼叫這個方法來從生產者手上取元素
    // 第一個引數 e 如果不是 null,代表場景為:將元素從生產者轉移給消費者
    // 如果是 null,代表消費者等待生產者提供元素,然後返回值就是相應的生產者提供的元素
    // 第二個引數代表是否設定超時,如果設定超時,超時時間是第三個引數的值
    // 返回值如果是 null,代表超時,或者中斷。具體是哪個,可以通過檢測中斷狀態得到。
    abstract Object transfer(Object e, boolean timed, long nanos);
}

我們來看看 transfer 的設計思路,其基本演算法如下:

  1. 當呼叫這個方法時,如果佇列是空的,或者佇列中的節點和當前的執行緒操作型別一致(如當前操作是 put 操作,而佇列中的元素也都是寫執行緒)。這種情況下,將當前執行緒加入到等待佇列即可。
  2. 如果佇列中有等待節點,而且與當前操作可以匹配(如佇列中都是讀操作執行緒,當前執行緒是寫操作執行緒,反之亦然)。這種情況下,匹配等待佇列的隊頭,出隊,返回相應資料。

其實這裡有個隱含的條件被滿足了,佇列如果不為空,肯定都是同種型別的節點,要麼都是讀操作,要麼都是寫操作。這個就要看到底是讀執行緒積壓了,還是寫執行緒積壓了。

put 方法和 take 方法:

// 寫入值
public void put(E o) throws InterruptedException {
    if (o == null) throw new NullPointerException();
    if (transferer.transfer(o, false, 0) == null) { // 1
        Thread.interrupted();
        throw new InterruptedException();
    }
}
// 讀取值並移除
public E take() throws InterruptedException {
    Object e = transferer.transfer(null, false, 0); // 2
    if (e != null)
        return (E)e;
    Thread.interrupted();
    throw new InterruptedException();
}

節點:

static final class QNode {
    volatile QNode next;          // 可以看出來,等待佇列是單向連結串列
    volatile Object item;         // CAS'ed to or from null
    volatile Thread waiter;       // 將執行緒物件儲存在這裡,用於掛起和喚醒
    final boolean isData;         // 用於判斷是寫執行緒節點(isData == true),還是讀執行緒節點

    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }
  ......

transfer 方法:

Object transfer(Object e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        // 佇列空,或佇列中節點型別和當前節點一致,
        // 即我們說的第一種情況,將節點入隊即可。讀者要想著這塊 if 裡面方法其實就是入隊
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // t != tail 說明剛剛有節點入隊,continue 即可
            if (t != tail)                  // inconsistent read
                continue;
            // 有其他節點入隊,但是 tail 還是指向原來的,此時設定 tail 即可
            if (tn != null) {               // lagging tail
                // 這個方法就是:如果 tail 此時為 t 的話,設定為 tn
                advanceTail(t, tn);
                continue;
            }
            // 
            if (timed && nanos <= 0)        // can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData);
            // 將當前節點,插入到 tail 的後面
            if (!t.casNext(null, s))        // failed to link in
                continue;

            // 將當前節點設定為新的 tail
            advanceTail(t, s);              // swing tail and wait
            // 看到這裡,請讀者先往下滑到這個方法,看完了以後再回來這裡,思路也就不會斷了
            Object x = awaitFulfill(s, e, timed, nanos);
            // 到這裡,說明之前入隊的執行緒被喚醒了,準備往下執行
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? x : e;

        // 這裡的 else 分支就是上面說的第二種情況,有相應的讀或寫相匹配的情況
        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? x : e;
        }
    }
}

void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
// 自旋或阻塞,直到滿足條件,這個方法返回
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {

    long lastTime = timed ? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    // 判斷需要自旋的次數,
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果被中斷了,那麼取消這個節點
        if (w.isInterrupted())
            // 就是將當前節點 s 中的 item 屬性設定為 this
            s.tryCancel(e);
        Object x = s.item;
        // 這裡是這個方法的唯一的出口
        if (x != e)
            return x;
        // 如果需要,檢測是否超時
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        // 如果自旋達到了最大的次數,那麼檢測
        else if (s.waiter == null)
            s.waiter = w;
        // 如果自旋到了最大的次數,那麼執行緒掛起,等待喚醒
        else if (!timed)
            LockSupport.park(this);
        // spinForTimeoutThreshold 這個之前講 AQS 的時候其實也說過,剩餘時間小於這個閾值的時候,就
        // 不要進行掛起了,自旋的效能會比較好
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

七、LinkedTransferQueue

BlockingQueue對讀或者寫都是鎖上整個佇列,在併發量大的時候,各種鎖是比較耗資源和耗時間的,而前面的SynchronousQueue雖然不會鎖住整個佇列,但它是一個沒有容量的“佇列”。

LinkedTransferQueue是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、無界的LinkedBlockingQueues等的超集。即可以像其他的BlockingQueue一樣有容量又可以像SynchronousQueue一樣不會鎖住整個佇列

LinkedTransferQueue是一個由連結串列結構組成的無界阻塞TransferQueue佇列。相對於LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法:如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在佇列的tail節點,並等到該元素被消費者消費了才返回。transfer方法的關鍵程式碼如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行程式碼是試圖把存放當前元素的s節點作為tail節點。第二行程式碼是讓CPU自旋等待消費者消費元素。因為自旋會消耗CPU,所以自旋一定的次數後使用Thread.yield()方法來暫停當前正在執行的執行緒,並執行其他執行緒。

tryTransfer方法:則是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回。而transfer方法是必須等到消費者消費了才返回。

原始碼分析:【死磕Java併發】—–J.U.C之阻塞佇列:LinkedTransferQueue

八、LinkedBlockingDeque

LinkedBlockingDeque是一個由連結串列結構組成的雙向阻塞佇列。所謂雙向佇列指的你可以從佇列的兩端插入和移出元素。

雙端佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。相比其他的阻塞佇列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法。

在初始化LinkedBlockingDeque時可以初始化佇列的容量,用來防止其再擴容時過渡膨脹。另外雙向阻塞佇列可以運用在“工作竊取”模式中。

原始碼分析:【死磕Java併發】—–J.U.C之阻塞佇列:LinkedBlockingDeque

 

 

參考資料 / 相關推薦:

解讀 Java 併發佇列 BlockingQueue

【死磕Java併發】—–J.U.C之阻塞佇列:ArrayBlockingQueue

多執行緒程式設計:阻塞、併發佇列的使用總結

阻塞佇列

Java併發程式設計:阻塞佇列

java阻塞佇列

Java執行緒(十三):BlockingQueue-執行緒的阻塞佇列

聊聊併發(七)——Java中的阻塞佇列

BlockingQueue(阻塞佇列)詳解