1. 程式人生 > >並發容器(二)阻塞隊列詳細介紹

並發容器(二)阻塞隊列詳細介紹

才會 ddc ray add seq 插入數據 裏的 返回 utl

1. 什麽是阻塞隊列?

阻塞隊列(BlockingQueue) 是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。

2. Java裏的阻塞隊列

JDK7提供了7個阻塞隊列。分別是

  • ArrayBlockingQueue : 一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue : 一個由鏈表結構組成的有界阻塞隊列。
  • PriorityBlockingQueue :
    一個支持優先級排序的無界阻塞隊列。
  • DelayQueue: 一個使用優先級隊列實現的無界阻塞隊列。
  • SynchronousQueue: 一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。

BlockingQueue接口 與 BlockingDeque 接口

??JDK提供的阻塞隊列中,LinkedBlockingDeque 是一個 Deque(雙向的隊列),其實現的接口是 BlockingDeque;其余6個阻塞隊列則是 Queue(單向隊列),實現的接口是 BlockingQueue。

對於 BlockingQueue 的阻塞隊列提供了四種處理方法:

方法描述 拋出異常 返回特殊的值 一直阻塞 超時退出
插入數據 add(e) offer(e) put(e) offer(e,time,unit)
獲取並移除隊列的頭 remove() poll() take() poll(time,unit)
獲取但不移除隊列的頭 element() peek() 不可用 不可用
  • 拋出異常: 是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException(“Queue full”)異常。當隊列為空時,從隊列裏獲取元素時會拋出NoSuchElementEx·ception異常 。
  • 返回特殊值: 插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,如果沒有則返回null
  • 一直阻塞: 當阻塞隊列滿時,如果生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出: 當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

?? 拋出異常返回特殊值 方法的實現是一樣的,只不過對失敗的操作的處理不一樣!通過 AbstractQueue 的源碼可以發現,add(e),remove(),element() 都是分別基於 offer(),poll(),peek() 實現的

public boolean add(E arg0) {
        if (this.offer(arg0)) {
            return true;
        } else {
            throw new IllegalStateException("Queue full");
        }
    }

    public E remove() {
        Object arg0 = this.poll();
        if (arg0 != null) {
            return arg0;
        } else {
            throw new NoSuchElementException();
        }
    }

    public E element() {
        Object arg0 = this.peek();
        if (arg0 != null) {
            return arg0;
        } else {
            throw new NoSuchElementException();
        }
    }

JDK 文檔提到的幾點:

  • BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。
  • BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩余容量。
  • BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持 Collection 接口。因此,舉例來說,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊信息時。
  • BlockingQueue 實現是線程安全的。所有排隊方法都可以使用內部鎖或其他形式的並發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll,這些方法盡可能地少使用)沒有 必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。

對於 BlockingDeque 的雙向隊列也提供了四種形式的方法

**第一個元素(頭部)**
方法描述 拋出異常 返回特殊的值 一直阻塞 超時退出
插入數據 addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
獲取並移除隊列的頭 removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
獲取但不移除隊列的頭 getFirst() peekFirst() 不適用 不適用
**最後一個元素(尾部)**
方法描述 拋出異常 返回特殊的值 一直阻塞 超時退出
插入數據 addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
獲取並移除隊列的頭 removeLast() pollLast() takeLast() pollLast(time, unit)
獲取但不移除隊列的頭 getLast() peekLast() 不適用 不適用

像所有 BlockingQueue 一樣,BlockingDeque 是線程安全的,但不允許 null 元素,並且可能有(也可能沒有)容量限制。

BlockingDeque 接口繼承擴展了 BlockingQueue 接口,對於 繼承自 BlockingQueue 的方法,除了插入方法(add、poll、offer方法,是插入的隊列的尾部),其他方法,操作的都是隊列的頭部(第一個元素)。

七個阻塞隊列的詳細介紹

1. ArrayBlockingQueue

