1. 程式人生 > >BlockingQueue介面及實現類分析

BlockingQueue介面及實現類分析

1 BlockingQueue 介面及其實現類

BlocingQueue介面定義如下,僅列舉幾個常用方法:

  • put(E)  在佇列尾部放入元素,若佇列滿則等待;
  • take()  取佇列頭部元素返回,若佇列空則等待;
  • offer(E)  在佇列尾部放入元素,若成功則返回true, 否則false;不阻塞;
  • poll()      取佇列頭部元素返回,若有元素則返回元素,否則null;不阻塞;

實現BlockingQueue的常用介面:

  • ArrayBlockingQueue: 使用迴圈陣列實現佇列,由一把鎖控制put 和 take, 同一把鎖上實現了兩個等待佇列(Condition);
  • LinkedBlockingQueue: 使用連結串列實現佇列,由兩把鎖分別控制put 和 take,兩個等待佇列分別在兩把鎖上;
  • PriorityBlockingQueue: 使用小頂堆實現優先順序佇列,取出元素順序有Comparable, Comparator決定,多執行緒安全;

2 ArrayBlockingQueue 原始碼解讀

ArrayBlockingQueue建立的時候需要指定容量capacity(可以儲存的最大的元素個數,因為它不會自動擴容)。其中一個構造方法為:

[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. public ArrayBlockingQueue(int capacity, boolean fair) {  
  2.        if (capacity <= 0)  
  3.            throw
    new IllegalArgumentException();  
  4.        this.items = (E[]) new Object[capacity];  
  5.        lock = new ReentrantLock(fair);  
  6.        notEmpty = lock.newCondition();  
  7.        notFull =  lock.newCondition();  
  8.    }  

    ArrayBlockingQueue類中定義的變數有:

[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** The queued items  */
  2. privatefinal E[] items;  
  3. /** items index for next take, poll or remove */
  4. privateint takeIndex;  
  5. /** items index for next put, offer, or add. */
  6. privateint putIndex;  
  7. /** Number of items in the queue */
  8. privateint count;  
  9. /* 
  10.  * Concurrency control uses the classic two-condition algorithm 
  11.  * found in any textbook. 
  12.  */
  13. /** Main lock guarding all access */
  14. privatefinal ReentrantLock lock;  
  15. /** Condition for waiting takes */
  16. privatefinal Condition notEmpty;  
  17. /** Condition for waiting puts */
  18. privatefinal Condition notFull;  
使用陣列items來儲存元素,由於是迴圈佇列,使用takeIndex和putIndex來標記put和take的位置。可以看到,該類中只定義了一個鎖ReentrantLock,定義兩個Condition物件:notEmputy和notFull,分別用來對take和put操作進行所控制。注:本文主要講解put()和take()操作,其他方法類似。

put(E e)方法的原始碼如下。進行put操作之前,必須獲得鎖並進行加鎖操作,以保證執行緒安全性。加鎖後,若發現佇列已滿,則呼叫notFull.await()方法,如當前執行緒陷入等待。直到其他執行緒take走某個元素後,會呼叫notFull.signal()方法來啟用該執行緒。啟用之後,繼續下面的插入操作。

[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.      * Inserts the specified element at the tail of this queue, waiting 
  3.      * for space to become available if the queue is full. 
  4.      * 
  5.      */
  6.     publicvoid put(E e) throws InterruptedException {  
  7.         //不能存放 null  元素
  8.         if (e == nullthrownew NullPointerException();  
  9.         final E[] items = this.items;   //陣列佇列
  10.         final ReentrantLock lock = this.lock;  
  11.         //加鎖
  12.         lock.lockInterruptibly();  
  13.         try {  
  14.             try {  
  15.                 //當佇列滿時,呼叫notFull.await()方法,使該執行緒阻塞。
  16.                 //直到take掉某個元素後,呼叫notFull.signal()方法啟用該執行緒。
  17.                 while (count == items.length)  
  18.                     notFull.await();  
  19.             } catch (InterruptedException ie) {  
  20.                 notFull.signal(); // propagate to non-interrupted thread
  21.                 throw ie;  
  22.             }  
  23.             //把元素 e 插入到隊尾
  24.             insert(e);  
  25.         } finally {  
  26.             //解鎖
  27.             lock.unlock();  
  28.         }  
  29.     }  
insert(E e) 方法如下: [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.    * Inserts element at current put position, advances, and signals. 
  3.    * Call only when holding lock. 
  4.    */
  5.   privatevoid insert(E x) {  
  6.       items[putIndex] = x;    
  7. //下標加1或者等於0
  8.       putIndex = inc(putIndex);  
  9.       ++count;  //計數加1
  10. //若有take()執行緒陷入阻塞,則該操作啟用take()執行緒,繼續進行取元素操作。
  11. //若沒有take()執行緒陷入阻塞,則該操作無意義。
  12.       notEmpty.signal();  
  13.   }  
  14. **  
  15.    * Circularly increment i.  
  16.    */  
  17.   finalint inc(int i) {  
  18. //此處可以看到使用了迴圈佇列
  19.       return (++i == items.length)? 0 : i;  
  20.   }  
take()方法程式碼如下。take操作和put操作相反,故不作詳細介紹。 [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. public E take() throws InterruptedException {  
  2.         final ReentrantLock lock = this.lock;  
  3.         lock.lockInterruptibly();  //加鎖
  4.         try {  
  5.             try {  
  6.                 //當佇列空時,呼叫notEmpty.await()方法,使該執行緒阻塞。
  7.                 //直到take掉某個元素後,呼叫notEmpty.signal()方法啟用該執行緒。
  8.                 while (count == 0)  
  9.                     notEmpty.await();  
  10.             } catch (InterruptedException ie) {  
  11.                 notEmpty.signal(); // propagate to non-interrupted thread
  12.                 throw ie;  
  13.             }  
  14.             //取出隊頭元素
  15.             E x = extract();  
  16.             return x;  
  17.         } finally {  
  18.             lock.unlock();  //解鎖
  19.         }  
  20.     }  
extract() 方法如下: [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.      * Extracts element at current take position, advances, and signals. 
  3.      * Call only when holding lock. 
  4.      */
  5.     private E extract() {  
  6.         final E[] items = this.items;  
  7.         E x = items[takeIndex];  
  8.         items[takeIndex] = null;  
  9.         takeIndex = inc(takeIndex);  
  10.         --count;  
  11.         notFull.signal();  
  12.         return x;  
  13.     }  
小結:進行put和take操作,共用同一個鎖物件。也即是說,put和take無法並行執行!

3 LinkedBlockingQueue 原始碼解讀

基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。

 LinkedBlockingQueue 類中定義的變數有:

[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** The capacity bound, or Integer.MAX_VALUE if none */
  2. privatefinalint capacity;  
  3. /** Current number of elements */
  4. privatefinal AtomicInteger count = new AtomicInteger(0);  
  5. /** Head of linked list */
  6. privatetransient Node<E> head;  
  7. /** Tail of linked list */
  8. privatetransient Node<E> last;  
  9. /** Lock held by take, poll, etc */
  10. privatefinal ReentrantLock takeLock = new ReentrantLock();  
  11. /** Wait queue for waiting takes */
  12. privatefinal Condition notEmpty = takeLock.newCondition();  
  13. /** Lock held by put, offer, etc */
  14. privatefinal ReentrantLock putLock = new ReentrantLock();  
  15. /** Wait queue for waiting puts */
  16. privatefinal Condition notFull = putLock.newCondition();  
該類中定義了兩個ReentrantLock鎖:putLock和takeLock,分別用於put端和take端。也就是說,生成端和消費端各自獨立擁有一把鎖,避免了讀(take)寫(put)時互相競爭鎖的情況。 [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.      * Inserts the specified element at the tail of this queue, waiting if 
  3.      * necessary for space to become available. 
  4.      */
  5.     publicvoid put(E e) throws InterruptedException {  
  6.         if (e == nullthrownew NullPointerException();  
  7.         // Note: convention in all put/take/etc is to preset local var
  8.         // holding count negative to indicate failure unless set.
  9.         int c = -1;  
  10.         final ReentrantLock putLock = this.putLock;  
  11.         final AtomicInteger count = this.count;  
  12.         putLock.lockInterruptibly(); //加 putLock 鎖
  13.         try {  
  14.             /* 
  15.              * Note that count is used in wait guard even though it is 
  16.              * not protected by lock. This works because count can 
  17.              * only decrease at this point (all other puts are shut 
  18.              * out by lock), and we (or some other waiting put) are 
  19.              * signalled if it ever changes from 
  20.              * capacity. Similarly for all other uses of count in 
  21.              * other wait guards. 
  22.              */
  23.             //當佇列滿時,呼叫notFull.await()方法釋放鎖,陷入等待狀態。
  24.             //有兩種情況會啟用該執行緒
  25.             //第一、 某個put執行緒新增元素後,發現佇列有空餘,就呼叫notFull.signal()方法啟用阻塞執行緒
  26.             //第二、 take執行緒取元素時,發現佇列已滿。則其取出元素後,也會呼叫notFull.signal()方法啟用阻塞執行緒
  27.             while (count.get() == capacity) {   
  28.                     notFull.await();  
  29.             }  
  30.             // 把元素 e 新增到佇列中(隊尾)
  31.             enqueue(e);  
  32.             c = count.getAndIncrement();  
  33.             //發現佇列未滿,呼叫notFull.signal()啟用阻塞的put執行緒(可能存在)
  34.             if (c + 1 < capacity)  
  35.                 notFull.signal();  
  36.         } finally {  
  37.             putLock.unlock();  
  38.         }  
  39.         if (c == 0)  
  40.             //佇列空,說明已經有take執行緒陷入阻塞,故呼叫signalNotEmpty啟用阻塞的take執行緒
  41.             signalNotEmpty();  
  42.     }  
enqueue(E e)方法如下: [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.  * Creates a node and links it at end of queue. 
  3.  * @param x the item 
  4.  */
  5. privatevoid enqueue(E x) {  
  6.     // assert putLock.isHeldByCurrentThread();
  7.     last = last.next = new Node<E>(x);  
  8. }  
take()方法程式碼如下。take操作和put操作相反,故不作詳細介紹。
[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. public E take() throws InterruptedException {  
  2.        E x;  
  3.        int c = -1;  
  4.        final AtomicInteger count = this.count;  
  5.        final ReentrantLock takeLock = this.takeLock;  
  6.        takeLock.lockInterruptibly();  
  7.        try {  
  8.                while (count.get() == 0) {  
  9.                    notEmpty.await();  
  10.                }  
  11.            x = dequeue();  
  12.            c = count.getAndDecrement();  
  13.            if (c > 1)  
  14.                notEmpty.signal();  
  15.        } finally {  
  16.            takeLock.unlock();  
  17.        }  
  18.        if (c == capacity)  
  19.            signalNotFull();  
  20.        return x;  
  21.    }  
dequeue()方法如下: [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.  * Removes a node from head of queue. 
  3.  * @return the node 
  4.  */
  5. private E dequeue() {  
  6.     // assert takeLock.isHeldByCurrentThread();
  7.     Node<E> h = head;  
  8.     Node<E> first = h.next;  
  9.     h.next = h; // help GC
  10.     head = first;  
  11.     E x = first.item;  
  12.     first.item = null;  
  13.     return x;  
  14. }  

小結:take和put操作各有一把鎖,可並行讀取。


4 SynchronousQueue, ArrayBlockingQueue 和 LinkedBlockingQueue 效能對比

  • 執行緒多(>20),Queue長度長(>30),使用LinkedBlockingQueue

  • 執行緒少 (<20) ,Queue長度短 (<30) , 使用SynchronousQueue

當然,使用SynchronousQueue的時候不要忘記應用的擴充套件,如果將來需要進行擴充套件還是選擇LinkedBlockingQueue好,儘量把SynchronousQueue限制在特殊場景中使用。

  • 少用ArrayBlcokingQueue,似乎沒找到它的好處,高手給給建議吧!

參考文章: