1. 程式人生 > >Java併發指南11:解讀 Java 阻塞佇列 BlockingQueue

Java併發指南11:解讀 Java 阻塞佇列 BlockingQueue

解讀 Java 併發佇列 BlockingQueue

轉自:https://javadoop.com/post/java-concurrent-queue

最近得空,想寫篇文章好好說說 java 執行緒池問題,我相信很多人都一知半解的,包括我自己在仔仔細細看原始碼之前,也有許多的不解,甚至有些地方我一直都沒有理解到位。

說到執行緒池實現,那麼就不得不涉及到各種 BlockingQueue 的實現,那麼我想就 BlockingQueue 的問題和大家分享分享我瞭解的一些知識。

本文沒有像之前分析 AQS 那樣一行一行原始碼分析了,不過還是把其中最重要和最難理解的程式碼說了一遍,所以不免篇幅略長。本文涉及到比較多的 Doug Lea 對 BlockingQueue 的設計思想,希望有心的讀者真的可以有一些收穫,我覺得自己還是寫了一些乾貨的。

本文直接參考 Doug Lea 寫的 Java doc 和註釋,這也是我們在學習 java 併發包時最好的材料了。希望大家能有所思、有所悟,學習 Doug Lea 的程式碼風格,並將其優雅、嚴謹的作風應用到我們寫的每一行程式碼中。

目錄

阻塞佇列概覽

1. 什麼是阻塞佇列?

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

阻塞佇列提供了四種處理方法:

方法\處理方式丟擲異常返回特殊值一直阻塞超時退出
插入方法add(e)offer(e)put(e)offer(e,time,unit)
移除方法remove()poll()take()poll(time,unit)
檢查方法element()peek()不可用不可用
  • 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
  • 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。
  • 超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。

2、Java裡的阻塞佇列

JDK7提供了7個阻塞佇列。分別是

  • ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。
  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

ArrayBlockingQueue

ArrayBlockingQueue是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。預設情況下不保證訪問者公平的訪問佇列,所謂公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素。通常情況下為了保證公平性會降低吞吐量。

LinkedBlockingQueue

LinkedBlockingQueue是一個用連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為Integer.MAX_VALUE。此佇列按照先進先出的原則對元素進行排序。

PriorityBlockingQueue

PriorityBlockingQueue是一個支援優先順序的無界佇列。預設情況下元素採取自然順序排列,也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列。

DelayQueue

DelayQueue是一個支援延時獲取元素的無界阻塞佇列。佇列使用PriorityQueue來實現。佇列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。我們可以將DelayQueue運用在以下應用場景:

  • 快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。
  • 定時任務排程。使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。
3、阻塞佇列原始碼分析:

BlockingQueue

首先,最基本的來說, BlockingQueue 是一個先進先出的佇列(Queue),為什麼說是阻塞(Blocking)的呢?是因為 BlockingQueue 支援當獲取佇列元素但是佇列為空時,會阻塞等待佇列中有元素再返回;也支援新增元素時,如果佇列已滿,那麼等到佇列可以放入新元素時再放入。

BlockingQueue 是一個介面,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 介面。

BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用於不同的場景中使用:1、丟擲異常;2、返回特殊值(null 或 true/false,取決於具體的操作);3、阻塞等待此操作,直到這個操作成功;4、阻塞等待此操作,直到成功或者超時指定時間。總結如下:

Throws exceptionSpecial valueBlocksTimes out
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()not applicablenot applicable

BlockingQueue 的各個實現都遵循了這些規則,當然我們也不用死記這個表格,知道有這麼回事,然後寫程式碼的時候根據自己的需要去看方法的註釋來選取合適的方法即可。

對於 BlockingQueue,我們的關注點應該在 put(e) 和 take() 這兩個方法,因為這兩個方法是帶阻塞的。

BlockingQueue 不接受 null 值的插入,相應的方法在碰到 null 的插入時會丟擲 NullPointerException 異常。null 值在這裡通常用於作為特殊值返回(表格中的第三列),代表 poll 失敗。所以,如果允許插入 null 值的話,那獲取的時候,就不能很好地用 null 來判斷到底是代表失敗,還是獲取的值就是 null 值。

