1. 程式人生 > >Java同步數據結構之LinkedBlockingQueue

Java同步數據結構之LinkedBlockingQueue

應該 同步問題 無限 waiting cal 上一個 ued 原來 private

前言

比起ArrayBlockingQueue,LinkedBlockingQueue應該是最被大家常用的阻塞隊列,LinkedBlockingQueue是基於鏈表的一種可選容量的阻塞隊列,也就是說,在構造LinkedBlockingQueue實例的時候,你可以像ArrayBlockingQueue那樣指定隊列大小,也可以不指定大小(這時候默認就是Integer.MAX_VALUE),指定隊列的大小是為了防止隊列過度的擴張,導致內存被過度占用或溢出。鏈表的節點是在每一次插入時動態的創建的,除非這會導致隊列超出容量限制。LinkedBlockingQueue的容量在實例被構造完成之後也是不允許被更改的。

與ArrayBlockingQueue一樣LinkedBlockingQueue不允許插入null值,也是先進先出FIFO隊列,隊列的頭部是隊列中存在時間最長的元素,新元素被插入到隊尾,隊列出隊從頭部開始。與ArrayBlockingQueue相比,LinkedBlockingQueue通常具有更高的吞吐量,但在大多數並發應用程序中性能的可預測性較差。

LinkedBlockingQueue采用了“雙鎖隊列” 算法,元素的入隊和出隊分別由putLock、takeLock兩個獨立的可重入鎖來實現。所以比起ArrayBlockingQueue明顯提高了吞吐量。

源碼分析

先看看其成員變量:

技術分享圖片
 1
static class Node<E> { 2 E item; 3 4 /** 5 * One of: 6 * - the real successor Node 7 * - this Node, meaning the successor is head.next 8 * - null, meaning there is no successor (this is the last node) 9 */ 10 Node<E> next; 11 12 Node(E x) { item = x; }
13 } 14 15 /** The capacity bound, or Integer.MAX_VALUE if none */ 16 private final int capacity; 17 18 /** Current number of elements */ 19 private final AtomicInteger count = new AtomicInteger(); 20 21 /** 22 * Head of linked list. 23 * Invariant: head.item == null 24 */ 25 transient Node<E> head; 26 27 /** 28 * Tail of linked list. 29 * Invariant: last.next == null 30 */ 31 private transient Node<E> last; 32 33 /** Lock held by take, poll, etc */ 34 private final ReentrantLock takeLock = new ReentrantLock(); 35 36 /** Wait queue for waiting takes */ 37 private final Condition notEmpty = takeLock.newCondition(); 38 39 /** Lock held by put, offer, etc */ 40 private final ReentrantLock putLock = new ReentrantLock(); 41 42 /** Wait queue for waiting puts */ 43 private final Condition notFull = putLock.newCondition();
View Code

上面的Node節點內部類顯然就是用於實現鏈表的節點實體,item就是當前節點攜帶的真正對象,next指向下一個節點。head、last分別表示鏈表的首尾節點,值得註意的是,在LinkedBlockingQueue內部實現的時候,head節點不會參與到鏈表的實體綁定,也就是說,真正的有效節點掛載都在head節點之後,所以head.item 永遠都為null。takeLock和putLock兩把鎖以及各自的Condition實例分別用於隊列元素的出隊和入隊,可以看到表示隊列當前元素個數的count是由一個原子變量來保存的,這是為了避免在維護該變量的時候需要同時獲取takeLock、putLock兩個鎖。當然LinkedBlockingQueue內部還是有一些方法需要同時獲取兩個鎖才能執行,後面會介紹。

LinkedBlockingQueue實例在構造的時候可以指定容量也可以不指定,另外和ArrayBlockingQueue一樣也可以在初始化的時候用一個指定的集合初始化隊列:

技術分享圖片
 1 public LinkedBlockingQueue(int capacity) {
 2     if (capacity <= 0) throw new IllegalArgumentException();
 3     this.capacity = capacity;
 4     last = head = new Node<E>(null); //初始化首尾節點
 5 }
 6 
 7 public LinkedBlockingQueue(Collection<? extends E> c) {
 8     this(Integer.MAX_VALUE);
 9     final ReentrantLock putLock = this.putLock;
10     putLock.lock(); // Never contended, but necessary for visibility
11     try {
12         int n = 0;
13         for (E e : c) {
14             if (e == null)
15                 throw new NullPointerException();
16             if (n == capacity)
17                 throw new IllegalStateException("Queue full");
18             enqueue(new Node<E>(e));
19             ++n;
20         }
21         count.set(n);
22     } finally {
23         putLock.unlock();
24     }
25 }
View Code

通過以上的構造方法可見一開始首尾節點其實是同一個節點,使用一個集合構造實例的時候,容量是無限的即Integer.MAX_VALUE,在入隊操作之前先獲取putLock,再循環遍歷每一個元素一個一個的入隊,一旦隊列滿了就會拋出IllegalStateException異常。

可阻塞入隊操作:

技術分享圖片
 1 public void put(E e) throws InterruptedException {
 2     if (e == null) throw new NullPointerException();
 3     // Note: convention in all put/take/etc is to preset local var
 4     // holding count negative to indicate failure unless set.
 5     int c = -1;
 6     Node<E> node = new Node<E>(e);
 7     final ReentrantLock putLock = this.putLock;
 8     final AtomicInteger count = this.count;
 9     putLock.lockInterruptibly();
10     try {
11         /*
12          * Note that count is used in wait guard even though it is
13          * not protected by lock. This works because count can
14          * only decrease at this point (all other puts are shut
15          * out by lock), and we (or some other waiting put) are
16          * signalled if it ever changes from capacity. Similarly
17          * for all other uses of count in other wait guards.
18          */
19         while (count.get() == capacity) {
20             notFull.await();
21         }
22         enqueue(node);
23         c = count.getAndIncrement();
24         if (c + 1 < capacity)
25             notFull.signal();
26     } finally {
27         putLock.unlock();
28     }
29     if (c == 0)
30         signalNotEmpty();
31 }
View Code

通過入隊的元素構造一個Node實例,入隊先獲取putLock,如果隊列滿了就等待,入隊完成之後如果隊列還沒有滿就喚醒其它可能被阻塞的入隊操作,然後釋放putLock。註意在最後如果隊列從空變成非空還要喚醒消費線程即阻塞在takeLock鎖的線程(即signalNotEmpty方法)。另一個入隊方法offer的原理大同小異就不介紹了,實現都差不多,但最終也都是會調用enqueue做入隊操作:

 1 /**
 2  * Links node at end of queue.
 3  *
 4  * @param node the node
 5  */
 6 private void enqueue(Node<E> node) {
 7     // assert putLock.isHeldByCurrentThread();
 8     // assert last.next == null;
 9     last = last.next = node;
10 }

入隊操作其實就是將原來的尾節點的next指向新加入的節點,並且把這個新加入的節點設置成尾節點,由於初始化的時候head==last,所以第一個節點其實就是掛載到head.next的,最終的數據結構如下:head->n1->n2->n3......

可阻塞出隊操作:

技術分享圖片
 1 public E take() throws InterruptedException {
 2     E x;
 3     int c = -1;
 4     final AtomicInteger count = this.count;
 5     final ReentrantLock takeLock = this.takeLock;
 6     takeLock.lockInterruptibly();
 7     try {
 8         while (count.get() == 0) {
 9             notEmpty.await();//隊列為空等待
10         }
11         x = dequeue(); //獲取頭元素
12         c = count.getAndDecrement();
13         if (c > 1) //隊列中至少還有一個元素,喚醒其它可能被阻塞的出隊操作
14             notEmpty.signal();
15     } finally {
16         takeLock.unlock();
17     }
18     if (c == capacity) //隊列從滿變成不滿狀態,喚醒其它可能被阻塞的入隊操作
19         signalNotFull();
20     return x;
21 }
View Code

先獲取takeLock鎖,隊列為空則阻塞等待,隊列不為空時獲取一個頭元素,如果隊列中至少還有一個元素的話就喚醒其它可能被阻塞的出隊操作,註意這裏的c-1才是當前真正的隊列中元素的個數,所以c == capacity表示的就是隊列從滿變成了不滿狀態,所以需要喚醒其它可能被阻塞的入隊操作。另外的出隊方法poll原理差不多,但最終也都是調用dequeue做出隊操作:

 1 private E dequeue() {
 2     // assert takeLock.isHeldByCurrentThread();
 3     // assert head.item == null;
 4     Node<E> h = head;
 5     Node<E> first = h.next; //頭節點的next才是真正的第一個節點
 6     h.next = h; // help GC
 7     head = first; 
 8     E x = first.item;
 9     first.item = null;
10     return x;
11 }

出隊的邏輯很巧妙,由於head.next才是真正的第一個節點,所以拿到第一個節點的數據item之後,會讓原來的head.next指向自己,把本該被移除的第一個節點的item清空,然後把它變成新的head節點。這樣保證head.next才是真正的第一個有效節點,而將原來的head.next指向自己是為了讓GC能夠快速的回收(為什麽不將head.next直接置為null 是為了在叠代器叠代到當前節點時區分到底是被take了還是隊列已經結束,如果為null 就被被判斷為隊列結束)。可以說移除的其實是head節點,而原來的head.next成了新的head。

這裏這種移除節點的方式很關鍵,為什麽出入隊操作使用兩個獨立的鎖不會出問題呢?因為take 操作拿到的雖然確實是head.next即第一個有效節點的數據,但是真正移除的節點確是head節點,原來那個本應該被移除的節點還存在於隊列中只是它變成了head ,試想現在的隊列只有一個元素即隊列長這樣:head -> n1 ,現在兩個線程同時分別進行take 和put 操作,put會將新節點n2掛到n1.next, take 拿走n1.item,移除head,最終變成這樣: n1 (head) -> n2. 因此並不會存在同步問題。

內部節點出隊操作:

技術分享圖片
 1 public boolean remove(Object o) {
 2     if (o == null) return false;
 3     fullyLock();
 4     try {
 5         for (Node<E> trail = head, p = trail.next;
 6              p != null;
 7              trail = p, p = p.next) {
 8             if (o.equals(p.item)) {
 9                 unlink(p, trail); 
10                 return true;
11             }
12         }
13         return false;
14     } finally {
15         fullyUnlock();
16     }
17 }
View Code

 1 void unlink(Node<E> p, Node<E> trail) {
 2     // assert isFullyLocked();
 3     // p.next is not changed, to allow iterators that are
 4     // traversing p to maintain their weak-consistency guarantee.
 5     p.item = null; //僅僅是將被移除的p節點的數據移除,並沒有設置其next為空
 6     trail.next = p.next;
 7     if (last == p)
 8         last = trail;
 9     if (count.getAndDecrement() == capacity)
10         notFull.signal();
11 }

remove(object)可以移除指定的元素,該元素可能位於鏈表的任何位置。值得註意的是,這裏獲取了fullyLock,不允許所有的其它出入隊操作,因為在移除節點的時候會破壞鏈表的結構,這時候如果有出入隊操作很可能會導致元素被掛載到被刪除的節點後面,或者將head指向被刪除的節點,從而導致鏈表節點丟失。unlink方法就是用於將要移除的節點從鏈表中斷開,並讓它的上一個節點指向它的下一個節點。如果隊列從滿變成不滿狀態,那麽喚醒可能阻塞的入隊操作。這裏並沒有改變被移除節點的next指向,這是為了保證剛好叠代到p節點的叠代器能夠繼續往下叠代操作,而不會因為節點的移除而導致叠代器中途停止,即所謂的弱一致。值得註意的是,不論是從頭節點出出隊還是內部節點移除,都沒有將它們的next指向重置為null其實都是為了方便叠代器實現.

