1. 程式人生 > >Java併發包中的同步佇列SynchronousQueue實現原理

Java併發包中的同步佇列SynchronousQueue實現原理

作者:一粟

介紹

Java 6的併發程式設計包中的SynchronousQueue是一個沒有資料緩衝的BlockingQueue,生產者執行緒對其的插入操作put必須等待消費者的移除操作take,反過來也一樣。

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue內部並沒有資料快取空間,你不能呼叫peek()方法來看佇列中是否有資料元素,因為資料元素只有當你試著取走的時候才可能存在,不取走而只想偷窺一下是不行的,當然遍歷這個佇列的操作也是不允許的。佇列頭元素是第一個排隊要插入資料的執行緒,而不是要交換的資料。資料是在配對的生產者和消費者執行緒之間直接傳遞的,並不會將資料緩衝資料到佇列中。可以這樣來理解:生產者和消費者互相等待對方,握手,然後一起

離開。

SynchronousQueue的一個使用場景是線上程池裡。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個執行緒池根據需要(新任務到來時)建立新的執行緒,如果有空閒執行緒則會重複使用,執行緒空閒了60秒後會被回收。

實現原理

阻塞佇列的實現方法有許多:

阻塞演算法實現

阻塞演算法實現通常在內部採用一個鎖來保證多個執行緒中的put()和take()方法是序列執行的。採用鎖的開銷是比較大的,還會存在一種情況是執行緒A持有執行緒B需要的鎖,B必須一直等待A釋放鎖,即使A可能一段時間內因為B的優先順序比較高而得不到時間片執行。所以在高效能的應用中我們常常希望規避鎖的使用。

public class NativeSynchronousQueue<E> {
    boolean putting = false;
    E item = null;

    public synchronized E take() throws InterruptedException {
        while (item == null)
            wait();
        E e = item;
        item = null;
        notifyAll();
        return e;
    }

    public synchronized void put(E e) throws InterruptedException {
        if (e==null) return;
        while (putting)
            wait();
        putting = true;
        item = e;
        notifyAll();
        while (item!=null)
            wait();
        putting = false;
        notifyAll();
    }
}

訊號量實現

經典同步佇列實現採用了三個訊號量,程式碼很簡單,比較容易理解:

public class SemaphoreSynchronousQueue<E> {
    E item = null;
    Semaphore sync = new Semaphore(0);
    Semaphore send = new Semaphore(1);
    Semaphore recv = new Semaphore(0);

    public E take() throws InterruptedException {
        recv.acquire();
        E x = item;
        sync.release();
        send.release();
        return x;
    }

    public void put (E x) throws InterruptedException{
        send.acquire();
        item = x;
        recv.release();
        sync.acquire();
    }
}

在多核機器上,上面方法的同步代價仍然較高,作業系統排程器需要上千個時間片來阻塞或喚醒執行緒,而上面的實現即使在生產者put()時已經有一個消費者在等待的情況下,阻塞和喚醒的呼叫仍然需要。

Java 5實現

public class Java5SynchronousQueue<E> {
    ReentrantLock qlock = new ReentrantLock();
    Queue waitingProducers = new Queue();
    Queue waitingConsumers = new Queue();

    static class Node extends AbstractQueuedSynchronizer {
        E item;
        Node next;

        Node(Object x) { item = x; }
        void waitForTake() { /* (uses AQS) */ }
           E waitForPut() { /* (uses AQS) */ }
    }

    public E take() {
        Node node;
        boolean mustWait;
        qlock.lock();
        node = waitingProducers.pop();
        if(mustWait = (node == null))
           node = waitingConsumers.push(null);
         qlock.unlock();

        if (mustWait)
           return node.waitForPut();
        else
            return node.item;
    }

    public void put(E e) {
         Node node;
         boolean mustWait;
         qlock.lock();
         node = waitingConsumers.pop();
         if (mustWait = (node == null))
             node = waitingProducers.push(e);
         qlock.unlock();

         if (mustWait)
             node.waitForTake();
         else
            node.item = e;
    }
}

Java 5的實現相對來說做了一些優化,只使用了一個鎖,使用佇列代替訊號量也可以允許釋出者直接釋出資料,而不是要首先從阻塞在訊號量處被喚醒。

Java6實現

Java 6的SynchronousQueue的實現採用了一種效能更好的無鎖演算法 — 擴充套件的“Dual stack and Dual queue”演算法。效能比Java5的實現有較大提升。競爭機制支援公平和非公平兩種:非公平競爭模式使用的資料結構是後進先出棧(Lifo Stack);公平競爭模式則使用先進先出佇列(Fifo Queue),效能上兩者是相當的,一般情況下,Fifo通常可以支援更大的吞吐量,但Lifo可以更大程度的保持執行緒的本地化。

程式碼實現裡的Dual Queue或Stack內部是用連結串列(LinkedList)來實現的,其節點狀態為以下三種情況:

  1. 持有資料 – put()方法的元素
  2. 持有請求 – take()方法

這個演算法的特點就是任何操作都可以根據節點的狀態判斷執行,而不需要用到鎖。

其核心介面是Transfer,生產者的put或消費者的take都使用這個介面,根據第一個引數來區別是入列(棧)還是出列(棧)。

 /**
     * Shared internal API for dual stacks and queues.
     */
    static abstract class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

TransferQueue實現如下(摘自Java 6原始碼),入列和出列都基於Spin和CAS方法:

     /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed &amp;&amp; nanos &lt;= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null)? x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null)? x : e;
                }
            }
        }

參考文章