1. 程式人生 > >ArrayBlockingQueue和LinkedBlockingQueue原始碼解析

ArrayBlockingQueue和LinkedBlockingQueue原始碼解析

ArrayBlockingQueue和LinkedBlockingQueue都是java.util.concurrent包中的阻塞佇列。

阻塞佇列就是支援阻塞的插入和移除的容量,即在容量滿時往BlockingQueue中新增資料時會造成阻塞,當容量為空時取元素操作會阻塞。內部的阻塞佇列是通過重入鎖ReenterLock和Condition條件佇列實現的。

看名字就可以知道他們的底層資料結構不同:

        ArrayBlockingQueue是由陣列結構組成的有界阻塞佇列

        LinkedBlockingQueue是由連結串列結構組成的有界阻塞佇列

下面看看原始碼:

        publicclass ArrayBlockingQueue<E> extends AbstractQueue<E>

       implements BlockingQueue<E>, java.io.Serializable {

   /**

    * Serialization ID. This class relies on default serialization

    * even for the items array, which is default-serialized, even if

    * it is empty. Otherwise it could not be declared final, which is

    * necessary here.

    */

   private static final long serialVersionUID = -817911632652898426L;

   /** The queued items */

   final Object[] items;

   /** items index for next take, poll, peek or remove */

   int takeIndex;

   /** items index for next put, offer, or add */

   int putIndex;

   /** Number of elements in the queue */

   int count;

   /*

    * Concurrency control uses the classic two-condition algorithm

    * found in any textbook.

    */

   /** Main lock guarding all access */

   final ReentrantLock lock;

   /** Condition for waiting takes */

   private final Condition notEmpty;

    /** Condition for waiting puts */

   private final Condition notFull;

        可以清楚ArrayBlockingQueue是一個阻塞式的佇列,繼承自AbstractQueue,。底層以陣列的形式儲存資料。而且實現的佇列中的鎖是沒有分離的,即生產和消費用的是同一個鎖。

實現的佇列中在生產和消費的時候,是直接將物件插入或移除的。

public class LinkedBlockingQueue<E>extends AbstractQueue<E>

       implements BlockingQueue<E>, java.io.Serializable {

   private static final long serialVersionUID = -6903933977591709194L;

   /*

    * A variant of the "two lock queue" algorithm.  The putLock gates

    * entry to put (and offer), and has an associated condition for

    * waiting puts.  Similarly for thetakeLock.  The "count" field

    * that they both rely on is maintained as an atomic to avoid

    * needing to get both locks in most cases. Also, to minimize need

    * for puts to get takeLock and vice-versa, cascading notifies are

    * used. When a put notices that it has enabled at least one take,

    * it signals taker. That taker in turn signals others if more

    * items have been entered since the signal. And symmetrically for

    * takes signalling puts. Operations such as remove(Object) and

    * iterators acquire both locks.

    *

    * Visibility between writers and readers is provided as follows:

    *

    * Whenever an element is enqueued, the putLock is acquired and

    * count updated.  A subsequentreader guarantees visibility to the

    * enqueued Node by either acquiring the putLock (via fullyLock)

    * or by acquiring the takeLock, and then reading n = count.get();

    * this gives visibility to the first n items.

    *

    * To implement weakly consistent iterators, it appears we need to

    * keep all Nodes GC-reachable from a predecessor dequeued Node.

    * That would cause two problems:

    * - allow a rogue Iterator to cause unbounded memory retention

    * - cause cross-generational linking of old Nodes to new Nodes if

    *   a Node was tenured while live,which generational GCs have a

    *   hard time dealing with,causing repeated major collections.

    * However, only non-deleted Nodes need to be reachable from

    * dequeued Nodes, and reachability does not necessarily have to

    * be of the kind understood by the GC. We use the trick of

    * linking a Node that has just been dequeued to itself.  Such a

    * self-link implicitly means to advance to head.next.

    */

   /**

    * Linked list node class

    */

   static class Node<E> {

       E item;

       /**

        * One of:

        * - the real successor Node

        * - this Node, meaning the successor is head.next

        * - null, meaning there is no successor (this is the last node)

        */

       Node<E> next;

        Node(E x) { item = x; }

    }

   /** The capacity bound, or Integer.MAX_VALUE if none */

   private final int capacity;

   /** Current number of elements */

   private final AtomicInteger count = new AtomicInteger(0);

   /**

    * Head of linked list.

    * Invariant: head.item == null

    */

   private transient Node<E> head;

   /**

    * Tail of linked list.

    * Invariant: last.next == null

    */

   private transient Node<E> last;

   /** Lock held by take, poll, etc */

   private final ReentrantLock takeLock = new ReentrantLock();

   /** Wait queue for waiting takes */

   private final Condition notEmpty = takeLock.newCondition();

   /** Lock held by put, offer, etc */

   private final ReentrantLock putLock = new ReentrantLock();

   /** Wait queue for waiting puts */

   private final Condition notFull = putLock.newCondition();

 LinkedBlockingQueue也是一個阻塞式的佇列,繼承自AbstractQueue,。底層以連結串列結構儲存資料。注意實現的佇列中的鎖是分離的,即生產用的是putLock,消費是takeLock。這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。  實現的佇列中在生產和消費的時候,需要把列舉物件轉換為Node<E>進行插入或移除,這產生了新的物件,造成了一定的開銷。