1. 程式人生 > >【搞定Java併發程式設計】第22篇:Java中的阻塞佇列 BlockingQueue 詳解

【搞定Java併發程式設計】第22篇:Java中的阻塞佇列 BlockingQueue 詳解

上一篇:Java併發容器之ConcurrentHashMap詳解

本文目錄

1、阻塞佇列的基本概念

2、ArrayBlockingQueue

2.1、ArrayBlockingQueue的基本使用

2.2、ArrayBlockingQueue原理概要

2.3、ArrayBlockingQueue的(阻塞)新增的實現原理

2.3.1、add(E  e)方法 和 offer(E e)方法

2.3.2、put(E  e)方法

2.4、ArrayBlockingQueue的(阻塞)移除實現原理

2.4.1、poll()方法

2.4.2、remove(Object o)方法

2.4.3、take()方法

2.4.4、peek()方法

3、LinkedBlockingQueue

3.1、LinkedBlockingQueue的基本概要

3.2、LinkedBlockingQueue的實現原理概論

3.3、新增方法的實現原理

3.3.1、add(E  e) 和 offer(E  e) 方法

3.4、移除方法的實現原理

3.4.1、remove方法

3.4.2、poll方法

3.4.3、take方法

3.5、檢查方法的實現原理

3.6、時間阻塞的方法

4、 LinkedBlockingQueue和ArrayBlockingQueue迥異


推薦幾篇不錯的文章:

1、深入剖析java併發之阻塞佇列LinkedBlockingQueue與ArrayBlockingQueue

2、解讀 Java 併發佇列 BlockingQueue

本文大部分內容轉載自:https://blog.csdn.net/javazejian/article/details/77410889。還有一小部分出自於《Java併發程式設計的藝術》書中。

本文在開篇介紹了Java提供的7種阻塞佇列的基本概念,原始碼分析中只對ArrayBlockingQueue和LinkedBlockingQueue做了分析,如果想了解其他五種阻塞佇列的原始碼分析,可以閱讀這篇文章:

解讀 Java 阻塞佇列 BlockingQueue

1、阻塞佇列的基本概念

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加操作支援阻塞的插入和移除方法。

1、支援阻塞的插入方法:當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿;

2、支援阻塞的移除方法:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。

阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。

Java中的阻塞佇列介面BlockingQueue繼承自Queue介面,先來看看Queue介面的情況:

public interface Queue<E> extends Collection<E> {
    
    boolean add(E e);      // 插入方法

    boolean offer(E e);    // 插入方法

    E remove();            // 刪除方法

    E poll();              // 刪除方法

    E element();           // 檢查方法

    E peek();              // 檢查方法
}

再來看看阻塞佇列介面為我們提供的主要方法:

public interface BlockingQueue<E> extends Queue<E> {

    boolean add(E e); 

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 

    void put(E e) throws InterruptedException; 

    E take() throws InterruptedException; 

    E poll(long timeout, TimeUnit unit) throws InterruptedException; 

    boolean remove(Object o); 
}

總結下上訴方法,可以分為以下三類:

方法\處理方式 丟擲異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用

1、丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。

2、返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null

3、一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。

4、超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。

  • 插入方法:

add(E e) : 新增成功返回true,失敗拋IllegalStateException異常;

offer(E e) : 成功返回 true,如果此佇列已滿,則返回 false;

put(E e) :將元素插入此佇列的尾部,如果該佇列已滿,則一直阻塞。

  • 刪除方法:

remove(Object o) :移除指定元素,成功返回true,失敗返回false;

poll() : 獲取並移除此佇列的頭元素,若佇列為空,則返回 null;

take():獲取並移除此佇列頭元素,若沒有元素則一直阻塞。

  • 檢查方法:

element() :獲取但不移除此佇列的頭元素,沒有元素則拋異常;

peek() :獲取但不移除此佇列的頭;若佇列為空,則返回 null。

  • 需要注意的幾個點:

BlockingQueue 不接受 null 值的插入,相應的方法在碰到 null 的插入時會丟擲 NullPointerException 異常。null 值在這裡通常用於作為特殊值返回(表格中的第三列),代表 poll 失敗。所以,如果允許插入 null 值的話,那獲取的時候,就不能很好地用 null 來判斷到底是代表失敗,還是獲取的值就是 null 值。

一個 BlockingQueue 可能是有界的,如果在插入的時候,發現佇列滿了,那麼 put 操作將會阻塞。通常,在這裡我們說的無界佇列也不是說真正的無界,而是它的容量是 Integer.MAX_VALUE(21億多)。

BlockingQueue 是設計用來實現生產者-消費者佇列的。當然,你也可以將它當做普通的 Collection 來用,前面說了,它實現了 java.util.Collection 介面。例如,我們可以用 remove(x) 來刪除任意一個元素,但是,這類操作通常並不高效,所以儘量只在少數的場合使用,比如一條訊息已經入隊,但是需要做取消操作的時候。

BlockingQueue 的實現都是執行緒安全的,但是批量的集合操作:如 addAllcontainsAllretainAll和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素後中途丟擲異常,此時 BlockingQueue 中已經添加了部分元素,這個是允許的,取決於具體的實現。