一個 BlockingQueue 可能是有界的,如果在插入的時候,發現佇列滿了,那麼 put 操作將會阻塞。通常,在這裡我們說的無界佇列也不是說真正的無界,而是它的容量是 Integer.MAX_VALUE(21億多)。

BlockingQueue 是設計用來實現生產者-消費者佇列的,當然,你也可以將它當做普通的 Collection 來用,前面說了,它實現了 java.util.Collection 介面。例如,我們可以用 remove(x) 來刪除任意一個元素,但是,這類操作通常並不高效,所以儘量只在少數的場合使用,比如一條訊息已經入隊,但是需要做取消操作的時候。

BlockingQueue 的實現都是執行緒安全的,但是批量的集合操作如 addAllcontainsAllretainAll 和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素後中途丟擲異常,此時 BlockingQueue 中已經添加了部分元素,這個是允許的,取決於具體的實現。

BlockingQueue 不支援 close 或 shutdown 等關閉操作,因為開發者可能希望不會有新的元素新增進去,此特性取決於具體的實現,不做強制約束。

最後,BlockingQueue 在生產者-消費者的場景中,是支援多消費者和多生產者的,說的其實就是執行緒安全問題。

相信上面說的每一句都很清楚了,BlockingQueue 是一個比較簡單的執行緒安全容器,下面我會分析其具體的在 JDK 中的實現,這裡又到了 Doug Lea 表演時間了。

BlockingQueue 實現之 ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 介面的有界佇列實現類,底層採用陣列來實現。

其併發控制採用可重入鎖來控制,不管是插入操作還是讀取操作,都需要獲取到鎖才能進行操作。

如果讀者看過我之前寫的《一行一行原始碼分析清楚 AbstractQueuedSynchronizer(二)》 的關於 Condition 的文章的話,那麼你一定能很容易看懂 ArrayBlockingQueue 的原始碼,它採用一個 ReentrantLock 和相應的兩個 Condition 來實現。

ArrayBlockingQueue 共有以下幾個屬性:

// 用於存放元素的陣列
final Object[] items;
// 下一次讀取操作的位置
int takeIndex;
// 下一次寫入操作的位置
int putIndex;
// 佇列中的元素數量
int count;

// 以下幾個就是控制併發用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

我們用個示意圖來描述其同步機制:

array-blocking-queue

ArrayBlockingQueue 實現併發同步的原理就是,讀操作和寫操作都需要獲取到 AQS 獨佔鎖才能進行操作。如果佇列為空,這個時候讀操作的執行緒進入到讀執行緒佇列排隊,等待寫執行緒寫入新的元素,然後喚醒讀執行緒佇列的第一個等待執行緒。如果佇列已滿,這個時候寫操作的執行緒進入到寫執行緒佇列排隊,等待讀執行緒將佇列元素移除騰出空間,然後喚醒寫執行緒佇列的第一個等待執行緒。

對於 ArrayBlockingQueue,我們可以在構造的時候指定以下三個引數:

  1. 佇列容量,其限制了佇列中最多允許的元素個數;
  2. 指定獨佔鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的執行緒獲取到鎖;
  3. 可以指定用一個集合來初始化,將此集合中的元素在構造方法期間就先新增到佇列中。

更具體的原始碼我就不進行分析了,因為它就是 AbstractQueuedSynchronizer 中 Condition 的使用,感興趣的讀者請看我寫的《一行一行原始碼分析清楚 AbstractQueuedSynchronizer(二)》,因為只要看懂了那篇文章,ArrayBlockingQueue 的程式碼就沒有分析的必要了,當然,如果你完全不懂 Condition,那麼基本上也就可以說看不懂 ArrayBlockingQueue 的原始碼了。

BlockingQueue 實現之 LinkedBlockingQueue

底層基於單向連結串列實現的阻塞佇列,可以當做無界佇列也可以當做有界佇列來使用。看構造方法:

// 傳說中的無界佇列
publicLinkedBlockingQueue(){
    this(Integer.MAX_VALUE);
}
// 傳說中的有界佇列
publicLinkedBlockingQueue(int capacity){
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

我們看看這個類有哪些屬性:

// 佇列容量
private final int capacity;

// 佇列中的元素數量
private final AtomicInteger count = new AtomicInteger(0);

// 隊頭
private transient Node<E> head;

// 隊尾
private transient Node<E> last;

// take, poll, peek 等讀操作的方法需要獲取到這個鎖
private final ReentrantLock takeLock = new ReentrantLock();

// 如果讀操作的時候佇列是空的,那麼等待 notEmpty 條件
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等寫操作的方法需要獲取到這個鎖
private final ReentrantLock putLock = new ReentrantLock();

// 如果寫操作的時候佇列是滿的,那麼等待 notFull 條件
private final Condition notFull = putLock.newCondition();

這裡用了兩個鎖,兩個 Condition,簡單介紹如下:

takeLock 和 notEmpty 怎麼搭配:如果要獲取(take)一個元素,需要獲取 takeLock 鎖,但是獲取了鎖還不夠,如果佇列此時為空,還需要佇列不為空(notEmpty)這個條件(Condition)。

putLock 需要和 notFull 搭配:如果要插入(put)一個元素,需要獲取 putLock 鎖,但是獲取了鎖還不夠,如果佇列此時已滿,還需要佇列不是滿的(notFull)這個條件(Condition)。

首先,這裡用一個示意圖來看看 LinkedBlockingQueue 的併發讀寫控制,然後再開始分析原始碼:

linked-blocking-queue

看懂這個示意圖,原始碼也就簡單了,讀操作是排好隊的,寫操作也是排好隊的,唯一的併發問題在於一個寫操作和一個讀操作同時進行,只要控制好這個就可以了。

先上構造方法:

publicLinkedBlockingQueue(int capacity){
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

注意,這裡會初始化一個空的頭結點,那麼第一個元素入隊的時候,佇列中就會有兩個元素。讀取元素時,也總是獲取頭節點後面的一個節點。count 的計數值不包括這個頭節點。

我們來看下 put 方法是怎麼將元素插入到隊尾的:

publicvoidput(E e)throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // 如果你糾結這裡為什麼是 -1,可以看看 offer 方法。這就是個標識成功、失敗的標誌而已。
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 必須要獲取到 putLock 才可以進行插入操作
    putLock.lockInterruptibly();
    try {
        // 如果佇列滿,等待 notFull 的條件滿足。
        while (count.get() == capacity) {
            notFull.await();
        }
        // 入隊
        enqueue(node);
        // count 原子加 1,c 還是加 1 前的值
        c = count.getAndIncrement();
        // 如果這個元素入隊後,還有至少一個槽可以使用,呼叫 notFull.signal() 喚醒等待執行緒。
        // 哪些執行緒會等待在 notFull 這個 Condition 上呢?
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 入隊後,釋放掉 putLock
        putLock.unlock();
    }
    // 如果 c == 0,那麼代表隊列在這個元素入隊前是空的(不包括head空節點),
    // 那麼所有的讀執行緒都在等待 notEmpty 這個條件,等待喚醒,這裡做一次喚醒操作
    if (c == 0)
        signalNotEmpty();
}

// 入隊的程式碼非常簡單,就是將 last 屬性指向這個新元素,並且讓原隊尾的 next 指向這個元素
// 這裡入隊沒有併發問題,因為只有獲取到 putLock 獨佔鎖以後,才可以進行此操作
privatevoidenqueue(Node<E> node){
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

// 元素入隊後,如果需要,呼叫這個方法喚醒讀執行緒來讀
privatevoidsignalNotEmpty(){
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

我們再看看 take 方法:

public E take()throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 首先,需要獲取到 takeLock 才能進行出隊操作
    takeLock.lockInterruptibly();
    try {
        // 如果佇列為空,等待 notEmpty 這個條件滿足再繼續執行
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 出隊
        x = dequeue();
        // count 進行原子減 1
        c = count.getAndDecrement();
        // 如果這次出隊後,佇列中至少還有一個元素,那麼呼叫 notEmpty.signal() 喚醒其他的讀執行緒
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 出隊後釋放掉 takeLock
        takeLock.unlock();
    }
    // 如果 c == capacity,那麼說明在這個 take 方法發生的時候,佇列是滿的
    // 既然出隊了一個,那麼意味著佇列不滿了,喚醒寫執行緒去寫
    if (c == capacity)
        signalNotFull();
    return x;
}
// 取隊頭,出隊
private E dequeue(){
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // 之前說了,頭結點是空的
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    // 設定這個為新的頭結點
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
// 元素出隊後,如果需要,呼叫這個方法喚醒寫執行緒來寫
privatevoidsignalNotFull(){
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

原始碼分析就到這裡結束了吧,畢竟還是比較簡單的原始碼,基本上只要讀者認真點都看得懂。

BlockingQueue 實現之 SynchronousQueue

它是一個特殊的佇列,它的名字其實就蘊含了它的特徵 - - 同步的佇列。為什麼說是同步的呢?這裡說的並不是多執行緒的併發問題,而是因為當一個執行緒往佇列中寫入一個元素時,寫入操作不會立即返回,需要等待另一個執行緒來將這個元素拿走;同理,當一個讀執行緒做讀操作的時候,同樣需要一個相匹配的寫執行緒的寫操作。這裡的 Synchronous 指的就是讀執行緒和寫執行緒需要同步,一個讀執行緒匹配一個寫執行緒。

我們比較少使用到 SynchronousQueue 這個類,不過它線上程池的實現類 ScheduledThreadPoolExecutor 中得到了應用,感興趣的讀者可以在看完這個後去看看相應的使用。

雖然上面我說了佇列,但是 SynchronousQueue 的佇列其實是虛的,其不提供任何空間(一個都沒有)來儲存元素。資料必須從某個寫執行緒交給某個讀執行緒,而不是寫到某個佇列中等待被消費。

你不能在 SynchronousQueue 中使用 peek 方法(在這裡這個方法直接返回 null),peek 方法的語義是隻讀取不移除,顯然,這個方法的語義是不符合 SynchronousQueue 的特徵的。SynchronousQueue 也不能被迭代,因為根本就沒有元素可以拿來迭代的。雖然 SynchronousQueue 間接地實現了 Collection 介面,但是如果你將其當做 Collection 來用的話,那麼集合是空的。當然,這個類也是不允許傳遞 null 值的(併發包中的容器類好像都不支援插入 null 值,因為 null 值往往用作其他用途,比如用於方法的返回值代表操作失敗)。

接下來,我們來看看具體的原始碼實現吧,它的原始碼不是很簡單的那種,我們需要先搞清楚它的設計思想。

原始碼加註釋大概有 1200 行,我們先看大框架:

// 構造時,我們可以指定公平模式還是非公平模式,區別之後再說
publicSynchronousQueue(boolean fair){
    transferer = fair ? new TransferQueue() : new TransferStack();
}
abstract static classTransferer{
    // 從方法名上大概就知道,這個方法用於轉移元素,從生產者手上轉到消費者手上
    // 也可以被動地,消費者呼叫這個方法來從生產者手上取元素
    // 第一個引數 e 如果不是 null,代表場景為:將元素從生產者轉移給消費者
    // 如果是 null,代表消費者等待生產者提供元素,然後返回值就是相應的生產者提供的元素
    // 第二個引數代表是否設定超時,如果設定超時,超時時間是第三個引數的值
    // 返回值如果是 null,代表超時,或者中斷。具體是哪個,可以通過檢測中斷狀態得到。
    abstract Object transfer(Object e, boolean timed, long nanos);
}

Transferer 有兩個內部實現類,是因為構造 SynchronousQueue 的時候,我們可以指定公平策略。公平模式意味著,所有的讀寫執行緒都遵守先來後到,FIFO 嘛,對應 TransferQueue。而非公平模式則對應 TransferStack。

SynchronousQueue採用佇列TransferQueue來實現公平性策略,採用堆疊TransferStack來實現非公平性策略,他們兩種都是通過連結串列實現的,其節點分別為QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演著非常重要的作用,SynchronousQueue的put、take操作都是委託這兩個類來實現的。

synchronous-queue

我們先採用公平模式分析原始碼,然後再說說公平模式和非公平模式的區別。

接下來,我們看看 put 方法和 take 方法:

// 寫入值
publicvoidput(E o)throws InterruptedException {
    if (o == null) throw new NullPointerException();
    if (transferer.transfer(o, false, 0) == null) { // 1
        Thread.interrupted();
        throw new InterruptedException();
    }
}
// 讀取值並移除
public E take()throws InterruptedException {
    Object e = transferer.transfer(null,