1. 程式人生 > >【Java】BlockingQueue深入分析

【Java】BlockingQueue深入分析

一、概述:
BlockingQueue作為執行緒容器,可以為執行緒同步提供有力的保障。


二、BlockingQueue定義的常用方法
1.BlockingQueue定義的常用方法如下:

  丟擲異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用

        1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則招聘異常

        2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.

        3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.

        4)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null

        5)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止

其中:BlockingQueue 不接受null 元素。試圖add、put 或offer 一個null 元素時,某些實現會丟擲NullPointerException。null 被用作指示poll 操作失敗的警戒值。

三、BlockingQueue的幾個注意點

【1】BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個remainingCapacity,超出此容量,便無法無阻塞地put 附加元素。沒有任何內部容量約束的BlockingQueue 總是報告Integer.MAX_VALUE 的剩餘容量。
【2】BlockingQueue 實現主要用於生產者-使用者佇列,但它另外還支援Collection 介面。因此,舉例來說,使用remove(x) 從佇列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。
【3】BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和removeAll)沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了c 中的一些元素後,addAll(c) 有可能失敗(丟擲一個異常)。
【4】BlockingQueue 實質上不 支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的end-of-stream 或poison 物件,並根據使用者獲取這些物件的時間來對它們進行解釋。

 四、簡要概述BlockingQueue常用的四個實現類

 

        1)ArrayBlockingQueue:規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的.

        2)LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的

        3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序.

        4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.

    其中LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導致LinkedBlockingQueue的資料吞吐量要大於ArrayBlockingQueue,但線上程數量很大時其效能的可預見性低於ArrayBlockingQueue. 

五、具體BlockingQueue的實現類的內部細節

有耐心的同學請看具體實現類細節:

1、ArrayBlockingQueue

    ArrayBlockingQueue是一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是在佇列中存在時間最長的元素。佇列的尾部 是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列檢索操作則是從佇列頭部開始獲得元素。

    這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致放入操作受阻塞;試圖從空佇列中檢索元素將導致類似阻塞。

ArrayBlockingQueue建立的時候需要指定容量capacity(可以儲存的最大的元素個數,因為它不會自動擴容)以及是否為公平鎖(fair引數)。

在建立ArrayBlockingQueue的時候預設建立的是非公平鎖,不過我們可以在它的建構函式裡指定。這裡呼叫ReentrantLock的建構函式建立鎖的時候,呼叫了:

public ReentrantLock(boolean fair) {

sync = (fair)? new FairSync() : new NonfairSync();

}

FairSync/ NonfairSync是ReentrantLock的內部類:

執行緒按順序請求獲得公平鎖,而一個非公平鎖可以闖入,且當它尚未進入等待佇列,就會和等待佇列head結點的執行緒發生競爭,如果鎖的狀態可用,請求非公平鎖的執行緒可在等待佇列中向前跳躍,獲得該鎖。內部鎖synchronized沒有提供確定的公平性保證。

分三點來講這個類:

2.1 新增新元素的方法:add/put/offer

2.2 該類的幾個例項變數:takeIndex/putIndex/count/

2.3 Condition實現

1.1 新增新元素的方法:add/put/offer

首先,談到新增元素的方法,首先得分析以下該類同步機制中用到的鎖:

Java程式碼


[java]
lock = new ReentrantLock(fair);      
notEmpty = lock.newCondition();//Condition Variable 1      
notFull =  lock.newCondition();//Condition Variable 2   

這三個都是該類的例項變數,只有一個鎖lock,然後lock例項化出兩個Condition,notEmpty/noFull分別用來協調多執行緒的讀寫操作。

Java程式碼


[java] 
public boolean offer(E e) {      
        if (e == null) throw new NullPointerException();      
        final ReentrantLock lock = this.lock;//每個物件對應一個顯示的鎖      
        lock.lock();//請求鎖直到獲得鎖(不可以被interrupte)      
        try {      
            if (count == items.length)//如果佇列已經滿了      
                return false;      
            else {      
                insert(e);      
                return true;      
            }      
        } finally {      
            lock.unlock();//      
        }      
}      
看insert方法:      
private void insert(E x) {      
        items[putIndex] = x;      
        //增加全域性index的值。      
        /*    
        Inc方法體內部:    
        final int inc(int i) {    
        return (++i == items.length)? 0 : i;    
            }    
        這裡可以看出ArrayBlockingQueue採用從前到後向內部陣列插入的方式插入新元素的。如果插完了,putIndex可能重新變為0(在已經執行了移除操作的前提下,否則在之前的判斷中佇列為滿)    
        */     
        putIndex = inc(putIndex);       
        ++count;      
        notEmpty.signal();//wake up one waiting thread      
}     