BlockingQueue 不支援 close 或 shutdown 等關閉操作,此特性取決於具體的實現,不做強制約束。

最後,BlockingQueue 在生產者-消費者的場景中,是支援多消費者和多生產者的,說的其實就是執行緒安全問題。

  • Java裡的阻塞佇列

JDK7提供了7個阻塞佇列。分別是:

1、ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。

2、LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。

3、PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。

4、DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。

5、SynchronousQueue:一個不儲存元素的阻塞佇列。

6、LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。

7、LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

  • ArrayBlockingQueue

ArrayBlockingQueue是一個用陣列實現的有界阻塞佇列。此佇列按照先進先出(FIFO)的原則對元素進行排序。

預設情況下不保證訪問者公平的訪問佇列,所謂公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素。通常情況下為了保證公平性會降低吞吐量。

  • LinkedBlockingQueue

LinkedBlockingQueue是一個用連結串列實現的有界阻塞佇列。此佇列的預設和最大長度為Integer.MAX_VALUE。此佇列按照先進先出的原則對元素進行排序。

  • PriorityBlockingQueue

PriorityBlockingQueue是一個支援優先順序的無界佇列。預設情況下元素採取自然順序排列,也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列。

  • DelayQueue

DelayQueue是一個支援延時獲取元素的無界阻塞佇列。佇列使用PriorityQueue來實現。佇列中的元素必須實現Delayed介面,在建立元素時可以指定多久才能從佇列中獲取當前元素。只有在延遲期滿時才能從佇列中提取元素。我們可以將

DelayQueue運用在以下應用場景:

1、快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。

2、定時任務排程:使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。

  • SynchronousQueue

SynchronousQueue是一個不儲存元素的阻塞佇列。每一個put操作必須等待一個take操作,否則不能繼續新增元素

SynchronousQueue可以看成是一個傳球手,負責把生產者執行緒處理的資料直接傳遞給消費者執行緒。佇列本身並不儲存任何元素,非常適合傳遞性場景。

SynchronousQueue的吞吐量高於 LinkedBlockingQueue 和 ArrayBlockingQueue。

  • LinkedTransferQueue

LinkedTransferQueue是一個由連結串列結構組成的無界阻塞TransferQueue佇列。相對於其他阻塞佇列,LinkedBlockingQueue多了tryTransfer和transfer方法。

  • LinkedBlockingDeque

LinkedBlockingDeque是一個由連結串列結構組成的雙向阻塞佇列。所謂雙向佇列指的是可以從佇列的兩端插入和移除元素。

雙向佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。

在初始化LinkedBlockingDeque時可以設定容量防止其過度膨脹。另外,雙向阻塞佇列可以運用在“工作竊取”模式中。


2、ArrayBlockingQueue

下面原始碼部分的講解都是基於JDK1.8。

2.1、ArrayBlockingQueue的基本使用

ArrayBlockingQueue 是一個用陣列實現的有界阻塞佇列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法為新增和刪除的阻塞方法,下面我們通過ArrayBlockingQueue佇列實現一個生產者消費者的案例,通過該案例簡單瞭解其使用方式。

package com.zju.BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueDemo {
	
	private final static ArrayBlockingQueue<Apple> queue = new ArrayBlockingQueue<>(1);
	
	public static void main(String[] args) {
		new Thread(new Producer(queue)).start();
		new Thread(new Producer(queue)).start();
		new Thread(new Consumer(queue)).start();
		new Thread(new Consumer(queue)).start();
	}
}

// 產品類:蘋果
class Apple{
	public Apple(){	
	}
}

// 生產者執行緒
class Producer implements Runnable{

	private ArrayBlockingQueue<Apple> mAbq;
	
