1. 程式人生 > >java 併發(五)---AbstractQueuedSynchronizer(5)

java 併發(五)---AbstractQueuedSynchronizer(5)

BlockingQueue

概述

       blockingQueue 是個介面,從名字上看就可以知道它是個阻塞佇列,裡面定義了增刪改查的方法。四種不同的方法用於不同的場景中使用:

1、丟擲異常;

2、返回特殊值(null 或 true/false,取決於具體的操作);

3、阻塞等待此操作,直到這個操作成功;

4、阻塞等待此操作,直到成功或者超時指定時間。總結如下:

 

blockqueueTable

        下面我們重點看一下 put 方法 和 take 方法 。這兩個方法都會阻塞。

 

 

ArrayBlockQueue

         ArrayBlockQueue佇列使用了一個鎖和兩個ConditionObject 物件來控制的物件的put 和 take 。

原始碼分析

  1     public void put(E e) throws InterruptedException {
  2         checkNotNull(e);
  3         final ReentrantLock lock = this.lock;
  4
//獲取鎖,要是獲取不到就會放在sync queue ,可中斷 5 lock.lockInterruptibly(); 6 try { 7 while (count == items.length) 8 //當前數量達到了滿的狀態,向 NotFull wait queue 尾部加入一個元素 等待喚醒 9 notFull.await(); 10 enqueue(e); 11 } finally { 12 lock.unlock(); 13
} 14 }

 

  1     public void put(E e) throws InterruptedException {
  2         checkNotNull(e);
  3         final ReentrantLock lock = this.lock;
  4         //獲取鎖,要是獲取不到就會放在sync queue ,可中斷 
  5         lock.lockInterruptibly();
  6         try {
  7             while (count == items.length)
  8             	//當前數量達到了滿的狀態,向 NotFull wait queue 尾部加入一個元素 等待喚醒
  9                 notFull.await();
 10             enqueue(e);
 11         } finally {
 12             lock.unlock();
 13         }
 14     }
  1         /**
  2      * Inserts element at current put position, advances, and signals.
  3      * Call only when holding lock.
  4      *
  5      *   NotEmpty wait queue 佇列喚醒,就是NotEmpty wait queue 裡面要是有元素的話
  6      *   就會上移一個元素到同步佇列
  7      *
  8      */
  9     private void enqueue(E x) {
 10         // assert lock.getHoldCount() == 1;
 11         // assert items[putIndex] == null;
 12         final Object[] items = this.items;
 13         items[putIndex] = x;
 14         if (++putIndex == items.length)
 15             putIndex = 0;
 16         count++;
 17         notEmpty.signal();
 18     }
  1      //可以看到返回值是是從物件陣列中取出一個數據,同時喚醒 NotFull wait queue()
  2      private E dequeue() {
  3         // assert lock.getHoldCount() == 1;
  4         // assert items[takeIndex] != null;
  5         final Object[] items = this.items;
  6         @SuppressWarnings("unchecked")
  7         E x = (E) items[takeIndex];
  8         items[takeIndex] = null;
  9         if (++takeIndex == items.length)
 10             takeIndex = 0;
 11         count--;
 12         if (itrs != null)
 13             itrs.elementDequeued();
 14         notFull.signal();
 15         return x;
 16     }

 

 

LinkedBlockQueue

         LinkedBlockQueue 使用了兩個鎖,一把 takeLock 和一把 putLock ,還有兩個ConditionObject 物件來併發控制。