Java程式碼


[java]
public void put(E e) throws InterruptedException {      
        if (e == null) throw new NullPointerException();      
        final E[] items = this.items;      
        final ReentrantLock lock = this.lock;      
        lock.lockInterruptibly();//請求鎖直到得到鎖或者變為interrupted      
        try {      
            try {      
                while (count == items.length)//如果滿了,當前執行緒進入noFull對應的等waiting狀態      
                    notFull.await();      
            } catch (InterruptedException ie) {      
                notFull.signal(); // propagate to non-interrupted thread      
                throw ie;      
            }      
            insert(e);      
        } finally {      
            lock.unlock();      
        }      
}     

Java程式碼


[java] 
public boolean offer(E e, long timeout, TimeUnit unit)      
        throws InterruptedException {      
     
        if (e == null) throw new NullPointerException();      
    long nanos = unit.toNanos(timeout);      
        final ReentrantLock lock = this.lock;      
        lock.lockInterruptibly();      
        try {      
            for (;;) {      
                if (count != items.length) {      
                    insert(e);      
                    return true;      
                }      
                if (nanos <= 0)      
                    return false;      
                try {      
                //如果沒有被 signal/interruptes,需要等待nanos時間才返回      
                    nanos = notFull.awaitNanos(nanos);      
                } catch (InterruptedException ie) {      
                    notFull.signal(); // propagate to non-interrupted thread      
                    throw ie;      
                }      
            }      
        } finally {      
            lock.unlock();      
        }      
    }     

Java程式碼


[java] 
public boolean add(E e) {      
    return super.add(e);      
    }      
父類:      
public boolean add(E e) {      
        if (offer(e))      
            return true;      
        else     
            throw new IllegalStateException("Queue full");      
    }   
 
1.2 該類的幾個例項變數:takeIndex/putIndex/count

Java程式碼


[java]
用三個數字來維護這個佇列中的資料變更:      
/** items index for next take, poll or remove */     
    private int takeIndex;      
    /** items index for next put, offer, or add. */     
    private int putIndex;      
    /** Number of items in the queue */     
    private int count;     

提取元素的三個方法take/poll/remove內部都呼叫了這個方法:

Java程式碼


[java]  
private E extract() {      
        final E[] items = this.items;      
        E x = items[takeIndex];      
        items[takeIndex] = null;//移除已經被提取出的元素      
        takeIndex = inc(takeIndex);//策略和新增元素時相同      
        --count;      
        notFull.signal();//提醒其他在notFull這個Condition上waiting的執行緒可以嘗試工作了      
        return x;      
    }    
從這個方法裡可見,tabkeIndex維護一個可以提取/移除元素的索引位置,因為takeIndex是從0遞增的,所以這個類是FIFO佇列。

putIndex維護一個可以插入的元素的位置索引。

count顯然是維護佇列中已經存在的元素總數。

1.3 Condition實現

Condition現在的實現只有java.util.concurrent.locks.AbstractQueueSynchoronizer內部的ConditionObject,並且通過ReentranLock的newCondition()方法暴露出來,這是因為Condition的await()/sinal()一般在lock.lock()與lock.unlock()之間執行,當執行condition.await()方法時,它會首先釋放掉本執行緒持有的鎖,然後自己進入等待佇列。直到sinal(),喚醒後又會重新試圖去拿到鎖,拿到後執行await()下的程式碼,其中釋放當前鎖和得到當前鎖都需要ReentranLock的tryAcquire(int arg)方法來判定,並且享受ReentranLock的重進入特性。

Java程式碼


[java] 
public final void await() throws InterruptedException {      
            if (Thread.interrupted())      
                throw new InterruptedException();      
           //加一個新的condition等待節點      
 Node node = addConditionWaiter();      
//釋放自己的鎖      
            int savedState = fullyRelease(node);       
            int interruptMode = 0;      
            while (!isOnSyncQueue(node)) {      
            //如果當前執行緒 等待狀態時CONDITION,park住當前執行緒,等待condition的signal來解除      
                LockSupport.park(this);      
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)      
                    break;      
            }      
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)      
                interruptMode = REINTERRUPT;      
            if (node.nextWaiter != null)      
                unlinkCancelledWaiters();      
            if (interruptMode != 0)      
                reportInterruptAfterWait(interruptMode);      
        }    