	public Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue) {
		this.mAbq = arrayBlockingQueue;
	}
	
	@Override
	public void run() {
		while(true){
			produce();
		}
	}
	
	private void produce(){
		try {
			Apple apple = new Apple();
			mAbq.put(apple);
			System.out.println("生產蘋果:" + apple);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

// 消費者執行緒
class Consumer implements Runnable{

	private ArrayBlockingQueue<Apple> mAbq;
	
	public Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue) {
		this.mAbq = arrayBlockingQueue;
	}
	
	@Override
	public void run() {
		while(true){
			try {
				TimeUnit.MICROSECONDS.sleep(1000);
				consume();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	private void consume() throws InterruptedException{
		Apple apple = mAbq.take();
		System.out.println("消費蘋果:" + apple);
	}
}

程式碼比較簡單, Consumer 消費者和 Producer 生產者,通過ArrayBlockingQueue 佇列獲取和新增元素,其中消費者呼叫了take()方法獲取元素當佇列沒有元素就阻塞,生產者呼叫put()方法新增元素,當佇列滿時就阻塞,通過這種方式便實現生產者消費者模式。比直接使用等待喚醒機制或者Condition條件佇列來得更加簡單。執行程式碼,列印部分Log如下:

有點需要注意的是ArrayBlockingQueue內部的阻塞佇列是通過重入鎖ReenterLock和Condition條件佇列實現的,所以ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別。對於公平訪問佇列,被阻塞的執行緒可以按照阻塞的先後順序訪問佇列,即先阻塞的執行緒先訪問佇列。而非公平佇列,當佇列可用時,阻塞的執行緒將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的先後順序。建立公平與非公平阻塞佇列程式碼如下:

// 預設非公平阻塞佇列
ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
// 公平阻塞佇列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);

// 構造方法原始碼
public ArrayBlockingQueue(int capacity) {
     this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
     if (capacity <= 0)
         throw new IllegalArgumentException();
     this.items = new Object[capacity];
     lock = new ReentrantLock(fair);
     notEmpty = lock.newCondition();
     notFull =  lock.newCondition();
}

除了常用的put、take等方法外,其他方法如下:

// 自動移除此佇列中的所有元素。
void clear() 

// 如果此佇列包含指定的元素,則返回 true。          
boolean contains(Object o) 

// 移除此佇列中所有可用的元素,並將它們新增到給定collection中。           
int drainTo(Collection<? super E> c) 

// 最多從此佇列中移除給定數量的可用元素,並將這些元素新增到給定collection 中。       
int drainTo(Collection<? super E> c, int maxElements) 

// 返回在此佇列中的元素上按適當順序進行迭代的迭代器。         
Iterator<E> iterator() 

// 返回佇列還能新增元素的數量
int remainingCapacity() 

// 返回此佇列中元素的數量。      
int size() 

// 返回一個按適當順序包含此佇列中所有元素的陣列。
Object[] toArray() 

// 返回一個按適當順序包含此佇列中所有元素的陣列;返回陣列的執行時型別是指定陣列的執行時型別。      
<T> T[] toArray(T[] a)

2.2、ArrayBlockingQueue原理概要

BlockingQueue的介面資訊,上面已經表訴的很清楚了,為了後文中更好的理解ArrayBlockingQueue,現在我們來看看它和Queue以及AbstractQueue之間的關係。

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {

    protected AbstractQueue() {
    }

    public boolean add(E e) {
        ...省略
    }

    public E remove() {
        ...省略
    }

    public E element() {
        ...省略
    }

    public void clear() {
        ...省略
    }

    public boolean addAll(Collection<? extends E> c) {
        ...省略
    }
}

ArrayBlockingQueue繼承了AbstractQueue、實現了BlockingQueue介面,其內部是通過一個可重入鎖ReentrantLock和兩個Condition條件物件來實現阻塞,這裡先看看其內部成員變數。

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

    // 儲存資料的陣列
    final Object[] items;

    // 獲取資料的索引,主要用於take,poll,peek,remove方法
    int takeIndex;

    // 新增資料的索引,主要用於 put, offer, or add 方法
    int putIndex;

    // 佇列元素的個數 
    int count;

    // 控制併發訪問的鎖 
    final ReentrantLock lock;

    // notEmpty條件物件,用於通知take方法佇列已有元素,可執行獲取操作
    private final Condition notEmpty;

    // notFull條件物件,用於通知put方法佇列未滿,可執行新增操作
    private final Condition notFull;

    // 迭代器
    transient Itrs itrs = null;

    ......
}

從成員變數可看出,ArrayBlockingQueue內部確實是通過陣列物件items來儲存所有的資料,值得注意的是ArrayBlockingQueue通過一個ReentrantLock來同時控制新增執行緒與移除執行緒的併發訪問,這點與LinkedBlockingQueue區別很大(稍後會分析)。而對於notEmpty條件物件則是用於存放等待或喚醒呼叫take方法的執行緒,告訴他們佇列已有元素,可以執行獲取操作。同理notFull條件物件是用於等待或喚醒呼叫put方法的執行緒,告訴它們,佇列未滿,可以執行新增元素的操作。takeIndex代表的是下一個方法(take,poll,peek,remove)被呼叫時獲取陣列元素的索引,putIndex則代表下一個方法(put, offer, or add)被呼叫時元素新增到陣列中的索引。圖示如下 :

2.3、ArrayBlockingQueue的(阻塞)新增的實現原理

2.3.1、add(E  e)方法 和 offer(E e)方法

  • 第1步:呼叫ArrayBlockingQueue中的add(E  e)方法
public boolean add(E e) {
    return super.add(e);
}

可以看到add方法實際上呼叫的是ArrayBlockingQueue中的add(E  e)方法。

  • 第2步:呼叫AbstractQueue中的add(E  e)方法
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

可以發現ArrayBlockingQueue中的add方法又呼叫了其子類ArrayBlockingQueue中的offer(E  e)方法。

  • 第3步:呼叫ArrayBlockingQueue中的offer(E  e)方法
public boolean offer(E e) {
     checkNotNull(e); // 檢查元素是否為null
     final ReentrantLock lock = this.lock;
     lock.lock();     // 加鎖
     try {
         if (count == items.length)   // 判斷佇列是否滿
             return false;
         else {
             enqueue(e);  // 新增元素到佇列
             return true;
         }
     } finally {
         lock.unlock();
     }
}
  • 第4步:呼叫ArrayBlockingQueue中的enqueue(E  e)方法,入隊操作
private void enqueue(E x) {
    // 獲取當前陣列
    final Object[] items = this.items;
    // 通過putIndex索引對陣列進行賦值
    items[putIndex] = x;
    // 索引自增,如果已是最後一個位置,重新設定 putIndex = 0;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;  // 佇列中元素數量加1
    // 喚醒呼叫take()方法的執行緒,執行元素獲取操作。
    notEmpty.signal();
}

這裡的add方法和offer方法實現比較簡單,其中需要注意的是enqueue(E x)方法,其方法內部通過putIndex索引直接將元素新增到陣列items中。

這裡可能會疑惑的是:當putIndex索引大小等於陣列長度時,需要將putIndex重新設定為0,這是因為當前佇列執行元素獲取時總是從佇列頭部獲取,而新增元素從中從佇列尾部獲取,所以當佇列索引(從0開始)與陣列長度相等時,下次我們就需要從陣列頭部開始添加了,如下圖演示 :

2.3.2、put(E  e)方法

put方法是一個阻塞新增方法,即阻塞時可中斷。

public void put(E e) throws InterruptedException {
     checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();  // 該方法可中斷
      try {
          // 當佇列元素個數與陣列長度相等時,無法新增元素
          while (count == items.length)
              // 將當前呼叫執行緒掛起,新增到notFull條件佇列中等待喚醒
              notFull.await();
          enqueue(e);  // 如果佇列沒有滿直接新增。。
      } finally {
          lock.unlock();
      }
}

put方法是一個阻塞的方法,如果佇列元素已滿,那麼當前執行緒將會被notFull條件物件掛起加到等待佇列中,直到佇列有空位才會喚醒新增操作。但如果佇列沒有滿,那麼就直接呼叫enqueue(e)方法將元素加入到陣列佇列中。

到此我們對三個新增方法即put,offer,add都分析完畢,其中offer,add在正常情況下都是無阻塞的新增,而put方法是阻塞新增。這就是阻塞佇列的新增過程。說白了就是當佇列滿時通過條件物件Condtion來阻塞當前呼叫put方法的執行緒,直到執行緒又再次被喚醒執行。總得來說新增執行緒的執行存在以下兩種情況:

1、佇列已滿,那麼新到來的 put 執行緒將新增到 notFull 的條件佇列中等待;

2、有移除執行緒執行移除操作,移除成功同時喚醒 put 執行緒。

如下圖所示 :

圖片來自: https://blog.csdn.net/javazejian/article/details/77410889#commentBox

2.4、ArrayBlockingQueue的(阻塞)移除實現原理

2.4.1、poll()方法

poll:該方法獲取並移除此佇列的頭元素,若佇列為空,則返回 null 。

public E poll() {
       final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           // 判斷佇列是否為null,不為null執行dequeue()方法,否則返回null
           return (count == 0) ? null : dequeue();
       } finally {
           lock.unlock();
       }
}

 // 刪除佇列頭元素並返回
 private E dequeue() {
     // 拿到當前陣列的資料
     final Object[] items = this.items;
      @SuppressWarnings("unchecked")
      // 獲取要刪除的物件
      E x = (E) items[takeIndex];
      // 將陣列中takeIndex索引位置設定為null
      items[takeIndex] = null;
      // takeIndex索引加1並判斷是否與陣列長度相等,
      // 如果相等說明已到盡頭,恢復為0
      if (++takeIndex == items.length)
          takeIndex = 0;
      count--;  // 佇列個數減1
      if (itrs != null)
          itrs.elementDequeued();  // 同時更新迭代器中的元素資料
      // 刪除了元素說明佇列有空位,喚醒notFull條件物件新增執行緒,執行新增操作
      notFull.signal();
      return x;
}

poll():獲取並刪除佇列頭元素,佇列沒有資料就返回null,內部通過dequeue()方法刪除頭元素,註釋很清晰,這裡不重複了。

2.4.2、remove(Object o)方法

public boolean remove(Object o) {
    if (o == null) return false;
    // 獲取陣列資料
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖
    try {
        // 如果此時佇列不為null,這裡是為了防止併發情況
        if (count > 0) {   // count為佇列中元素的個數
            // 獲取下一個要新增元素時的索引
            final int putIndex = this.putIndex;
            // 獲取當前要被刪除元素的索引
            int i = takeIndex;
            // 執行迴圈查詢要刪除的元素
            do {
                // 找到要刪除的元素
                if (o.equals(items[i])) {
                    removeAt(i);  // 執行刪除
                    return true;  // 刪除成功返回true
                }
                // 當前刪除索引執行加1後判斷是否與陣列長度相等
                // 若為true,說明索引已到陣列盡頭,將i設定為0
                if (++i == items.length)
                    i = 0; 
            } while (i != putIndex);  // 繼承查詢
        }
        return false;
    } finally {
        lock.unlock();
    }
}

// 根據索引刪除元素,實際上是把刪除索引之後的元素往前移動一個位置
void removeAt(final int removeIndex) {

     final Object[] items = this.items;
      // 先判斷要刪除的元素是否為當前佇列頭元素
      if (removeIndex == takeIndex) {
          // 如果是直接刪除
          items[takeIndex] = null;
          // 當前佇列頭元素加1並判斷是否與陣列長度相等,若為true設定為0
          if (++takeIndex == items.length)
              takeIndex = 0;
          count--;  // 佇列元素減1
          if (itrs != null)
              itrs.elementDequeued(); // 更新迭代器中的資料
      } else {
      // 如果要刪除的元素不在佇列頭部,
      // 那麼只需迴圈迭代把刪除元素後面的所有元素往前移動一個位置
          // 獲取下一個要被新增的元素的索引,作為迴圈判斷結束條件
          final int putIndex = this.putIndex;
          // 執行迴圈
          for (int i = removeIndex;;) {
              // 獲取要刪除節點索引的下一個索引
              int next = i + 1;
              // 判斷是否已為陣列長度,如果是從陣列頭部(索引為0)開始找
              if (next == items.length)
                  next = 0;
               // 如果查詢的索引不等於要新增元素的索引,說明元素可以再移動
              if (next != putIndex) {
                  items[i] = items[next]; // 把後一個元素前移覆蓋要刪除的元
                  i = next;
              } else {
              // 在removeIndex索引之後的元素都往前移動完畢後清空最後一個元素
                  items[i] = null;
                  this.putIndex = i;
                  break; // 結束迴圈
              }
          }
          count--; // 佇列元素減1
          if (itrs != null)
              itrs.removedAt(removeIndex); // 更新迭代器資料
      }
      notFull.signal();  // 喚醒新增執行緒
}

remove(Object o)方法的刪除過程相對複雜些,因為該方法並不是直接從佇列頭部刪除元素。首先執行緒先獲取鎖,再一步判斷佇列 count > 0, 這點是保證併發情況下刪除操作安全執行。接著獲取下一個要新增源的索引 putIndex 以及 takeIndex 索引 ,作為後續迴圈的結束判斷,因為只要 putIndex 與 takeIndex 不相等就說明佇列沒有結束。然後通過while迴圈找到要刪除的元素索引,執行 removeAt(i) 方法刪除。

在 removeAt(i) 方法中實際上做了兩件事,一是首先判斷佇列頭部元素是否為刪除元素,如果是直接刪除,並喚醒新增執行緒;二是如果要刪除的元素並不是佇列頭元素,那麼執行迴圈操作,從要刪除元素的索引removeIndex之後的元素都往前移動一個位置,那麼要刪除的元素就被removeIndex之後的元素替換,從而也就完成了刪除操作。

2.4.3、take()方法

take()方法:是一個阻塞方法,直接獲取佇列頭元素並刪除。

// 從佇列頭部刪除,佇列沒有元素就阻塞,可中斷
 public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();  //中斷
      try {
          // 如果佇列沒有元素
          while (count == 0)
              // 執行阻塞操作
              notEmpty.await();  // 等待notEmpty條件
          return dequeue();  // 如果佇列有元素執行刪除操作
      } finally {
          lock.unlock();
      }
}

take 方法其實很簡單,有就刪除,沒有就阻塞。注意這個阻塞是可以中斷的,如果佇列沒有資料那麼就加入notEmpty條件佇列等待(有資料就直接取走,方法結束),如果有新的 put 執行緒添加了資料,那麼 put 操作將會喚醒 take 執行緒,執行 take 操作。圖示如下:

2.4.4、peek()方法

public E peek() {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
       // 直接返回當前佇列的頭元素,但不刪除
          return itemAt(takeIndex); 
      } finally {
          lock.unlock();
      }
}