??ArrayBlockingQueue是一個用數組實現的 有界阻塞隊列。 此隊列按照先進先出(FIFO)的原則對元素進行排序。
??默認情況下不保證訪問者公平地訪問隊列 ,所謂公平訪問隊列是指阻塞的線程,可按照阻塞的先後順序訪問隊列。非公平性是對先等待的線程是不公平的,當隊列可用時,阻塞的線程都可以競爭訪問隊列的資格。
??為了保證公平性,通常會降低吞吐量。

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現的 ,代碼如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
}

2. LinkedBlockingQueue

??LinkedBlockingQueue是一個用鏈表實現的 有界阻塞隊列。此隊列的默認和最大長度為Integer.MAX_VALUE。 此隊列按照先進先出的原則對元素進行排序。

3. PriorityBlockingQueue

??PriorityBlockingQueue是一個支持優先級的無界隊列(雖然此隊列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗,導致 OutOfMemoryError)。默認情況下元素采取自然順序排列(每個元素都必須實現 Comparable 接口),也可以通過比較器comparator來指定元素的排序規則。元素按照升序排列.
??其iterator() 方法中提供的叠代器並不 保證以特定的順序遍歷 PriorityBlockingQueue 的元素。如果需要 有序地進行遍歷, 則應考慮使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按優先級順序移除 全部或部分元素,並將它們放在另一個 collection 中。
?? 在此類上進行的操作不保證具有同等優先級的元素的順序。 如果需要實施某一排序,那麽可以定義自定義類或者比較器,比較器可使用修改鍵斷開主優先級值之間的聯系。例如,以下是應用先進先出 (first-in-first-out) 規則斷開可比較元素之間聯系的一個類。要使用該類,則需要插入一個新的 FIFOEntry(anEntry) 來替換普通的條目對象。

   class FIFOEntry<E extends Comparable<? super E>>
     implements Comparable<FIFOEntry<E>> {
   final static AtomicLong seq = new AtomicLong();
   final long seqNum;
   final E entry;
   public FIFOEntry(E entry) {
     seqNum = seq.getAndIncrement();
     this.entry = entry;
   }
   public E getEntry() { return entry; }
   public int compareTo(FIFOEntry<E> other) {
     int res = entry.compareTo(other.entry);
     if (res == 0 && other.entry != this.entry)
       res = (seqNum < other.seqNum ? -1 : 1);
     return res;
   }
 }

4. DelayQueue

??Delayed 元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。註意 DelayQueue 的所有方法只能操作“到期的元素“,例如,poll()、remove()、size()等方法,都會忽略掉未到期的元素。
我們可以將DelayQueue運用在以下應用場景:

  • 緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
  • 定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。

DelayQueue 的實現是基於 PriorityQueue,是一個優先級隊列,是以延時時間的長短進行排序的。所以,DelayQueue 需要知道每個元素的延時時間,而這個延時時間是由 Delayed 接口的 getDelay()方法獲取的。所以, DelayQueue 的元素必須實現 Dela 接口;

//計算並返回延時時間
public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

延時隊列的原理

延時隊列的實現很簡單,當消費者從隊列裏獲取元素時,如果元素沒有達到延時時間,就阻塞當前線程。

long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    else if (leader != null)
                        available.await();

5. SynchronousQueue

??一種阻塞隊列,其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。

SynchronousQueue 的幾個特點

  • 同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。 所以很多繼承的方法就沒有用了,(如 isEmpty()始終返回true,size()為0,包含contain、移除remove 都始終為false 等等)。或者說,真正有意義的只有以下幾個方法:獲取並移除(poll()、poll(timeout,timeunit)、take())、插入(offer()、offer(timeout,timeunit)、put());
  • 適合於傳遞性設計,在這種設計中, 每一個put操作必須等待一個take操作,反之亦然 。(當然,如果用的是offer、poll的話,那麽就不會阻塞等待)。SynchronousQueue可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。
  • 支持可選的公平排序策略。 默認情況下不保證這種排序。但是,使用公平設置為 true 所構造的隊列可保證線程以 FIFO 的順序進行訪問。
