1. 程式人生 > >併發程式設計(八)—— Java 併發佇列 BlockingQueue 實現之 ArrayBlockingQueue 原始碼分析

併發程式設計(八)—— Java 併發佇列 BlockingQueue 實現之 ArrayBlockingQueue 原始碼分析

開篇先介紹下 BlockingQueue 這個介面的規則,後面再看其實現。

阻塞佇列概要

阻塞佇列與我們平常接觸的普通佇列(LinkedList或ArrayList等)的最大不同點,在於阻塞佇列的阻塞新增和阻塞刪除方法。

阻塞新增
所謂的阻塞新增是指當阻塞佇列元素已滿時,佇列會阻塞加入元素的執行緒,直佇列元素不滿時才重新喚醒執行緒執行元素加入操作。

阻塞刪除
阻塞刪除是指在佇列元素為空時,刪除佇列元素的執行緒將被阻塞,直到佇列不為空再執行刪除操作(一般都會返回被刪除的元素)。

 

由於Java中的阻塞佇列介面BlockingQueue繼承自Queue介面,因此先來看看阻塞佇列介面為我們提供的主要方法

 1 public interface BlockingQueue<E> extends Queue<E> {
 2 
 3     //將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量)
 4     //在成功時返回 true,如果此佇列已滿,則拋IllegalStateException。 
 5     boolean add(E e); 
 6 
 7     //將指定的元素插入到此佇列的尾部(如果立即可行且不會超過該佇列的容量) 
 8     // 將指定的元素插入此佇列的尾部,如果該佇列已滿, 
 9     //則在到達指定的等待時間之前等待可用的空間,該方法可中斷 
10 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 11 12 //將指定的元素插入此佇列的尾部,如果該佇列已滿,則一直等到(阻塞)。 13 void put(E e) throws InterruptedException; 14 15 //獲取並移除此佇列的頭部,如果沒有元素則等待(阻塞), 16 //直到有元素將喚醒等待執行緒執行該操作 17 E take() throws InterruptedException;
18 19 //獲取並移除此佇列的頭部,在指定的等待時間前一直等到獲取元素, //超過時間方法將結束 20 E poll(long timeout, TimeUnit unit) throws InterruptedException; 21 22 //從此佇列中移除指定元素的單個例項(如果存在)。 23 boolean remove(Object o); 24 }

這裡我們把上述操作進行分類

插入方法:

  add(E e) : 新增成功返回true,失敗拋IllegalStateException異常
  offer(E e) : 成功返回 true,如果此佇列已滿,則返回 false。
  put(E e) :將元素插入此佇列的尾部,如果該佇列已滿,則一直阻塞
刪除方法:

  remove(Object o) :移除指定元素,成功返回true,失敗返回false
  poll() : 獲取並移除此佇列的頭元素,若佇列為空,則返回 null
  take():獲取並移除此佇列頭元素,若沒有元素則一直阻塞。

阻塞佇列的對元素的增刪查操作主要就是上述的三類方法,通常情況下我們都是通過這3類方法操作阻塞佇列,瞭解完阻塞佇列的基本方法後,下面我們將分析阻塞佇列中的兩個實現類ArrayBlockingQueue和LinkedBlockingQueue的簡單使用和實現原理,其中實現原理是這篇文章重點分析的內容。

ArrayBlockingQueue

在看原始碼之前,通過查詢API發現對ArrayBlockingQueue特點的簡單介紹:

1、一個由陣列支援的有界佇列,此佇列按FIFO(先進先出)原則對元素進行排序。
2、新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素
3、這是一個簡單的“有界快取區”,一旦建立,就不能在增加其容量
4、在向已滿佇列中新增元素會導致操作阻塞,從空佇列中提取元素也將導致阻塞
5、此類支援對等待的生產者執行緒和使用者執行緒進行排序的可選公平策略。預設情況下,不保證是這種排序的。然而通過將公平性(fairness)設定為true,而構造的佇列允許按照FIFO順序訪問執行緒。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。

簡單的來說,ArrayBlockingQueue 是一個用陣列實現的有界阻塞佇列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法為新增和刪除的阻塞方法,下面我們通過ArrayBlockingQueue佇列實現一個生產者消費者的案例,通過該案例簡單瞭解其使用方式

使用示例

Consumer 消費者和 Producer 生產者,通過ArrayBlockingQueue 佇列獲取和新增元素,其中消費者呼叫了take()方法獲取元素當佇列沒有元素就阻塞,生產者呼叫put()方法新增元素,當佇列滿時就阻塞,通過這種方式便實現生產者消費者模式。比直接使用等待喚醒機制或者Condition條件佇列來得更加簡單。

 1 package com.zejian.concurrencys.Queue;
 2 import java.util.concurrent.ArrayBlockingQueue;
 3 import java.util.concurrent.TimeUnit;
 4 
 5 /**
 6  * Created by chenhao on 2018/01/07
 7  */
 8 public class ArrayBlockingQueueDemo {
 9     private final static ArrayBlockingQueue<Apple> queue= new ArrayBlockingQueue<>(1);
10     public static void main(String[] args){
11         new Thread(new Producer(queue)).start();
12         new Thread(new Producer(queue)).start();
13         new Thread(new Consumer(queue)).start();
14         new Thread(new Consumer(queue)).start();
15     }
16 }
17 
18  class Apple {
19     public Apple(){
20     }
21  }
22 
23 /**
24  * 生產者執行緒
25  */
26 class Producer implements Runnable{
27     private final ArrayBlockingQueue<Apple> mAbq;
28     Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
29         this.mAbq = arrayBlockingQueue;
30     }
31 
32     @Override
33     public void run() {
34         while (true) {
35             Produce();
36         }
37     }
38 
39     private void Produce(){
40         try {
41             Apple apple = new Apple();
42             mAbq.put(apple);
43             System.out.println("生產:"+apple);
44         } catch (InterruptedException e) {
45             e.printStackTrace();
46         }
47     }
48 }
49 
50 /**
51  * 消費者執行緒
52  */
53 class Consumer implements Runnable{
54 
55     private ArrayBlockingQueue<Apple> mAbq;
56     Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue){
57         this.mAbq = arrayBlockingQueue;
58     }
59 
60     @Override
61     public void run() {
62         while (true){
63             try {
64                 TimeUnit.MILLISECONDS.sleep(1000);
65                 comsume();
66             } catch (InterruptedException e) {
67                 e.printStackTrace();
68             }
69         }
70     }
71 
72     private void comsume() throws InterruptedException {
73         Apple apple = mAbq.take();
74         System.out.println("消費Apple="+apple);
75     }
76 }

輸出:

1 生產:[email protected]
2 消費Apple=[email protected]
3 生產:[email protected]
4 生產:[email protected]
5 消費Apple=[email protected]
6 消費Apple=[email protected]
7 ........

原始碼剖析

ArrayBlockingQueue內部的阻塞佇列是通過重入鎖ReenterLock和Condition條件佇列實現的,所以ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別,對於公平訪問佇列,被阻塞的執行緒可以按照阻塞的先後順序訪問佇列,即先阻塞的執行緒先訪問佇列。而非公平佇列,當佇列可用時,阻塞的執行緒將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的先後順序。建立公平與非公平阻塞佇列程式碼如下:

 1 //預設非公平阻塞佇列
 2 ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
 3 //公平阻塞佇列
 4 ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);
 5 
 6 //構造方法原始碼
 7 public ArrayBlockingQueue(int capacity) {
 8      this(capacity, false);
 9  }
10 
11 public ArrayBlockingQueue(int capacity, boolean fair) {
12      if (capacity <= 0)
13          throw new IllegalArgumentException();
14      this.items = new Object[capacity];
15      lock = new ReentrantLock(fair);
16      notEmpty = lock.newCondition();
17      notFull =  lock.newCondition();
18  }

ArrayBlockingQueue的內部是通過一個可重入鎖ReentrantLock和兩個Condition條件物件來實現阻塞,這裡先看看其內部成員變數

 1 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
 2         implements BlockingQueue<E>, java.io.Serializable {
 3 
 4     /** 儲存資料的陣列 */
 5     final Object[] items;
 6 
 7     /**獲取資料的索引,主要用於take,poll,peek,remove方法 */
 8     int takeIndex;
 9 
10     /**新增資料的索引,主要用於 put, offer, or add 方法*/
11     int putIndex;
12 
13     /** 佇列元素的個數 */
14     int count;
15 
16 
17     /** 控制並非訪問的鎖 */
18     final ReentrantLock lock;
19 
20     /**notEmpty條件物件,用於通知take方法佇列已有元素,可執行獲取操作 */
21     private final Condition notEmpty;
22 
23     /**notFull條件物件,用於通知put方法佇列未滿,可執行新增操作 */
24     private final Condition notFull;
25 
26     /**
27        迭代器
28      */
29     transient Itrs itrs = null;
30 
31 }

從成員變數可看出,ArrayBlockingQueue內部確實是通過陣列物件items來儲存所有的資料,值得注意的是ArrayBlockingQueue通過一個ReentrantLock來同時控制新增執行緒與移除執行緒的並非訪問,這點與LinkedBlockingQueue區別很大(稍後會分析)。而對於notEmpty條件物件則是用於存放等待或喚醒呼叫take方法的執行緒,告訴他們佇列已有元素,可以執行獲取操作。同理notFull條件物件是用於等待或喚醒呼叫put方法的執行緒,告訴它們,佇列未滿,可以執行新增元素的操作。takeIndex代表的是下一個方法(take,poll,peek,remove)被呼叫時獲取陣列元素的索引,putIndex則代表下一個方法(put, offer, or add)被呼叫時元素新增到陣列中的索引。圖示如下

新增

 1 //add方法實現,間接呼叫了offer(e)
 2 public boolean add(E e) {
 3         if (offer(e))
 4             return true;
 5         else
 6             throw new IllegalStateException("Queue full");
 7     }
 8 
 9 //offer方法