除了remove,還有clear、contains、toArray、toString方法都會在操作的時候回去fullLock,這些方法會阻塞所有的其它方法,所以執行這些方法無疑會降低吞吐量的,在並發量高的場景盡量少使用。

而方法 drainTo() 即從隊列中移除全部的(或指定量的)元素到指定的集合中,僅僅是獲取了takeLock,因為該方法其實也僅僅是出隊操作,只是它是通過從head節點遍歷的方式來轉移節點。

叠代器

LinkedBlockingQueue的叠代器實現比起ArrayBlockingQueue簡單的多,但叠代的實現也有相類似的地方,例如在創建叠代器的時候就已經拿到了第一個有效節點的元素,每一次執行next的時候又準備好下一次叠代的返回對象,同ArrayBlockingQueue一樣,它也有一個lastRet變量用來暫時存儲當前叠代的節點,用於在it.next調用完成之後,調用it.remove()時避免刪除不應該刪除的元素。

技術分享圖片
 1 private class Itr implements Iterator<E> {
 2     /*
 3      * Basic weakly-consistent iterator.  At all times hold the next
 4      * item to hand out so that if hasNext() reports true, we will
 5      * still have it to return even if lost race with a take etc.
 6      */
 7 
 8     private Node<E> current;
 9     private Node<E> lastRet;
10     private E currentElement;
11 
12     Itr() {
13         fullyLock();
14         try {
15             current = head.next;
16             if (current != null)
17                 currentElement = current.item;
18         } finally {
19             fullyUnlock();
20         }
21     }
22 
23     public boolean hasNext() {
24         return current != null;
25     }
View Code

很簡單,在創建叠代器實例時直接拿到head.next即第一個有效節點,以及其數據 currentElement,hasNext直接判斷current不為空即可,這也是為了保證叠代器的弱一致性,如果hasNext為true,那麽next一定會返回非空的對象。

 1 /**
 2      * Returns the next live successor of p, or null if no such.
 3      *
 4      * Unlike other traversal methods, iterators need to handle both:
 5      * - dequeued nodes (p.next == p)
 6      * - (possibly multiple) interior removed nodes (p.item == null)
 7      */
 8     private Node<E> nextNode(Node<E> p) {
 9         for (;;) {
10             Node<E> s = p.next;
11             if (s == p) //如果s節點作為首個有效節點已經出隊
12                 return head.next; //直接返回新的第一個有效節點
13             if (s == null || s.item != null)
14                 return s; //s就是正常的下一個有效節點,為null表示結束
15             p = s; //s.item ==null 說明已經被remove方法移除了,繼續找它的下一個節點
16         }
17     }
18 
19     public E next() {
20         fullyLock();
21         try {
22             if (current == null)
23                 throw new NoSuchElementException();
24             E x = currentElement;
25             lastRet = current; //保留當前遍歷的節點,為接下來調用it.remove時使用。
26             current = nextNode(current);
27             currentElement = (current == null) ? null : current.item;
28             return x;
29         } finally {
30             fullyUnlock();
31         }
32     }

next方法沒有什麽好說的,而nextNode方法很關鍵,它會處理兩種情況,1、當前節點p的下一個節點已經作為首個有效節點出隊了,即p.next == p,這時候下一個節點其實就是新的首節點即head.next。2、如果當前節點的下一個節點被內部刪除了即通過remove (object)移除,那麽s.next不為空,但是s.item為空(具體請看unlink方法),所以需要繼續尋找s.next節點,這裏使用了無條件的for自旋,就可以跳過這種中間的一個或多個被remove方法移除的節點。

叠代器it.remove方法就不貼源碼了,很簡單,獲取fullLocl,根據事先保存的next的返回節點lastRet遍歷整個隊列,發現了就unlink,沒有發現就什麽也不做。

LinkedBlockingQueue的叠代器保證了其弱一致性,除了首個有效節點在創建叠代器實例的時候就已經被保留下來之外(所以在獲取叠代器實例之後,就算移除了頭節點it.next也會返回該節點),隊列中其它節點的變更都能被叠代器同步更新。LinkedBlockimgQueue的叠代器少了ArrayBlockingQueue那樣很多精密的實現例如對於GC的友好性,所以使用多個叠代器實例可能內存性能有不可預測性。

可拆分叠代器Spliterator

LinkedBlockingQueue實現了自己的可拆分叠代器LBQSpliterator,從spliterator方法就可以看到:

public Spliterator<E> spliterator() {
    return new LBQSpliterator<E>(this);
}

可拆分叠代器的 tryAdvance、forEachRemaining、trySplit方法都是需要獲取fullLock的,所以註意對吞吐量的影響,tryAdvance獲取第一個item不為空的節點數據做指定的操作,forEachRemaining循環遍歷當前叠代器中所有沒有被移除的節點(item不為空)做指定的操作源碼都很簡單,就不貼代碼了,它的拆分方法trySplit相對來說有意思的多:

 1 public Spliterator<E> trySplit() {
 2     Node<E> h;
 3     final LinkedBlockingQueue<E> q = this.queue;
 4     int b = batch; //batch初始值為0
     //n第一次為1,第二次為2,依次加1,直到MAX_BATCH就固定下來
5 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; 6 if (!exhausted && //還有節點 7 ((h = current) != null || (h = q.head.next) != null) && 8 h.next != null) { 9 Object[] a = new Object[n]; 10 int i = 0; 11 Node<E> p = current; //上一次拆分的結尾 12 q.fullyLock(); 13 try { 14 if (p != null || (p = q.head.next) != null) { 15 do { 16 if ((a[i] = p.item) != null) //如果沒有被移除就放到數組中 17 ++i; 18 } while ((p = p.next) != null && i < n); //繼續從上一次拆分的結尾往後循環 19 } 20 } finally { 21 q.fullyUnlock(); 22 } 23 if ((current = p) == null) { //更新這一次的結尾到current 24 est = 0L; 25 exhausted = true; 26 } 27 else if ((est -= i) < 0L) //如果已經沒有元素了,設置est為0. 28 est = 0L; 29 if (i > 0) { //這一次拆出了元素,生成新的叠代器 30 batch = i; //更新batch 31 return Spliterators.spliterator 32 (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | 33 Spliterator.CONCURRENT); 34 } 35 } 36 return null; 37 }

LinkedBlockingQueue的叠代器拆分很特別,它不是像ArrayBlockingQueue那樣每次分一半,而是第一次只拆一個元素,第二次拆2個,第三次拆三個,依次內推,拆分的次數越多,後面的叠代器分的得元素越多,直到一個很大的數MAX_BATCH(33554432) ,後面的叠代器每次都分到這麽多的元素,拆分的實現邏輯很簡單,每一次拆分結束都記錄下拆分到哪個元素,下一次拆分從上次結束的位置繼續往下拆分,直到沒有元素可拆分了返回null。

總結

LinkedBlockingQueue使用了鏈表的方式實現隊列,還有一個專門的head節點,所有的有效節點都移除掛載到head節點之後,采用兩個獨立的可重入鎖分別對出入隊進行加鎖,而不像ArrayBlockingQueue那樣所有操作都需要唯一的鎖,所以吞吐量有了很大的提高,這也是LinkedBlockingQueue最被廣泛使用的原因吧,但是它還是有很多方法(remove ,clear ,toArray, toString以及叠代器相關的方法)需要同時獲取兩個鎖才能操作,這無疑會影響吞吐量,所以要合理使用。另外它在實現的時候創建了額外的Node節點實例來綁定真實數據,所以對內存的消耗稍微要多一些。

Java同步數據結構之LinkedBlockingQueue