//設置公平性的構造方法
public SynchronousQueue(boolean fair) 
          創建一個具有指定公平策略的 SynchronousQueue。

6. LinkedTransferQueue

??LinkedTransferQueue是一個由鏈表結構組成的 無界阻塞TransferQueue隊列 。相對於其他阻塞隊列LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法: 如果當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。transfer方法的關鍵代碼如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

tryTransfer方法: 則是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法無論消費者是否接收,方法立即返回。而transfer方法是必須等到消費者消費了才返回。

對於帶有時間限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法 ,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回false,如果在超時時間內消費了元素,則返回true。

7. LinkedBlockingDeque

??LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。 雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。 相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法。另外,插入方法add等同於addLast,移除方法remove等效於removeFirst。但是take方法卻等同於takeFirst,不知道是不是Jdk的bug,使用時還是用帶有First和Last後綴的方法更清楚。
?? 和 LinkedBlockingQueue 一樣,是有界的阻塞隊列,默認長度以及最大長度是 Integer.MAX_VALUE。可在創建時,指定容量。

阻塞隊列的實現原理

??如果隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,讓生產者和消費者能夠高效率的進行通訊呢?讓我們先來看看JDK是如何實現的。

??使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。通過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現,代碼如下

private final Condition notFull;
private final Condition notEmpty;

public ArrayBlockingQueue(int capacity, boolean fair) {
        //省略其他代碼
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
}

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
  } finally {
            lock.unlock();
        }
}

private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

當我們往隊列裏插入一個元素時,如果隊列不可用,阻塞生產者主要通過LockSupport.park(this);來實現

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);
        }

繼續進入源碼,發現調用setBlocker先保存下將要阻塞的線程,然後調用unsafe.park阻塞當前線程。

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }

unsafe.park是個native方法,代碼如下:

public native void park(boolean isAbsolute, long time);

park這個方法會阻塞當前線程,只有以下四種情況中的一種發生時,該方法才會返回。

  • 與park對應的unpark執行或已經執行時。註意:已經執行是指unpark先執行,然後再執行的park。
  • 線程被中斷時。
  • 如果參數中的time不是零,等待了指定的毫秒數時。
  • 發生異常現象時。這些異常事先無法確定。

我們繼續看一下JVM是如何實現park方法的,park在不同的操作系統使用不同的方式實現,在linux下是使用的是系統方法pthread_cond_wait實現。實現代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp裏的 os::PlatformEvent::park方法,代碼如下:

void os::PlatformEvent::park() {
             int v ;
         for (;;) {
        v = _Event ;
         if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
         }
         guarantee (v >= 0, "invariant") ;
         if (v == 0) {
         // Do this the hard way by blocking ...
         int status = pthread_mutex_lock(_mutex);
         assert_status(status == 0, status, "mutex_lock");
         guarantee (_nParked == 0, "invariant") ;
         ++ _nParked ;
         while (_Event < 0) {
         status = pthread_cond_wait(_cond, _mutex);
         // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
         // Treat this the same as if the wait was interrupted
         if (status == ETIME) { status = EINTR; }
         assert_status(status == 0 || status == EINTR, status, "cond_wait");
         }
         -- _nParked ;

         // In theory we could move the ST of 0 into _Event past the unlock(),
         // but then we‘d need a MEMBAR after the ST.
         _Event = 0 ;
         status = pthread_mutex_unlock(_mutex);
         assert_status(status == 0, status, "mutex_unlock");
         }
         guarantee (_Event >= 0, "invariant") ;
         }

     }

pthread_cond_wait是一個多線程的條件變量函數,cond是condition的縮寫,字面意思可以理解為線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal實現的。park 在windows下則是使用WaitForSingleObject實現的。

當隊列滿時,生產者往阻塞隊列裏插入一個元素,生產者線程會進入WAITING (parking)狀態。我們可以使用jstack dump阻塞的生產者線程看到這點:

"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
        at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)



參考文獻

  • 聊聊並發(七)——Java中的阻塞隊列

並發容器(二)阻塞隊列詳細介紹