10 public boolean offer(E e) {
11      checkNotNull(e);//檢查元素是否為null
12      final ReentrantLock lock = this.lock;
13      lock.lock();//加鎖
14      try {
15          if (count == items.length)//判斷佇列是否滿
16              return false;
17          else {
18              enqueue(e);//新增元素到佇列
19              return true;
20          }
21      } finally {
22          lock.unlock();
23      }
24  }
25 
26 //入隊操作
27 private void enqueue(E x) {
28     //獲取當前陣列
29     final Object[] items = this.items;
30     //通過putIndex索引對陣列進行賦值
31     items[putIndex] = x;
32     //索引自增,如果已是最後一個位置,重新設定 putIndex = 0;
33     if (++putIndex == items.length)
34         putIndex = 0;
35     count++;//佇列中元素數量加1
36     //喚醒呼叫take()方法的執行緒,執行元素獲取操作。
37     notEmpty.signal();
38 }

這裡的add方法和offer方法實現比較簡單,其中需要注意的是enqueue(E x)方法,當putIndex索引大小等於陣列長度時,需要將putIndex重新設定為0,因為後面講到的取值也是從陣列中第一個開始依次往後面取,取了之後會將原位置的值設定為null,方便迴圈put操作,這裡要注意並不是每次都是取陣列中的第一個值,takeIndex也會增加。因為做了新增操作,陣列中肯定不會空,則 notEmpty條件會喚醒take()方法取值。

ok~,接著看put方法,它是一個阻塞新增的方法:

 1 //put方法,阻塞時可中斷
 2  public void put(E e) throws InterruptedException {
 3      checkNotNull(e);
 4       final ReentrantLock lock = this.lock;
 5       lock.lockInterruptibly();//該方法可中斷
 6       try {
 7           //當佇列元素個數與陣列長度相等時,無法新增元素
 8           while (count == items.length)
 9               //將當前呼叫執行緒掛起,新增到notFull條件佇列中等待喚醒
10               notFull.await();
11           enqueue(e);//如果佇列沒有滿直接新增。。
12       } finally {
13           lock.unlock();
14       }
15   }

put方法是一個阻塞的方法,如果佇列元素已滿,那麼當前執行緒將會被notFull條件物件掛起加到等待佇列中,直到佇列有空檔才會喚醒執行新增操作。但如果佇列沒有滿,那麼就直接呼叫enqueue(e)方法將元素加入到陣列佇列中。到此我們對三個新增方法即put,offer,add都分析完畢,其中offer,add在正常情況下都是無阻塞的新增,而put方法是阻塞新增。

(獲取)刪除

關於刪除先看poll方法,該方法獲取並移除此佇列的頭元素,若佇列為空,則返回 null。

 1 public E poll() {
 2   final ReentrantLock lock = this.lock;
 3    lock.lock();
 4    try {
 5        //判斷佇列是否為null,不為null執行dequeue()方法,否則返回null
 6        return (count == 0) ? null : dequeue();
 7    } finally {
 8        lock.unlock();
 9    }
10 }
11  //刪除佇列頭元素並返回
12  private E dequeue() {
13      //拿到當前陣列的資料
14      final Object[] items = this.items;
15       @SuppressWarnings("unchecked")
16       //獲取要刪除的物件
17       E x = (E) items[takeIndex];
18       將陣列中takeIndex索引位置設定為null
19       items[takeIndex] = null;
20       //takeIndex索引加1並判斷是否與陣列長度相等,
21       //如果相等說明已到盡頭,恢復為0
22       if (++takeIndex == items.length)
23           takeIndex = 0;
24       count--;//佇列個數減1
25       if (itrs != null)
26           itrs.elementDequeued();//同時更新迭代器中的元素資料
27       //刪除了元素說明佇列有空位,喚醒notFull條件物件新增執行緒,執行新增操作
28       notFull.signal();
29       return x;
30  }

接著看take()方法,是一個阻塞方法,獲取佇列頭元素並刪除。

 1 //從佇列頭部刪除,佇列沒有元素就阻塞,可中斷
 2  public E take() throws InterruptedException {
 3     final ReentrantLock lock = this.lock;
 4       lock.lockInterruptibly();//中斷
 5       try {
 6           //如果佇列沒有元素
 7           while (count == 0)
 8               //執行阻塞操作
 9               notEmpty.await();
10           return dequeue();//如果佇列有元素執行刪除操作
11       } finally {
12           lock.unlock();
13       }
14  }

take和poll的區別是,佇列為空時,poll返回null,take則被掛起阻塞,直到有元素新增進來,take執行緒被喚醒,然後獲取第一個元素並刪除。

 

peek方法非常簡單,直接返回當前佇列的頭元素但不刪除任何元素。

 1 public E peek() {
 2       final ReentrantLock lock = this.lock;
 3       lock.lock();
 4       try {
 5        //直接返回當前佇列的頭元素,但不刪除
 6           return itemAt(takeIndex); // null when queue is empty
 7       } finally {
 8           lock.unlock();
 9       }
10   }
11 
12 final E itemAt(int i) {
13       return (E) items[i];
14   }

ok~,到此對於ArrayBlockingQueue的主要方法就分析完了。