1. 程式人生 > >BlockingQueue介面及實現類分析


1 BlockingQueue 介面及其實現類


  • put(E)  在佇列尾部放入元素,若佇列滿則等待;
  • take()  取佇列頭部元素返回,若佇列空則等待;
  • offer(E)  在佇列尾部放入元素,若成功則返回true, 否則false;不阻塞;
  • poll()      取佇列頭部元素返回,若有元素則返回元素,否則null;不阻塞;


  • ArrayBlockingQueue: 使用迴圈陣列實現佇列,由一把鎖控制put 和 take, 同一把鎖上實現了兩個等待佇列(Condition);
  • LinkedBlockingQueue: 使用連結串列實現佇列,由兩把鎖分別控制put 和 take,兩個等待佇列分別在兩把鎖上;
  • PriorityBlockingQueue: 使用小頂堆實現優先順序佇列,取出元素順序有Comparable, Comparator決定,多執行緒安全;

2 ArrayBlockingQueue 原始碼解讀


[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. public ArrayBlockingQueue(int capacity, boolean fair) {  
  2.        if (capacity <= 0)  
  3.            throw
    new IllegalArgumentException();  
  4.        this.items = (E[]) new Object[capacity];  
  5.        lock = new ReentrantLock(fair);  
  6.        notEmpty = lock.newCondition();  
  7.        notFull =  lock.newCondition();  
  8.    }  


[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** The queued items  */
  2. privatefinal E[] items;  
  3. /** items index for next take, poll or remove */
  4. privateint takeIndex;  
  5. /** items index for next put, offer, or add. */
  6. privateint putIndex;  
  7. /** Number of items in the queue */
  8. privateint count;  
  9. /* 
  10.  * Concurrency control uses the classic two-condition algorithm 
  11.  * found in any textbook. 
  12.  */
  13. /** Main lock guarding all access */
  14. privatefinal ReentrantLock lock;  
  15. /** Condition for waiting takes */
  16. privatefinal Condition notEmpty;  
  17. /** Condition for waiting puts */
  18. privatefinal Condition notFull;  

put(E e)方法的原始碼如下。進行put操作之前,必須獲得鎖並進行加鎖操作,以保證執行緒安全性。加鎖後,若發現佇列已滿,則呼叫notFull.await()方法,如當前執行緒陷入等待。直到其他執行緒take走某個元素後,會呼叫notFull.signal()方法來啟用該執行緒。啟用之後,繼續下面的插入操作。

[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.      * Inserts the specified element at the tail of this queue, waiting 
  3.      * for space to become available if the queue is full. 
  4.      * 
  5.      */
  6.     publicvoid put(E e) throws InterruptedException {  
  7.         //不能存放 null  元素
  8.         if (e == nullthrownew NullPointerException();  
  9.         final E[] items = this.items;   //陣列佇列
  10.         final ReentrantLock lock = this.lock;  
  11.         //加鎖
  12.         lock.lockInterruptibly();  
  13.         try {  
  14.             try {  
  15.                 //當佇列滿時,呼叫notFull.await()方法,使該執行緒阻塞。
  16.                 //直到take掉某個元素後,呼叫notFull.signal()方法啟用該執行緒。
  17.                 while (count == items.length)  
  18.                     notFull.await();  
  19.             } catch (InterruptedException ie) {  
  20.                 notFull.signal(); // propagate to non-interrupted thread
  21.                 throw ie;  
  22.             }  
  23.             //把元素 e 插入到隊尾
  24.             insert(e);  
  25.         } finally {  
  26.             //解鎖
  27.             lock.unlock();  
  28.         }  
  29.     }  
insert(E e) 方法如下: [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.    * Inserts element at current put position, advances, and signals. 
  3.    * Call only when holding lock. 
  4.    */
  5.   privatevoid insert(E x) {  
  6.       items[putIndex] = x;    
  7. //下標加1或者等於0
  8.       putIndex = inc(putIndex);  
  9.       ++count;  //計數加1
  10. //若有take()執行緒陷入阻塞,則該操作啟用take()執行緒,繼續進行取元素操作。
  11. //若沒有take()執行緒陷入阻塞,則該操作無意義。
  12.       notEmpty.signal();  
  13.   }  
  14. **  
  15.    * Circularly increment i.  
  16.    */  
  17.   finalint inc(int i) {  
  18. //此處可以看到使用了迴圈佇列
  19.       return (++i == items.length)? 0 : i;  
  20.   }  
take()方法程式碼如下。take操作和put操作相反,故不作詳細介紹。 [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. public E take() throws InterruptedException {  
  2.         final ReentrantLock lock = this.lock;  
  3.         lock.lockInterruptibly();  //加鎖
  4.         try {  
  5.             try {  
  6.                 //當佇列空時,呼叫notEmpty.await()方法,使該執行緒阻塞。
  7.                 //直到take掉某個元素後,呼叫notEmpty.signal()方法啟用該執行緒。
  8.                 while (count == 0)  
  9.                     notEmpty.await();  
  10.             } catch (InterruptedException ie) {  
  11.                 notEmpty.signal(); // propagate to non-interrupted thread
  12.                 throw ie;  
  13.             }  
  14.             //取出隊頭元素
  15.             E x = extract();  
  16.             return x;  
  17.         } finally {  
  18.             lock.unlock();  //解鎖
  19.         }  
  20.     }  
extract() 方法如下: [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.      * Extracts element at current take position, advances, and signals. 
  3.      * Call only when holding lock. 
  4.      */
  5.     private E extract() {  
  6.         final E[] items = this.items;  
  7.         E x = items[takeIndex];  
  8.         items[takeIndex] = null;  
  9.         takeIndex = inc(takeIndex);  
  10.         --count;  
  11.         notFull.signal();  
  12.         return x;  
  13.     }  

3 LinkedBlockingQueue 原始碼解讀


 LinkedBlockingQueue 類中定義的變數有:

[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** The capacity bound, or Integer.MAX_VALUE if none */
  2. privatefinalint capacity;  
  3. /** Current number of elements */
  4. privatefinal AtomicInteger count = new AtomicInteger(0);  
  5. /** Head of linked list */
  6. privatetransient Node<E> head;  
  7. /** Tail of linked list */
  8. privatetransient Node<E> last;  
  9. /** Lock held by take, poll, etc */
  10. privatefinal ReentrantLock takeLock = new ReentrantLock();  
  11. /** Wait queue for waiting takes */
  12. privatefinal Condition notEmpty = takeLock.newCondition();  
  13. /** Lock held by put, offer, etc */
  14. privatefinal ReentrantLock putLock = new ReentrantLock();  
  15. /** Wait queue for waiting puts */
  16. privatefinal Condition notFull = putLock.newCondition();  
該類中定義了兩個ReentrantLock鎖:putLock和takeLock,分別用於put端和take端。也就是說,生成端和消費端各自獨立擁有一把鎖,避免了讀(take)寫(put)時互相競爭鎖的情況。 [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.      * Inserts the specified element at the tail of this queue, waiting if 
  3.      * necessary for space to become available. 
  4.      */
  5.     publicvoid put(E e) throws InterruptedException {  
  6.         if (e == nullthrownew NullPointerException();  
  7.         // Note: convention in all put/take/etc is to preset local var
  8.         // holding count negative to indicate failure unless set.
  9.         int c = -1;  
  10.         final ReentrantLock putLock = this.putLock;  
  11.         final AtomicInteger count = this.count;  
  12.         putLock.lockInterruptibly(); //加 putLock 鎖
  13.         try {  
  14.             /* 
  15.              * Note that count is used in wait guard even though it is 
  16.              * not protected by lock. This works because count can 
  17.              * only decrease at this point (all other puts are shut 
  18.              * out by lock), and we (or some other waiting put) are 
  19.              * signalled if it ever changes from 
  20.              * capacity. Similarly for all other uses of count in 
  21.              * other wait guards. 
  22.              */
  23.             //當佇列滿時,呼叫notFull.await()方法釋放鎖,陷入等待狀態。
  24.             //有兩種情況會啟用該執行緒
  25.             //第一、 某個put執行緒新增元素後,發現佇列有空餘,就呼叫notFull.signal()方法啟用阻塞執行緒
  26.             //第二、 take執行緒取元素時,發現佇列已滿。則其取出元素後,也會呼叫notFull.signal()方法啟用阻塞執行緒
  27.             while (count.get() == capacity) {   
  28.                     notFull.await();  
  29.             }  
  30.             // 把元素 e 新增到佇列中(隊尾)
  31.             enqueue(e);  
  32.             c = count.getAndIncrement();  
  33.             //發現佇列未滿,呼叫notFull.signal()啟用阻塞的put執行緒(可能存在)
  34.             if (c + 1 < capacity)  
  35.                 notFull.signal();  
  36.         } finally {  
  37.             putLock.unlock();  
  38.         }  
  39.         if (c == 0)  
  40.             //佇列空,說明已經有take執行緒陷入阻塞,故呼叫signalNotEmpty啟用阻塞的take執行緒
  41.             signalNotEmpty();  
  42.     }  
enqueue(E e)方法如下: [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.  * Creates a node and links it at end of queue. 
  3.  * @param x the item 
  4.  */
  5. privatevoid enqueue(E x) {  
  6.     // assert putLock.isHeldByCurrentThread();
  7.     last = last.next = new Node<E>(x);  
  8. }  
[java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. public E take() throws InterruptedException {  
  2.        E x;  
  3.        int c = -1;  
  4.        final AtomicInteger count = this.count;  
  5.        final ReentrantLock takeLock = this.takeLock;  
  6.        takeLock.lockInterruptibly();  
  7.        try {  
  8.                while (count.get() == 0) {  
  9.                    notEmpty.await();  
  10.                }  
  11.            x = dequeue();  
  12.            c = count.getAndDecrement();  
  13.            if (c > 1)  
  14.                notEmpty.signal();  
  15.        } finally {  
  16.            takeLock.unlock();  
  17.        }  
  18.        if (c == capacity)  
  19.            signalNotFull();  
  20.        return x;  
  21.    }  
dequeue()方法如下: [java] view plain copy print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.  * Removes a node from head of queue. 
  3.  * @return the node 
  4.  */
  5. private E dequeue() {  
  6.     // assert takeLock.isHeldByCurrentThread();
  7.     Node<E> h = head;  
  8.     Node<E> first = h.next;  
  9.     h.next = h; // help GC
  10.     head = first;  
  11.     E x = first.item;  
  12.     first.item = null;  
  13.     return x;  
  14. }  


4 SynchronousQueue, ArrayBlockingQueue 和 LinkedBlockingQueue 效能對比

  • 執行緒多(>20),Queue長度長(>30),使用LinkedBlockingQueue

  • 執行緒少 (<20) ,Queue長度短 (<30) , 使用SynchronousQueue


  • 少用ArrayBlcokingQueue,似乎沒找到它的好處,高手給給建議吧!
