1. 程式人生 > >BlockingQueue介面及其實現類的原始碼分析

BlockingQueue介面及其實現類的原始碼分析

      BlockingQueue是一個阻塞佇列的介面,提供了一系列的介面方法。其中方法主要可以分為三類,包括Insert相關的add、offer、put,remove相關的remove()、poll()、take()方法,以及檢視相關的peek()、element方法等。阻塞佇列是執行緒安全的容器,其元素不允許為null,否則會丟擲空指標異常。阻塞佇列可以用於生產者消費者場景中去。

      常見的實現類有ArrayBlockingQueue以及LinkedBlockingQueue兩種,下面來詳細來看一下ArrayBlockingQueue以及LinkedBlockingQueue的具體實現。

  ArrayBlockingQueue重要引數及建構函式

  /** 佇列中元素的資料結構 */
    final Object[] items;

    /** 隊首標記,用於出隊操作 */
    int takeIndex;

    /** 隊尾標記,用於入隊操作 */
    int putIndex;

    /** 佇列中元素的個數 */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /**對於任何操作都需要加鎖,這裡是一個可重入鎖 */
    final ReentrantLock lock;

    /** 用於通知取元素的條件 */
    private final Condition notEmpty;

    /** 控制放元素的條件 */
    private final Condition notFull;
  /**
     * 創造一個給定容量的非公平鎖的阻塞佇列
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * 創造一個給定容量的陣列,並根據是否公平建立相應的公平鎖。初始化條件。
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

ArrayBlockingQueue入隊操作

      以常用的add方法為例子,在這個方法內部實際上是呼叫了offer方法,而offer方法在加鎖的基礎之上呼叫了enqueue方法來將元素放在陣列的尾部,並喚醒那些阻塞了的取元素方法。

add、offer、put的主要區別如下:

add方法在成功是返回true,如果失敗就丟擲異常。

offer方法在新增成功返回true,新增失敗返回false。

put方法如果新增不成功就會一直等待。不會出現丟節點的情況一般。

 /**
     * 將元素插入佇列中,如果成功則返回true,否則的話丟擲異常。可以看出其本身還是呼叫了offer方法。
     *
     * <p>This implementation returns <tt>true</tt> if <tt>offer</tt> succeeds,
     * else throws an <tt>IllegalStateException</tt>.
     *
     *  
     */
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
 /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);//判斷是否為空,如果元素為null,則丟擲異常
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
 /**
     * 將元素插入陣列尾部,並喚醒等待取元素的執行緒。
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
/**
     *向陣列中插入元素,如果沒有空間就等待。
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

ArrayBlockingQueue出隊操作

    出隊操作也是要加鎖的,以remove為例,從頭開始遍歷,一直到尾部標記的地方為止,當找到一個和所給元素相等的元素時,刪除這個節點的元素。至於take方法和poll方法都是刪除頭部元素,區別在於take方法會等待,而poll方法如果沒有元素可取則會直接返回null。

 /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * <p>Removal of interior elements in circular array based queues
     * is an intrinsically slow and disruptive operation, so should
     * be undertaken only in exceptional circumstances, ideally
     * only when the queue is known not to be accessible by other
     * threads.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)//當元素為0時,會自動阻塞到條件佇列中去。知道被其他方法喚醒。
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

 /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

ArrayBlockingQueue總結

       ArrayBlockingQueue本質上就是對一個數組進行操作,它嚴格不允許元素為null,但是這些操作都是加了鎖的操作,因此它們都是執行緒安全的,並且可以根據實際情況來選擇鎖的公平非公平,公平鎖可以嚴格保證執行緒不會飢餓,而非公平鎖則可以提高系統的吞吐量。並且由於它還是一個佇列,對於佇列的操作也大多數都是在頭部或者是尾部的操作。除了鎖之外,ArrayBlockingQueue還提供了兩個condition來實現等待操作,這裡的方法其實就是把那些被阻塞的執行緒放在Condition佇列中,然後當有signal操作是就喚醒最前面的執行緒執行。整體而言這些操作就是在陣列的基礎之上加鎖,相對簡單。

LinkedBlockingQueue原始碼分析

     類似與ArrayList和LinkedList,和ArrayBlockingQueue對應的是LinkedBlockingqueue,不同的地方在於一個底層是陣列實現,一個底層是當連結串列來實現。再就是一個底層只有一把鎖,而另一個有兩把鎖。首先來看一下其中的重要引數。

   LinkedBlockingQueue有兩把鎖,對應著隊尾和隊頭的操作,也就是說新增和刪除操作不互斥,這和上面的是不一樣的,因此其併發量要高於ArrayBlockingQueue。

 /**
     * 這裡的節點相對簡單,單向連結串列。
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 記錄連結串列容量 */
    private final int capacity;

    /**元素數目 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 頭結點
     */
    transient Node<E> head;

    /**
     * 尾節點
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue入隊操作

      以offer方法為例子,當我們嘗試入隊一個null時就會丟擲異常。其他情況下當容量不夠時就返回false,否則就可以給這個入隊操作進行加鎖,當元素個數小於容量時就新增節點到尾部然後給count+1,喚醒其他被阻塞的新增元素執行緒。這裡獲取的count是還沒有加一之前的值,因此它的值如果為0,那麼至少佇列中還是有一個元素的,可以喚醒消費執行緒消費了。

  /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();//為null丟擲異常
        final AtomicInteger count = this.count;
        if (count.get() == capacity)  //  量不足返回false.
            return false;  
        int c = -1; 
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();//獲取put鎖
        try {
            if (count.get() < capacity) {//count小於容量在隊尾入隊
                enqueue(node);
                c = count.getAndIncrement();//count加一
                if (c + 1 < capacity)
                    notFull.signal();//仍然有剩餘容量,喚醒等待的put執行緒
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();//喚醒消費執行緒
        return c >= 0;
    }
 /**
     * 這個方法意味這last.next=node,也就是把node作為last的下一個節點。
     *然後將last=last.next,也就是把last向後移。
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
 /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

LinkedBlockingQueue出隊操作

     先來看一下remove方法的出隊操作。由於remove方法要刪除的元素不固定,可能出現在佇列中的任何地方,因此需要同時鎖住隊首和隊尾兩把鎖,然後從頭到尾諸葛遍歷元素,當找到元素之後就刪除這個元素。這裡其實類似於單鏈表中的節點刪除,不同的地方在於要加鎖!

 /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }
   poll 方法返回隊頭元素並刪除之。如果佇列中沒有資料就返回null,否則的話就說明有元素,那麼就可以講一個元素出列,同時將count值減一,C>1意味著佇列中至少有一個元素,因此可以喚醒等待著的消費執行緒進行消費。當c的值等於容量時,此時c的實際值是容量減一,可以喚醒等待新增元素的執行緒進行新增。因為他們之前最有可能會被阻塞。
 public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();//獲得count值並將其減一
                if (c > 1)
                    notEmpty.signal();//至少有一個元素,喚醒等待著的消費執行緒。
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();喚醒
        return x;
    }

LinkedBlockingQueue總結

     LinkedBlockingQueue本質上是一個連結串列,區別於普通連結串列的地方在於它在隊首和隊尾的地方加了兩把鎖,分別對應於生產元素和消費元素,因此和獨佔鎖比起來會快一些(畢竟可以首尾同時操作),它本身的元素也是不允許為null的。

ArrayBlockingQueue和LinkedBlockingQueue比較

1.ABQ的底層是陣列實現,LBQ的底層是連結串列實現。

2.ABQ加鎖只加一把,並且是全域性的,而LBQ的鎖有兩把,分別對應著隊首和隊尾,同時也有兩個Condition佇列。也就是說,ABQ的取元素和放元素是互斥的,而LBQ則沒有相互關聯,因此就併發性而言,LBQ要優於ABQ。

3.ABQ 的容量是必須需要的,並且不可以擴容,而LBQ的大小可以指定,也可以不指定,不指定的情況下其值為最大整數值。

4.ABQ 支援公平鎖和非公平鎖,而LBQ不可指定,其本身內部使用的也是非公平鎖。

參考資料:

相關推薦

BlockingQueue介面及其實現原始碼分析

      BlockingQueue是一個阻塞佇列的介面,提供了一系列的介面方法。其中方法主要可以分為三類,包括Insert相關的add、offer、put,remove相關的remove()、poll()、take()方法,以及檢視相關的peek()、element方法等

併發Queue之BlockingQueue介面及其實現

1、下面先簡單介紹BlockingQueue介面的五個實現:ArrayBlockingQueue:基於陣列的阻塞佇列實現,在ArrayBlockingQueue內部,維護了一個定長的陣列,以便快取佇列中的資料物件,其內部沒實現讀寫分離,也就意味著生產和消費者不能完全並行。長度

java中List介面實現 ArrayList,LinkedList,Vector 的區別 list實現原始碼分析

java面試中經常被問到list常用的類以及內部實現機制,平時開發也經常用到list集合類,因此做一個原始碼級別的分析和比較之間的差異。 首先看一下List介面的的繼承關係: list介面繼承Col

BlockingQueue介面實現分析

1 BlockingQueue 介面及其實現類 BlocingQueue介面定義如下,僅列舉幾個常用方法: put(E)  在佇列尾部放入元素,若佇列滿則等待;take()  取佇列頭部元素返回,若佇列空則等待;offer(E)  在佇列尾部放入元素,若成功則返回tru

四,Java集合(2)——Set介面及其實現

1,Set介面及其實現類 Set集合與Collection基本相同,沒有提供任何額外的方法。實際上Set就是Collection,只是行為略有不同。Set集合不允許包含相同的元素,如果試圖把兩個相同的元素加入同一個Set集合中,新增操作失敗,add()方法返回

常用集合及其特性一之List介面及其實現

list介面: 1>元素有序(插入有序),元素可重複( 可重複的更確切概念即e1.equals(e2) )。 2>如果列表本身允許null元素的話,允許多個null值。 3>list藉口提供了特殊的迭代器ListIterater,除了允許Itera

Java容器學習筆記(二) Set介面及其實現的相關知識總結

在Java容器學習筆記(一)中概述了Collection的基本概念及介面實現,並且總結了它的一個重要子介面List及其子類的實現和用法。 本篇主要總結Set介面及其實現類的用法,包括HashSet(無序不重複),LinkedHashSet(按放入順序有序不重複),TreeS

netty中的發動機--EventLoop及其實現NioEventLoop的原始碼分析

EventLoop 在之前介紹Bootstrap的初始化以及啟動過程時,我們多次接觸了NioEventLoopGroup這個類,關於這個類的理解,還需要了解netty的執行緒模型。NioEventLoopGroup可以理解為一組執行緒,這些執行緒每一個都可以獨立地處理多個channel產生的io事件。 Nio

[五]載入機制雙親委派機制 底層程式碼實現原理 原始碼分析 java載入雙親委派機制是如何實現

Launcher啟動類 本文是雙親委派機制的原始碼分析部分,類載入機制中的雙親委派模型對於jvm的穩定執行是非常重要的 不過原始碼其實比較簡單,接下來簡單介紹一下 我們先從啟動類說起 有一個Launcher類   sun.misc.Launcher; 仔細看下這簡

commons-pool2原始碼走讀(一) 池物件PooledObject介面及其實現

commons-pool2原始碼走讀(一) 池物件PooledObject<T>介面及其實現 PooledObject<T>用來定義池物件的一個wrapper 介面,用於跟蹤物件的附加資訊,比如狀態、建立時間、使用時間等。這個類的實現必須是執行緒安全的。

java基礎庫學習(二.3)List子介面實現

List子介面的實現類:ArrayList/Vector/LinkedList List集合:元素有序。可重複的集合,List集合預設按元素的新增順序設定元素的索引,通過索引來訪問物件 List集合原始碼? public interface List<E> extends

java基礎庫學習(二.2)Set子介面實現

Set子介面的實現類:HashSet/LinkedHashSet/TreeSet/EnumSet/SortedSet Set子介面和Collection父介面原始碼對比? 1Collection父介面原始碼   public interface Collection&l

Java中的執行緒池及其實現ThreadPoolExecutor

前言:像我們連線資料庫一樣,需要不斷地建立連線,銷燬連線,如果都是人為地一個個建立和銷燬的話會很費勁,所以就誕生了資料庫連線池,執行緒池的產生也是同樣的道理。 執行緒池預先建立了若干數量的執行緒,並且不能由使用者直接對執行緒的建立進行控制,在這個前提下重複使用固定或較為固定數目的執行緒來完成任務

Java_59_陣列_模擬ArrayList容器的底層實現JDK原始碼分析程式碼

package cn.pmcse.myCollection; public class ArrayList {     private Object[] value;     private    in

介面實現裡使用@Override註解報錯

問題分析 @Override註解用來檢測子類對父類或介面的方法的重寫是否正確,但有一次我在Eclipse裡對介面的實現類裡使用@Override註解卻報錯,不過在父類的子類裡使用該註解卻是正常的。 百度了下才知道原來這是jdk1.5時的一個bug,在1.6時已經被修復;那麼問題來了,我使用的jdk是1.8

caffe Layer基原始碼分析

建構函式 //標頭檔案 include/caffe/layer.hpp //實現檔案 src/caffe/layer.cpp // src/caffe/layer.cu /*

介面呼叫實現&& 為什麼Autowired定義在介面上

1、介面與回撥 package edu.cqu.interfaceTest; import java.awt.Toolkit; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.u

Java中List及其實現的解析

集合: 集合,集合是java中提供的一種容器,可以用來儲存多個數據。集合的長度是可變的,集合中儲存的元素必須是引用型別資料。 集合繼承關係圖:          List:     &n

JavaEE開發service層為什麼要分介面實現

面向介面開發。多人分模組開發時,寫service(業務層)的人將介面定義好提交到SVN,其它層的人直接可以呼叫介面方法,而寫service層的人也可以通過實現類寫具體方法邏輯。達到多人同時開發。

為什麼需要一個介面,一個介面實現,而不是直接呼叫裡的方法

作者:Dion連結:https://www.zhihu.com/question/20111251/answer/14012223來源:知乎著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。“介面是個規範”,這句沒錯。“不如直接就在這個類中寫實現方法豈不是更