// 返回佇列陣列下標為i的元素
final E itemAt(int i) {
      return (E) items[i];
}

peek方法非常簡單,直接返回當前佇列的頭元素但不刪除任何元素。

ok~,到此對於ArrayBlockingQueue的主要方法就分析完了!


3、LinkedBlockingQueue

3.1、LinkedBlockingQueue的基本概要

LinkedBlockingQueue 是一個由連結串列實現的有界佇列阻塞佇列,但大小預設值為Integer.MAX_VALUE所以我們在使用 LinkedBlockingQueue 時建議手動傳值,讓其提供我們所需的大小,避免佇列過大造成機器負載或者記憶體爆滿等情況。其建構函式如下:

// 預設大小為Integer.MAX_VALUE
public LinkedBlockingQueue() {
       this(Integer.MAX_VALUE);
}

// 建立指定大小為capacity的阻塞佇列
public LinkedBlockingQueue(int capacity) {
     if (capacity <= 0) throw new IllegalArgumentException();
     this.capacity = capacity;
     last = head = new Node<E>(null);
 }

// 建立大小預設值為Integer.MAX_VALUE的阻塞佇列並新增c中的元素到阻塞佇列
public LinkedBlockingQueue(Collection<? extends E> c) {
     this(Integer.MAX_VALUE);
     final ReentrantLock putLock = this.putLock;
     putLock.lock(); // Never contended, but necessary for visibility
     try {
         int n = 0;
         for (E e : c) {
             if (e == null)
                 throw new NullPointerException();
             if (n == capacity)
                 throw new IllegalStateException("Queue full");
             enqueue(new Node<E>(e));
             ++n;
         }
         count.set(n);
     } finally {
         putLock.unlock();
     }
}

從原始碼看,有三種方式可以構造LinkedBlockingQueue。通常情況下,我們建議建立指定大小的LinkedBlockingQueue阻塞佇列,即上訴程式碼中的第2種。

LinkedBlockingQueue 佇列也是按 FIFO(先進先出)排序元素。佇列的頭部是在佇列中時間最長的元素,佇列的尾部是在佇列中時間最短的元素,新元素插入到佇列的尾部,而佇列執行獲取操作會獲得位於佇列頭部的元素

在正常情況下,基於連結串列的佇列的吞吐量要高於基於陣列的佇列(ArrayBlockingQueue),因為其內部實現新增和刪除操作使用了兩個ReenterLock來控制併發執行,而ArrayBlockingQueue內部只是使用一個ReenterLock控制併發,因此LinkedBlockingQueue的吞吐量要高於ArrayBlockingQueue。

注意LinkedBlockingQueue和ArrayBlockingQueue的 API 幾乎是一樣的,但它們的內部實現原理不太相同,這點稍後會分析。使用LinkedBlockingQueue,我們同樣也能實現生產者消費者模式。只需把前面ArrayBlockingQueue案例中的阻塞佇列物件換成LinkedBlockingQueue即可。這裡限於篇幅就不貼重複程式碼了。接下來我們重點分析LinkedBlockingQueue的內部實現原理,最後我們將對ArrayBlockingQueue和LinkedBlockingQueue 做總結,闡明它們間的不同之處。