2、SynchronousQueue

    一種阻塞佇列,其中每個 put 必須等待一個 take,反之亦然。同步佇列沒有任何內部容量,甚至連一個佇列的容量都沒有。不能在同步佇列上進行 peek,因為僅在試圖要取得元素時,該元素才存在;除非另一個執行緒試圖移除某個元素,否則也不能(使用任何方法)新增元素;也不能迭代佇列,因為其中沒有元素可用於迭代。佇列的頭 是嘗試新增到佇列中的首個已排隊執行緒元素;如果沒有已排隊執行緒,則不新增元素並且頭為 null。對於其他Collection 方法(例如 contains),SynchronousQueue 作為一個空集合。此佇列不允許 null 元素。

    同步佇列類似於 CSP 和 Ada 中使用的 rendezvous 通道。它非常適合於傳遞性設計,在這種設計中,在一個執行緒中執行的物件要將某些資訊、事件或任務傳遞給在另一個執行緒中執行的物件,它就必須與該物件同步。

    對於正在等待的生產者和使用者執行緒而言,此類支援可選的公平排序策略。預設情況下不保證這種排序。但是,使用公平設定為 true 所構造的佇列可保證執行緒以 FIFO 的順序進行訪問。公平通常會降低吞吐量,但是可以減小可變性並避免得不到服務。


3、LinkedBlockingQueue

    一個基於已連結節點的、範圍任意的 blocking queue。此佇列按 FIFO(先進先出)排序元素。佇列的頭部 是在佇列中時間最長的元素。佇列的尾部 是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列檢索操作會獲得位於佇列頭部的元素。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。
單向連結串列結構的佇列。如果不指定容量預設為Integer.MAX_VALUE。通過putLock和takeLock兩個鎖進行同步,兩個鎖分別例項化notFull和notEmpty兩個Condtion,用來協調多執行緒的存取動作。其中某些方法(如remove,toArray,toString,clear等)的同步需要同時獲得這兩個鎖,並且總是先putLock.lock緊接著takeLock.lock(在同一方法fullyLock中),這樣的順序是為了避免可能出現的死鎖情況(我也想不明白為什麼會是這樣?)

4、PriorityBlockingQueue

    一個無界的阻塞佇列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞檢索的操作。雖然此佇列邏輯上是無界的,但是由於資源被耗盡,所以試圖執行新增操作可能會失敗(導致 OutOfMemoryError)。此類不允許使用 null 元素。依賴自然順序的優先順序佇列也不允許插入不可比較的物件(因為這樣做會丟擲ClassCastException)。
看它的三個屬性,就基本能看懂這個類了:

Java程式碼


[java]
private final PriorityQueue q;      
    private final ReentrantLock lock = new ReentrantLock(true);      
    private final Condition notEmpty = lock.newCondition();  
 
 lock說明本類使用一個lock來同步讀寫等操作。

notEmpty協調佇列是否有新元素提供,而佇列滿了以後會呼叫PriorityQueue的grow方法來擴容。

5、DelayQueue

    Delayed 元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。當一個元素的getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於或等於零的值時,則出現期滿。此佇列不允許使用 null 元素。
Delayed介面繼承自Comparable,我們插入的E元素都要實現這個介面。

DelayQueue的設計目的間API文件:

An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will returnnull. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.

因為DelayQueue構造函數了裡限定死不允許傳入comparator(之前的PriorityBlockingQueue中沒有限定死),即只能在compare方法裡定義優先順序的比較規則。再看上面這段英文,“The head of the queue is that Delayed element whose delay expired furthest in the past.”說明compare方法實現的時候要保證最先加入的元素最早結束延時。而 “Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.”說明getDelay方法的實現必須保證延時到了返回的值變為<=0的int。

上面這段英文中,還說明了:在poll/take的時候,佇列中元素會判定這個elment有沒有達到超時時間,如果沒有達到,poll返回null,而take進入等待狀態。但是,除了這兩個方法,佇列中的元素會被當做正常的元素來對待。例如,size方法返回所有元素的數量,而不管它們有沒有達到超時時間。而協調的Condition available只對take和poll是有意義的。

另外需要補充的是,在ScheduledThreadPoolExecutor中工作佇列型別是它的內部類DelayedWorkQueue,而DelayedWorkQueue的Task容器是DelayQueue型別,而ScheduledFutureTask作為Delay的實現類作為Runnable的封裝後的Task類。也就是說ScheduledThreadPoolExecutor是通過DelayQueue優先順序判定規則來執行任務的。

6、BlockingDque+LinkedBlockingQueue

BlockingDque為阻塞雙端佇列介面,實現類有LinkedBlockingDque。雙端佇列特別之處是它首尾都可以操作。LinkedBlockingDque不同於LinkedBlockingQueue,它只用一個lock來維護讀寫操作,並由這個lock例項化出兩個Condition notEmpty及notFull,而LinkedBlockingQueue讀和寫分別維護一個lock。