原始碼分析

         這裡假設有一個零件小H需要進入工廠加工的情景。

  1     //小H將準備進入工廠被加工了,目前在工廠外排隊
  2     public void put(E e) throws InterruptedException {
  3         if (e == null) throw new NullPointerException();
  4         // Note: convention in all put/take/etc is to preset local var
  5         // holding count negative to indicate failure unless set.
  6         int c = -1;
  7         Node<E> node = new Node<E>(e);
  8         final ReentrantLock putLock = this.putLock;
  9         final AtomicInteger count = this.count;
 10         //獲取鎖
 11         putLock.lockInterruptibly();
 12         try {
 13             /*
 14              * Note that count is used in wait guard even though it is
 15              * not protected by lock. This works because count can
 16              * only decrease at this point (all other puts are shut
 17              * out by lock), and we (or some other waiting put) are
 18              * signalled if it ever changes from capacity. Similarly
 19              * for all other uses of count in other wait guards.
 20              */
 21             while (count.get() == capacity) {
 22                 //notFull wait queue 容量滿了,阻塞
 23                 notFull.await();
 24             }
 25             //加入佇列,兩種情況走到這裡 : 1.別喚醒,由於容量滿了 2.沒滿直接加入進來 
 26             enqueue(node);
 27             c = count.getAndIncrement();
 28             //要是此時,容量還有空餘通知後面的進來
 29             if (c + 1 < capacity)
 30                 notFull.signal();
 31         } finally {
 32             //釋放鎖
 33             putLock.unlock();
 34         }
 35         //裡面只有本來是沒有元素的,現在加入了一個,喚醒 NotEmpty 
 36         if (c == 0)
 37             signalNotEmpty();
 38 
 39     }

 

  1     //小H進入了工廠,將要被拿去加工了
  2     public E take() throws InterruptedException {
  3         E x;
  4         int c = -1;
  5         final AtomicInteger count = this.count;
  6         final ReentrantLock takeLock = this.takeLock;
  7         //獲得 takeLock 
  8         takeLock.lockInterruptibly();
  9         try {
 10             //要是此時容量是空的,空的必定是取不出東西,那麼阻塞
 11             while (count.get() == 0) {
 12                 notEmpty.await();
 13             }
 14             //取出一個元素
 15             x = dequeue();
 16             c = count.getAndDecrement();
 17             //只要容量大於1 ,喚醒 notEmpty,意味著“通知下一位進來被加工”
 18             if (c > 1)
 19                 notEmpty.signal();
 20         } finally {
 21             takeLock.unlock();
 22         }
 23         //假如此時容量已滿,既然拿出一個被加工了,那麼意味著可以通知外面排隊的進來多點被加工
 24         if (c == capacity)
 25             signalNotFull();
 26         return x;
 27     }

          take 方法注意一下,當容量沒有元素的時候就會呼叫 notEmpty.wait() , ConditionObject的 wait 方法會釋放掉當前執行緒所有的鎖,並在wait queue 中阻塞。(此處不理解,先要了解 ConditionObject 的用法 ),所有當有多個執行緒在容量沒元素的情況下去獲取元素,都會阻塞,等待喚醒。

 

cascading notifies

         LinkedBlockQueue 開頭有一段註釋,為了最小限度地使put 操作獲取takeLock 和 putLock ,使用了cascading notifies 。什麼意思呢?

  1     /*
  2      * A variant of the "two lock queue" algorithm.  The putLock gates
  3      * entry to put (and offer), and has an associated condition for
  4      * waiting puts.  Similarly for the takeLock.  The "count" field
  5      * that they both rely on is maintained as an atomic to avoid
  6      * needing to get both locks in most cases. Also, to minimize need
  7      * for puts to get takeLock and vice-versa, cascading notifies are
  8      * used. When a put notices that it has enabled at least one take,
  9      * it signals taker. That taker in turn signals others if more
 10      * items have been entered since the signal. And symmetrically for
 11      * takes signalling puts. Operations such as remove(Object) and
 12      * iterators acquire both locks.
 13      *
 14      * Visibility between writers and readers is provided as follows:
 15      *
 16      * Whenever an element is enqueued, the putLock is acquired and
 17      * count updated.  A subsequent reader guarantees visibility to the
 18      * enqueued Node by either acquiring the putLock (via fullyLock)
 19      * or by acquiring the takeLock, and then reading n = count.get();
 20      * this gives visibility to the first n items.
 21      *
 22      * To implement weakly consistent iterators, it appears we need to
 23      * keep all Nodes GC-reachable from a predecessor dequeued Node.
 24      * That would cause two problems:
 25      * - allow a rogue Iterator to cause unbounded memory retention
 26      * - cause cross-generational linking of old Nodes to new Nodes if
 27      *   a Node was tenured while live, which generational GCs have a
 28      *   hard time dealing with, causing repeated major collections.
 29      * However, only non-deleted Nodes need to be reachable from
 30      * dequeued Nodes, and reachability does not necessarily have to
 31      * be of the kind understood by the GC.  We use the trick of
 32      * linking a Node that has just been dequeued to itself.  Such a
 33      * self-link implicitly means to advance to head.next.
 34      */

        從字面上意思翻譯過來就是“流水式通知/喚醒” ,我們設想一下要是 這是一個生產和消費佇列,有多個消費者,當生產以後,馬上就通知了一下消費者,那麼我的put 每回都要獲取 兩把鎖(一把用於放進元素,一把用於通知 NotEmpty queue 有元素進來了)。為了儘量地減少鎖的獲取,在put 方法 :

  1     //put 方法 
  2     if (c == 0)
  3        signalNotEmpty();
  4 
  5 
  6     //take 方法 
  7     if (c > 1)
  8         notEmpty.signal();

        只有在第一個元素進來時就去喚醒,後面的要是多個消費者阻塞在 take 方法中,有一個被喚醒消費後,就會完成後又喚醒下一個,直至消費完成。

  