3.2、LinkedBlockingQueue的實現原理概論

LinkedBlockingQueue是一個基於連結串列的阻塞佇列,其內部維持一個基於連結串列的資料佇列,實際上我們對LinkedBlockingQueue的 API 操作都是間接操作該資料佇列,這裡我們先看看LinkedBlockingQueue的內部成員變數。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // 節點類,用於儲存資料 
    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

    // 阻塞佇列的大小,預設為Integer.MAX_VALUE 
    private final int capacity;

    // 當前阻塞佇列中的元素個數
    private final AtomicInteger count = new AtomicInteger();

    // 阻塞佇列的頭結點
    transient Node<E> head;

    // 阻塞佇列的尾節點
    private transient Node<E> last;

    // 獲取並移除元素時使用的鎖,如take, poll, etc 
    private final ReentrantLock takeLock = new ReentrantLock();

    // notEmpty條件物件,當佇列沒有資料時,用於掛起執行刪除的執行緒 
    private final Condition notEmpty = takeLock.newCondition();

    // 新增元素時使用的鎖如 put, offer, etc 
    private final ReentrantLock putLock = new ReentrantLock();

    // notFull條件物件,當佇列資料已滿時,用於掛起執行新增的執行緒 
    private final Condition notFull = putLock.newCondition();
}

從上述可看成,每個新增到LinkedBlockingQueue佇列中的資料都將被封裝成Node節點,新增的連結串列佇列中,其中head和last分別指向佇列的頭結點和尾結點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對併發進行控制。也就是說,新增和刪除操作並不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。

這裡再次強調如果沒有給LinkedBlockingQueue指定容量大小,其預設值將是Integer.MAX_VALUE,如果存在新增速度大於刪除速度時候,有可能會記憶體溢位。這點在使用前希望慎重考慮。至於LinkedBlockingQueue的實現原理圖與ArrayBlockingQueue是類似的,除了對新增和移除方法使用單獨的鎖控制外,兩者都使用了不同的Condition條件物件作為等待佇列,用於掛起take執行緒和put執行緒。 

3.3、新增方法的實現原理

對於新增方法,主要指的是add,offer以及put。

3.3.1、add(E  e) 和 offer(E  e) 方法

public boolean add(E e) {
     if (offer(e))
         return true;
     else
         throw new IllegalStateException("Queue full");
}

從原始碼可以看出,add方法間接呼叫的是offer方法,如果add方法新增失敗將丟擲IllegalStateException異常,新增成功則返回true,那麼下面我們直接看看offer的相關方法實現。

public boolean offer(E e) {
     // 新增元素為null直接丟擲異常
     if (e == null) throw new NullPointerException();
      // 獲取佇列的個數
      final AtomicInteger count = this.count;
      // 判斷佇列是否已滿
      if (count.get() == capacity)
          return false;
      int c = -1;
      // 構建節點
      Node<E> node = new Node<E>(e);
      final ReentrantLock putLock = this.putLock;
      putLock.lock();
      try {
          // 再次判斷佇列是否已滿,考慮併發情況
          if (count.get() < capacity) {
              enqueue(node); // 新增元素
              c = count.getAndIncrement(); // 拿到當前未新增新元素時的佇列長度
              //如果容量還沒滿
              if (c + 1 < capacity)
                  notFull.signal(); // 喚醒下一個新增執行緒,執行新增操作
          }
      } finally {
          putLock.unlock();
      }
      // 由於存在新增鎖和消費鎖,而消費鎖和新增鎖都會持續喚醒等待執行緒,因此count肯定會變化。
      // 這裡的if條件表示如果佇列中還有1條資料
      if (c == 0) 
        signalNotEmpty(); // 如果還存在資料那麼就喚醒消費鎖
    return c >= 0;  // 新增成功返回true,否則返回false
}

// 入隊操作
private void enqueue(Node<E> node) {
     // 佇列尾節點指向新的node節點
     last = last.next = node;
}

// signalNotEmpty方法
private void signalNotEmpty() {
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lock();
          // 喚醒獲取並刪除元素的執行緒
          notEmpty.signal();
      } finally {
          takeLock.unlock();
      }
}

這裡的offer()方法做了兩件事:

第一件事是:判斷佇列是否滿,滿了就直接釋放鎖,沒滿就將節點封裝成Node入隊,然後再次判斷佇列新增完成後是否已滿,不滿就繼續喚醒等到在條件物件notFull上的新增執行緒;

第二件事是:判斷是否需要喚醒等待在notEmpty條件物件上的消費執行緒。

這裡我們可能會有點疑惑,為什麼新增完成後是繼續喚醒在條件物件notFull上的新增執行緒而不是像ArrayBlockingQueue那樣直接喚醒notEmpty條件物件上的消費執行緒?而又為什麼要當if (c == 0)時才去喚醒消費執行緒呢?

喚醒新增執行緒的原因:在新增新元素完成後,會判斷佇列是否已滿,不滿就繼續喚醒在條件物件notFull上的新增執行緒,這點與前面分析的ArrayBlockingQueue很不相同。在ArrayBlockingQueue內部完成新增操作後,會直接喚醒消費執行緒對元素進行獲取,這是因為ArrayBlockingQueue只用了一個ReenterLock同時對新增執行緒和消費執行緒進行控制,這樣如果在新增完成後再次喚醒新增執行緒的話,消費執行緒可能永遠無法執行。而對於LinkedBlockingQueue來說就不一樣了,其內部對新增執行緒和消費執行緒分別使用了各自的ReenterLock鎖對併發進行控制,也就是說新增執行緒和消費執行緒是不會互斥的,所以新增鎖只要管好自己的新增執行緒即可,新增執行緒自己直接喚醒自己的其他新增執行緒,如果沒有等待的新增執行緒,直接結束了。如果有就直到佇列元素已滿才結束掛起,當然offer方法並不會掛起,而是直接結束,只有put方法才會當佇列滿時才執行掛起操作。注意消費執行緒的執行過程也是如此。這也是為什麼LinkedBlockingQueue的吞吐量要相對大些的原因。

為什麼要判斷if (c == 0)時才去喚醒消費執行緒呢?

這是因為消費執行緒一旦被喚醒是一直在消費的(前提是有資料),所以c值是一直在變化的,c值是新增完元素前佇列的大小,此時c只可能是0或c>0。

如果是 c = 0,那麼說明之前消費執行緒已停止,條件物件上可能存在等待的消費執行緒。新增完資料後應該是c+1,那麼有資料就直接喚醒等待消費執行緒,如果沒有就結束啦,等待下一次的消費操作。

如果 c > 0 那麼消費執行緒就不會被喚醒,只能等待下一個消費操作(poll、take、remove)的呼叫。那為什麼不是條件c > 0才去喚醒呢?我們要明白的是消費執行緒一旦被喚醒會和新增執行緒一樣,一直不斷喚醒其他消費執行緒,如果新增前c>0,那麼很可能上一次呼叫的消費執行緒後,資料並沒有被消費完,條件佇列上也就不存在等待的消費執行緒了,所以c>0喚醒消費執行緒得意義不是很大,當然如果新增執行緒一直新增元素,那麼一直c>0,消費執行緒執行的快就要等待下一次呼叫消費操作了(poll、take、remove)。

3.4、移除方法的實現原理

關於移除的方法主要是指remove和poll以及take方法,下面一一分析。

3.4.1、remove方法

public boolean remove(Object o) {
   if (o == null) return false;
     fullyLock();  // 同時對putLock和takeLock加鎖
     try {
         // 迴圈查詢要刪除的元素
         for (Node<E> trail = head, p = trail.next;
              p != null;
              trail = p, p = p.next) {
             if (o.equals(p.item)) { // 找到要刪除的節點
                 unlink(p, trail);   // 直接刪除
                 return true;
             }
         }
         return false;
     } finally {
         fullyUnlock(); // 解鎖
     }
}

// 兩個同時加鎖
void fullyLock() {
       putLock.lock();
       takeLock.lock();
}

void fullyUnlock() {
      takeLock.unlock();
      putLock.unlock();
}

remove方法刪除指定的物件,這裡我們可能會詫異,為什麼同時對putLock和takeLock加鎖?

這是因為remove方法刪除的資料的位置不確定,為了避免造成並非安全問題,所以需要對2個鎖同時加鎖。

3.4.2、poll方法

public E poll() {
        // 獲取當前佇列的大小
        final AtomicInteger count = this.count;
        if (count.get() == 0)  // 如果沒有元素直接返回null
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 判斷佇列是否有資料
            if (count.get() > 0) {
                // 如果有,直接刪除並獲取該元素值
                x = dequeue();
                // 當前佇列大小減一
                c = count.getAndDecrement();
                // 如果佇列未空,繼續喚醒等待在條件物件notEmpty上的消費執行緒
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // 判斷c是否等於capacity,這是因為如果滿說明NotFull條件物件上
        // 可能存在等待的新增執行緒
        if (c == capacity)
            signalNotFull();
        return x;
}

  private E dequeue() {
        Node<E> h = head; // 獲取頭結點
        Node<E> first = h.next; // 獲取頭結的下一個節點(要刪除的節點)
        h.next = h;   // 自己next指向自己,即被刪除
        head = first; // 更新頭結點
        E x = first.item;  // 獲取刪除節點的值
        first.item = null; // 清空資料,因為first變成頭結點是不能帶資料的,這樣也就刪除佇列的帶資料的第一個節點
        return x;
}

poll方法也比較簡單,如果佇列沒有資料就返回null,如果佇列有資料,那麼就取出來,如果佇列還有資料那麼喚醒等待在條件物件notEmpty上的消費執行緒。然後判斷if (c == capacity)為true就喚醒新增執行緒,這點與前面分析if(c==0)是一樣的道理。因為只有可能佇列滿了,notFull條件物件上才可能存在等待的新增執行緒。

3.4.3、take方法

public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 獲取當前佇列大小
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//可中斷
        try {
            // 如果佇列沒有資料,掛起當前執行緒到條件物件的等待佇列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 如果存在資料直接刪除並返回該資料
            x = dequeue();
            c = count.getAndDecrement(); // 佇列大小減1
            if (c > 1)
                notEmpty.signal(); // 還有資料就喚醒後續的消費執行緒
        } finally {
            takeLock.unlock();
        }
        // 滿足條件,喚醒條件物件上等待佇列中的新增執行緒
        if (c == capacity)
            signalNotFull();
        return x;
}

take方法是一個可阻塞可中斷的移除方法,主要做了兩件事:

一是:如果佇列沒有資料就掛起當前執行緒到 notEmpty 條件物件的等待佇列中一直等待,如果有資料就刪除節點並返回資料項,同時喚醒後續消費執行緒;

二是:嘗試喚醒條件物件 notFull 上等待佇列中的新增執行緒。

到此關於remove、poll、take的實現也分析完了,其中只有take方法具備阻塞功能。remove方法則是成功返回true失敗返回false,poll方法成功返回被移除的值,失敗或沒資料返回null。

下面再看看兩個檢查方法,即peek和element。

3.5、檢查方法的實現原理

兩個檢查方法,即peek和element。

// 構造方法,head 節點不存放資料
 public LinkedBlockingQueue(int capacity) {
       if (capacity <= 0) throw new IllegalArgumentException();
       this.capacity = capacity;
       last = head = new Node<E>(null);
}

 public E element() {
        E x = peek(); // 直接呼叫peek
        if (x != null)
            return x;
        else
            throw new NoSuchElementException(); // 沒資料拋異常
}

 public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 獲取頭結節點的下一個節點
            Node<E> first = head.next;
            if (first == null)
                return null; // 為null就返回null
            else
                return first.item; // 返回值
        } finally {
            takeLock.unlock();
        }
}

從程式碼來看,head頭結節點在初始化時是本身是不帶資料的,僅僅作為頭部head方便我們執行連結串列的相關操作。

peek返回直接獲取頭結點的下一個節點返回其值,如果沒有值就返回null,有值就返回節點對應的值。

element方法內部呼叫的是peek,有資料就返回,沒資料就拋異常。

下面我們最後來看兩個根據時間阻塞的方法,比較有意思,利用的Conditin來實現的。

3.6、時間阻塞的方法

3.6.1、offer(E e, long timeout, TimeUnit unit)

// 在指定時間內阻塞新增的方法,超時就結束
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        // 將時間轉換成納秒
        long nanos = unit.toNanos(timeout);
        int c = -1;
        // 獲取鎖
        final ReentrantLock putLock = this.putLock;
        // 獲取當前佇列大小
        final AtomicInteger count = this.count;
        // 鎖中斷(如果需要)
        putLock.lockInterruptibly();
        try {
            // 判斷佇列是否滿
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                // 如果佇列滿了,則根據等待時間阻塞等待
                nanos = notFull.awaitNanos(nanos);
            }
            // 佇列沒滿直接入隊
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            // 喚醒條件物件上等待的執行緒
            if (c + 1 < capacity)
                notFull.signal();
        } finally { 
            putLock.unlock();
        }
        // 喚醒消費執行緒
        if (c == 0)
            signalNotEmpty();
        return true;
}

對於這個offer方法,我們重點來看看阻塞的這段程式碼

// 判斷佇列是否滿
while (count.get() == capacity) {
    if (nanos <= 0)
        return false;
    // 如果佇列滿根據阻塞的等待
    nanos = notFull.awaitNanos(nanos);
}

// CoditionObject(Codition的實現類)中的awaitNanos方法
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
        // 這裡是將當前新增執行緒封裝成node節點加入Condition的等待佇列中
        // 注意這裡的node是AQS的內部類Node
        Node node = addConditionWaiter();
        // 加入等待,那麼就釋放當前執行緒持有的鎖
        int savedState = fullyRelease(node);
        // 計算過期時間
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;

        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            // 主要看這裡!!由於是while 迴圈,這裡會不斷判斷等待時間
            // nanosTimeout 是否超時
            // static final long spinForTimeoutThreshold = 1000L;