self-link

        自連結。在註釋的最後一段闡述了這個問題。為了實現“弱一致性的”迭代器,我們需要讓所有從被刪除的前任(前面指向你的節點)節點中的所有節點都是 可回收的,即被刪除了都應該是可回收的。那麼這樣就會出現兩個問題 :

  • 可能導致一個懷有惡意的迭代器會佔用大量內容空間
  • 節點跨帶引用問題

         以下解釋一下出現這兩個問題的原因。

      

    以下圖片和部分分析來自參考資料,感謝那位作者,寫得很不錯!

 

sl1

兩個新元素A和B入佇列,在Young Gen

 

sl2

A和B出佇列,新元素C D E入佇列,這時候A和B還在Young Gen,在minor gc的時候直接回收掉

 

sl3

C元素進入Old Gen

 

sl

C元素出佇列,但是是在Old Gen,需要Major GC才會回收,而Major GC發生的頻率比較低,C會在Old Gen保留比較長時間

 

sl5

D到J都已經出佇列,但是由於有Old Gen的C的引用,在minor GC的時候不會回收

 

sl6

       D-I全部進入Old Gen

 

   跨代引用造成的後果是大量本應該在Minor GC回收的物件進入Old Gen,在Minor GC的時候需要複製大量的物件,在Major的時候需要回收更多物件,而且還不好並行回收,因此GC壓力很大。

 

        這是 LinkedBlockQueue 的迭代器。

  1     public Iterator<E> iterator() {
  2         return new Itr();
  3     }
  4 
  1     private class Itr implements Iterator<E> {
  2         /*
  3          * Basic weakly-consistent iterator.  At all times hold the next
  4          * item to hand out so that if hasNext() reports true, we will
  5          * still have it to return even if lost race with a take etc.
  6          */
  7 
  8         private Node<E> current;
  9         private Node<E> lastRet;
 10         private E currentElement;
 11 
 12         Itr() {
 13             fullyLock();
 14             try {
 15                 current = head.next;
 16                 if (current != null)
 17                     currentElement = current.item;
 18             } finally {
 19                 fullyUnlock();
 20             }
 21         }
 22 
 23         public boolean hasNext() {
 24             return current != null;
 25         }
 26 
 27         /**
 28          * Returns the next live successor of p, or null if no such.
 29          *
 30          * Unlike other traversal methods, iterators need to handle both:
 31          * - dequeued nodes (p.next == p)
 32          * - (possibly multiple) interior removed nodes (p.item == null)
 33          */
 34         private Node<E> nextNode(Node<E> p) {
 35             for (;;) {
 36                 Node<E> s = p.next;
 37                 if (s == p)
 38                     return head.next;
 39                 if (s == null || s.item != null)
 40                     return s;
 41                 p = s;
 42             }
 43         }
 44 
 45         public E next() {
 46             fullyLock();
 47             try {
 48                 if (current == null)
 49                     throw new NoSuchElementException();
 50                 E x = currentElement;
 51                 lastRet = current;
 52                 current = nextNode(current);
 53                 currentElement = (current == null) ? null : current.item;
 54                 return x;
 55             } finally {
 56                 fullyUnlock();
 57             }
 58         }
  1 private E dequeue() {
  2     // assert takeLock.isHeldByCurrentThread();
  3     // assert head.item == null;
  4     Node<E> h = head;
  5     Node<E> first = h.next;
  6     h.next = h; // help GC
  7     head = first;
  8     E x = first.item;
  9     first.item = null;
 10     return x;
 11 }

        說到併發集合的迭代器不得不提到官方文件中這段話。就是他們的 Iterators 和 Spliterators 提供的 “弱一致性”相比於 “fast-fail”的區別 :  weakly consistent iterators

 

Most concurrent Collection implementations (including most Queues) also differ from the usual java.util conventions in that their Iterators and Spliterators provide weakly consistent rather than fast-fail traversal:

  • they may proceed concurrently with other operations
  • they will never throw ConcurrentModificationException
  • they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.

 

          再回到原始碼,生成迭代器的時候只是返回一個Itr物件,在Itr的構造方法內會呼叫next 方法。我們設想以下的情景 :

 

  1. 佇列中最開始有A B C D四個元素
  2. 這個時候生成迭代器,current指向A,currentElement值為A
  3. 迭代還沒開始,A B C出佇列,且都是self-link,佇列中只剩下D
  4. 由於A還有current引用,B和C 沒有其他引用,這個時候如果GC了B和C可以回收掉
  5. 開始迭代,由於current指向A,不為空,且currentElement的值為A,因此A肯定會輸出,然後再輸出D,這裡就體現了weakly consistent,A已經出佇列,但是迭代的時候卻還在。

 

      因此一個簡單的self-link就解決了上面所說單向連結串列的跨代GC問題。如果把h.next = h改成h.next = null可以嗎?還是考慮上面的情況,在2中current指向A,但是A指向null,3和4都沒問題,GC正常;但是5的時候會出問題,current指向A,不為空,且currentElement的值為A,因此A還是會輸出;在nextNode(A)函式中Node<E> s = p.next;為null,s==null成立,直接返回null,迭代結束,不會輸出D。
      總結來說,self-link解決了兩個問題:1. GC跨代引用問題 2. 作為已經出佇列的元素的標識,這裡可以看Node類中的註釋,和開頭貼的註釋的最後一句:self-link含蓄地暗示要跳到head.next。

 

 

ArrayBlockQueue  和 LinkedBlockQueue  的異同

  •   同 :
    - 都是支援有界的(bounded)、阻塞的(blocking)的佇列,按照FIFO(先進先出)原則出現元素
    - 內部實現都是使用到了ReentranLock 和 ConditionObject 
    - 都適用於生產者-消費者的模式
  • 異 :
    - ArrayBlockingQueue 初始化時可以指定大小,LinkBlockingQueue可以不指定,預設為 65535
    - ArrayBlockingQueue 是基於陣列操作的,常常在大多數併發場景下比 LinkBlockingQueue(基於連結串列操作的)有更好的效能--                              官方文件
    - ArrayBlockingQueue 使用一把鎖控制放入和拿出,而 LinkBlockingQueue 使用兩把鎖控制放入和拿出(即鎖分離)

 

 

SynchronousQueue

          首先看一下構造方法。同步佇列的原理就是一個生產者對應一個消費者,一一對應,等不到就阻塞,等到了就牽走。

  1     /**
  2      * Creates a {@code SynchronousQueue} with nonfair access policy.
  3      */
  4     public SynchronousQueue() {
  5         this(false);
  6     }
  7 
  8     /**
  9      * Creates a {@code SynchronousQueue} with the specified fairness policy.
 10      *
 11      * @param fair if true, waiting threads contend in FIFO order for
 12      *        access; otherwise the order is unspecified.
 13      */
 14     public SynchronousQueue(boolean fair) {
 15         transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
 16     }

        同步佇列分為公平模式和非公平模式,可以看到預設的非公平採用的 TransferStack ,而公平模式採用的是 TransferQueue,同步佇列的操作實際上就是在這個 transfer 中進行操作的,我們先來看一下預設的非公平模式  ,既然是非公平模式,那麼一定不是按順序。

        Stack和Queue 各自的優勢是什麼?(官方文件)

 

The performance of the two is generally similar. Fifo usually supports higher throughput under
contention but Lifo maintains higher thread locality in common applications.

 

 

        我們首先看原始碼之前需要知道 : FULFILL 指的是一個消費者和一個生產者進行配對。

        我們假設有這麼樣的一個流程 : put –> put –> take –> take

         思路跟著流程走一遍。先看非公平模式下的

TransferStack

  1    public void put(E e) throws InterruptedException {
  2         if (e == null) throw new NullPointerException();
  3         if (transferer.transfer(e, false, 0) == null) {
  4             Thread.interrupted();
  5             throw new InterruptedException();
  6         }
  7     }

 

  1     /**
  2      * Retrieves and removes the head of this queue, waiting if necessary
  3      * for another thread to insert it.
  4      *
  5      * @return the head of this queue
  6      * @throws InterruptedException {@inheritDoc}
  7      */
  8     public E take() throws InterruptedException {
  9         E e = transferer.transfer(null, false, 0);
 10         if (e != null)
 11             return e;
 12         Thread.interrupted();
 13         throw new InterruptedException();
 14     }

 

  1 
  2  		@SuppressWarnings("unchecked")
  3         E transfer(E e, boolean timed, long nanos) {
  4             /*
  5              * Basic algorithm is to loop trying one of three actions:
  6              *
  7              * 1. If apparently empty or already containing nodes of same
  8              *    mode, try to push node on stack and wait for a match,
  9              *    returning it, or null if cancelled.
 10              *
 11              * 2. If apparently containing node of complementary mode,
 12              *    try to push a fulfilling node on to stack, match
 13              *    with corresponding waiting node, pop both from
 14              *    stack, and return matched item. The matching or
 15              *    unlinking might not actually be necessary because of
 16              *    other threads performing action 3:
 17              *
 18              * 3. If top of stack already holds another fulfilling node,
 19              *    help it out by doing its match and/or pop
 20              *    operations, and then continue. The code for helping
 21              *    is essentially the same as for fulfilling, except
 22              *    that it doesn't return the item.
 23              */
 24 
 25             SNode s = null; // constructed/reused as needed
 26             //確定模式,模式的作用是識別是消費者還是生產者
 27             int mode = (e == null) ? REQUEST : DATA;
 28 
 29             for (;;) {
 30                 SNode h = head;
 31                 //我們的流程是先兩次put ,那麼兩次肯定走這裡
 32                 if (h == null || h.mode == mode) {  // empty or same-mode
 33                 	//時間到了
 34                     if (timed && nanos <= 0) {      // can't wait
 35                         if (h != null && h.isCancelled())
 36                             casHead(h, h.next);     // pop cancelled node
 37                         else
 38                             return null;
 39 
 40                     //時間沒到,cas成功後阻塞
 41                     } else if (casHead(h, s = snode(s, e, h, mode))) {
 42                         SNode m = awaitFulfill(s, timed, nanos);
 43                         if (m == s) {               // wait was cancelled
 44                             clean(s);
 45                             return null;
 46                         }
 47 
 48                         if ((h = head) != null && h.next == s)
 49                             casHead(h, s.next);     // help s's fulfiller
 50                         return (E) ((mode == REQUEST) ? m.item : s.item);
 51                     }
 52                 } else if (!isFulfilling(h.mode)) { // try to fulfill  當前不是fulfill模式
 53                     if (h.isCancelled())            // already cancelled
 54                         casHead(h, h.next);         // pop and retry
 55                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {   //建立一個模式為fullfill的節點,head指向它
 56                         for (;;) { // loop until matched or waiters disappear
 57                             SNode m = s.next;       // m is s's match
 58                             if (m == null) {        // all waiters are gone
 59                                 casHead(s, null);   // pop fulfill node
 60                                 s = null;           // use new node next time
 61                                 break;              // restart main loop
 62                             }
 63                             SNode mn = m.next;
 64                             //這是就是真正匹配的地方,匹配不到,說明有人在競爭,那麼跳過這一個,迴圈繼續!!繼續搶
 65                             if (m.tryMatch(s)) {
 66                                 casHead(s, mn);     // pop both s and m  匹配到了,head指標指向匹配到的元素的後面一個元素
 67                                 return (E) ((mode == REQUEST) ? m.item : s.item);
 68                             } else                  // lost match
 69                                 s.casNext(m, mn);   // help unlink
 70                         }
 71                     }
 72                 } else {                            // help a fulfiller
 73                     //走到這說明有人正在匹配,幫助它快點匹配,增加 CAS 的次數
 74                     SNode m = h.next;               // m is h's match
 75                     if (m == null)                  // waiter is gone
 76                         casHead(h, null);           // pop fulfilling node
 77                     else {
 78                         SNode mn = m.next;
 79                         if (m.tryMatch(h))          // help match
 80                             casHead(h, mn);         // pop both h and m
 81                         else                        // lost match
 82                             h.casNext(m, mn);       // help unlink
 83                     }
 84                 }
 85             }
 86         }

         程式碼蒼白無力,上圖。

syncqueuesta

圖一 只有一個在匹配的情況

 

syncsta1

圖二  兩個消費者競爭匹配

           那麼有沒有可能出現下面的情況呢?

sycnsta3

            是不可能的,大家可以推一下。

            我們可以看到transferStack,新元素的進入都是和 Stack 一樣的,最後進的最先被消費。下面是阻塞的方法,看註釋

  1  SNode awaitFulfill(SNode s, boolean timed, long nanos) {
  2             /*
  3              * When a node/thread is about to block, it sets its waiter
  4              * field and then rechecks state at least one more time
  5              * before actually parking, thus covering race vs
  6              * fulfiller noticing that waiter is non-null so should be
  7              * woken.
  8              *
  9              * When invoked by nodes that appear at the point of call
 10              * to be at the head of the stack, calls to park are
 11              * preceded by spins to avoid blocking when producers and
 12              * consumers are arriving very close in time.  This can
 13              * happen enough to bother only on multiprocessors.
 14              *
 15              * The order of checks for returning out of main loop
 16              * reflects fact that interrupts have precedence over
 17              * normal returns, which have precedence over
 18              * timeouts. (So, on timeout, one last check for match is
 19              * done before giving up.) Except that calls from untimed
 20              * SynchronousQueue.{poll/offer} don't check interrupts
 21              * and don't wait at all, so are trapped in transfer
 22              * method rather than calling awaitFulfill.
 23              */
 24             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 25             Thread w = Thread.currentThread();
 26             int spins = (shouldSpin(s) ?
 27                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);
 28             for (;;) {
 29                 if (w.isInterrupted())
 30                     s.tryCancel();
 31                 SNode m = s.match;
 32                 if (m != null)
 33                     return m;
 34                 if (timed) {
 35                     nanos = deadline - System.nanoTime();
 36                     if (nanos <= 0L) {
 37                         s.tryCancel();
 38                         continue;
 39                     }
 40                 }
 41                 if (spins > 0)
 42                     spins = shouldSpin(s) ? (spins-1) : 0;
 43                 else if (s.waiter == null)
 44                     s.waiter = w; // establish waiter so can park next iter
 45                 else if (!timed)
 46                     LockSupport.park(this);
 47                 else if (nanos > spinForTimeoutThreshold)
 48                     LockSupport.parkNanos(this, nanos);
 49             }
 50         }
 51 

          可以看到要是可以自旋就採取自旋的方式,提升效能。

 

TransferQueue

           公平模式下,使用 TransferQueue ,內部有一個Head 還有一個Tail ,新元素插入的都是在tail後面增加。而被消費的元素都是head ,這就是公平模式,一個個需要排隊進行。

          節點有一個 isData 變數用來識別是不是生產者還是消費者。

 

syncque

 

        下面原始碼分析來自 參考文章

 

  1 /**
  2  * Puts or takes an item.
  3  */
  4 Object transfer(Object e, boolean timed, long nanos) {
  5 
  6     QNode s = null; // constructed/reused as needed
  7     boolean isData = (e != null);
  8 
  9     for (;;) {
 10         QNode t = tail;
 11         QNode h = head;
 12         if (t == null || h == null)         // saw uninitialized value
 13             continue;                       // spin
 14 
 15         // 佇列空,或佇列中節點型別和當前節點一致,
 16         // 即我們說的第一種情況,將節點入隊即可。讀者要想著這塊 if 裡面方法其實就是入隊
 17         if (h == t || t.isData == isData) { // empty or same-mode
 18             QNode tn = t.next;
 19             // t != tail 說明剛剛有節點入隊,continue 即可
 20             if (t != tail)                  // inconsistent read
 21                 continue;
 22             // 有其他節點入隊,但是 tail 還是指向原來的,此時設定 tail 即可
 23             if (tn != null) {               // lagging tail
 24                 // 這個方法就是:如果 tail 此時為 t 的話,設定為 tn
 25                 advanceTail(t, tn);
 26                 continue;
 27             }
 28             // 
 29             if (timed && nanos <= 0)        // can't wait
 30                 return null;
 31             if (s == null)
 32                 s = new QNode(e, isData);
 33             // 將當前節點,插入到 tail 的後面
 34             if (!t.casNext(null, s))        // failed to link in
 35                 continue;
 36 
 37             // 將當前節點設定為新的 tail
 38             advanceTail(t, s);              // swing tail and wait
 39             // 看到這裡,請讀者先往下滑到這個方法,看完了以後再回來這裡,思路也就不會斷了
 40             Object x = awaitFulfill(s, e, timed, nanos);
 41             // 到這裡,說明之前入隊的執行緒被喚醒了,準備往下執行
 42             if (x == s) {                   // wait was cancelled
 43                 clean(t, s);
 44                 return null;
 45             }
 46 
 47             if (!s.isOffList()) {           // not already unlinked
 48                 advanceHead(t, s);          // unlink if head
 49                 if (x != null)              // and forget fields
 50                     s.item = s;
 51                 s.waiter = null;
 52             }
 53             return (x != null) ? x : e;
 54 
 55         // 這裡的 else 分支就是上面說的第二種情況,有相應的讀或寫相匹配的情況
 56         } else {                            // complementary-mode
 57             QNode m = h.next;               // node to fulfill
 58             if (t != tail || m == null || h != head)
 59                 continue;                   // inconsistent read
 60 
 61             Object x = m.item;
 62             if (isData == (x != null) ||    // m already fulfilled
 63                 x == m ||                   // m cancelled
 64                 !m.casItem(x, e)) {         // lost CAS
 65                 advanceHead(h, m);          // dequeue and retry
 66                 continue;
 67             }
 68 
 69             advanceHead(h, m);              // successfully fulfilled
 70             LockSupport.unpark(m.waiter);
 71             return (x != null) ? x : e;
 72         }
 73     }
 74 }
 75 
 76 void advanceTail(QNode t, QNode nt) {
 77     if (tail == t)
 78         UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
 79 }

 

     後續的方法大家可以認真研究,主要是這兩種資料型別的操作要搞清楚。

 

PriorityBlockingQueue

           priority是優先順序的意思,PriorityBlockQueue 底層使用二叉平衡樹。可以注意到最上面的節點是最小值,並且放在陣列的第一個的位置。

 

priority-blocking-queue-1

         

      帶排序的 BlockingQueue 實現,其併發控制採用的是 ReentrantLock,佇列為無界佇列(ArrayBlockingQueue 是有界佇列,LinkedBlockingQueue 也可以通過在建構函式中傳入 capacity 指定佇列最大的容量,但是 PriorityBlockingQueue 只能指定初始的佇列大小,後面插入元素的時候,如果空間不夠的話會自動擴容)。

 

     簡單地說,它就是 PriorityQueue 的執行緒安全版本。不可以插入 null 值,同時,插入佇列的物件必須是可比較大小的(comparable),否則報 ClassCastException 異常。它的插入操作 put 方法不會 block,因為它是無界佇列(take 方法在佇列為空的時候會阻塞)。

 

         既然是繼承了BlockQueue 介面,我們直接看 put 和 take 方法。

  1     public void put(E e) {
  2         offer(e); // never need to block
  3     }
  4 
  5 
  6     //先不考慮擴容
  7     public boolean offer(E e) {
  8         if (e == null)
  9             throw new NullPointerException();
 10         final ReentrantLock lock = this.lock;
 11         //獲得鎖
 12         lock.lock();
 13         int n, cap;
 14         Object[] array;
 15         while ((n = size) >= (cap = (array = queue).length))
 16             tryGrow(array, cap);
 17         try {
 18             //是否有比較器
 19             Comparator<? super E> cmp = comparator;
 20             //下面就是插入節點了
 21             if (cmp == null)
 22                 siftUpComparable(n, e, array);
 23             else
 24                 siftUpUsingComparator(n, e, array, cmp);
 25             size = n + 1;
 26             //有可能有消費者在阻塞
 27             notEmpty.signal();
 28         } finally {
 29             lock.unlock();
 30         }
 31         return true;
 32     }
 33 
 34 
 35 
 36     //節點插入(上浮---二叉樹的上浮操作)
 37     private static <T> void siftUpComparable(int k, T x, Object[] array) {
 38         Comparable<? super T> key = (Comparable<? super T>) x;
 39         while (k > 0) {
 40             int parent = (k - 1) >>> 1;
 41             Object e = array[parent];
 42             //它的節點比parent的大
 43             if (key.compareTo((T) e) >= 0)
 44                 break;
 45             //parent的節點比它大,所以它現在的位置應該放它parent 的節點,然後改變一下最終歸屬位置
 46             array[k] = e;
 47             k = parent;
 48         }
 49         //這裡得出的 k 是它最終應該在的位置,key是它的值
 50         array[k] = key;
 51     }
 52 

           主要插入的方法就是二叉樹的插入操作。

  1     public E take() throws InterruptedException {
  2         final ReentrantLock lock = this.lock;
  3         lock.lockInterruptibly();
  4         E result;
  5         try {
  6             while ( (result = dequeue()) == null)
  7                 notEmpty.await();
  8         } finally {
  9             lock.unlock();
 10         }
 11         return result;
 12     }
 13 
 14 
 15 
 16     private E dequeue() {
 17         int n = size - 1;
 18         if (n < 0)
 19             return null;
 20         else {
 21             Object[] array = queue;
 22             //拿第一個節點,看到retrun 返回這個值,那麼下面的操作應該就是維持二叉樹平衡
 23             E result = (E) array[0];
 24             E x = (E) array[n];
 25             array[n] = null;
 26             Comparator<? super E> cmp = comparator;
 27             if (cmp == null)
 28                 siftDownComparable(0, x, array, n);
 29             else
 30                 siftDownUsingComparator(0, x, array, n, cmp);
 31             size = n;
 32             //返回第一個節點
 33             return result;
 34         }
 35     }
 36 
 37     //下沉操作 (此時 k=0 x=最後一個節點的值 )
 38     private static <T> void siftDownComparable(int k, T x, Object[] array,
 39                                                int n) {
 40         if (n > 0) {
 41             //最後一個節點的位置
 42             Comparable<? super T> key = (Comparable<? super T>)x;
 43             int half = n >>> 1;           // loop while a non-leaf  子頁
 44             while (k < half) {
 45                 int child = (k << 1) + 1; // assume left child is least
 46                 Object c = array[child];
 47                 int right = child + 1;
 48                 if (right < n &&
 49                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
 50                     c = array[child = right];
 51                 if (key.compareTo((T) c) <= 0)
 52                     break;
 53                 array[k] = c;
 54                 k = child;
 55             }
 56             array[k] = key;
 57         }
 58     }
 59 

         出列,然後二叉平衡樹的下沉操作。

         現在看一下擴容。

  1    private void tryGrow(Object[] array, int oldCap) {
  2         lock.unlock(); // must release and then re-acquire main lock 先釋放鎖再獲得鎖
  3         Object[] newArray = null;  //釋放後有可能鎖被人搶了,所以下面擴容操作一定會有判斷措施
  4         if (allocationSpinLock == 0 &&
  5             UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
  6                                      0, 1)) {
  7             try {
  8                 int newCap = oldCap + ((oldCap < 64) ?
  9                                        (oldCap + 2) : // grow faster if small 
 10                                        (oldCap >> 1));
 11                 if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
 12                     int minCap = oldCap + 1;
 13                     if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
 14                         throw new OutOfMemoryError();
 15                     newCap = MAX_ARRAY_SIZE;
 16                 }
 17                 if (newCap > oldCap && queue == array)
 18                     newArray = new Object[newCap];
 19             } finally {
 20                 allocationSpinLock = 0;     //用於自旋的一個標誌
 21             }
 22         }
 23         //有可能別的執行緒人家正在操作,執行緒讓步
 24         if (newArray == null) // back off if another thread is allocating
 25             Thread.yield();
 26         lock.lock();
 27         //獲取鎖後有可能擴容大小被改,此時必須判斷
 28         if (newArray != null && queue == array) {
 29             queue = newArray;
 30             System.arraycopy(array, 0, newArray, 0, oldCap);
 31         }
 32     }
 33 

            其他方法可以再深入,下面看一下構造二叉樹的方法。在這個類的建構函式裡呼叫了heapify方法

  1 public PriorityBlockingQueue(Collection<? extends E> c)

        

  1     /**
  2      * Establishes the heap invariant (described above) in the entire tree,
  3      * assuming nothing about the order of the elements prior to the call.
  4      */
  5     private void heapify() {
  6         Object[] array = queue;
  7         int n = size;
  8         int half = (n >>> 1) - 1;
  9         Comparator<? super E> cmp = comparator;
 10         if (cmp == null) {
 11             for (int i = half; i >= 0; i--)
 12                 siftDownComparable(i, (E) array[i], array, n);
 13         }
 14         else {
 15             for (int i = half; i >= 0; i--)
 16                 siftDownUsingComparator(i, (E) array[i], array, n, cmp);
 17         }
 18     }

         可以看到就是一個個元素進行下沉操作。

 

 

 

參考資料 :