Java併發包中的同步佇列SynchronousQueue實現原理
作者:一粟
介紹
Java 6的併發程式設計包中的SynchronousQueue是一個沒有資料緩衝的BlockingQueue,生產者執行緒對其的插入操作put必須等待消費者的移除操作take,反過來也一樣。
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue內部並沒有資料快取空間,你不能呼叫peek()方法來看佇列中是否有資料元素,因為資料元素只有當你試著取走的時候才可能存在,不取走而只想偷窺一下是不行的,當然遍歷這個佇列的操作也是不允許的。佇列頭元素是第一個排隊要插入資料的執行緒,而不是要交換的資料。資料是在配對的生產者和消費者執行緒之間直接傳遞的,並不會將資料緩衝資料到佇列中。可以這樣來理解:生產者和消費者互相等待對方,握手,然後一起
SynchronousQueue的一個使用場景是線上程池裡。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個執行緒池根據需要(新任務到來時)建立新的執行緒,如果有空閒執行緒則會重複使用,執行緒空閒了60秒後會被回收。
實現原理
阻塞佇列的實現方法有許多:
阻塞演算法實現
阻塞演算法實現通常在內部採用一個鎖來保證多個執行緒中的put()和take()方法是序列執行的。採用鎖的開銷是比較大的,還會存在一種情況是執行緒A持有執行緒B需要的鎖,B必須一直等待A釋放鎖,即使A可能一段時間內因為B的優先順序比較高而得不到時間片執行。所以在高效能的應用中我們常常希望規避鎖的使用。
public class NativeSynchronousQueue<E> { boolean putting = false; E item = null; public synchronized E take() throws InterruptedException { while (item == null) wait(); E e = item; item = null; notifyAll(); return e; } public synchronized void put(E e) throws InterruptedException { if (e==null) return; while (putting) wait(); putting = true; item = e; notifyAll(); while (item!=null) wait(); putting = false; notifyAll(); } }
訊號量實現
經典同步佇列實現採用了三個訊號量,程式碼很簡單,比較容易理解:
public class SemaphoreSynchronousQueue<E> { E item = null; Semaphore sync = new Semaphore(0); Semaphore send = new Semaphore(1); Semaphore recv = new Semaphore(0); public E take() throws InterruptedException { recv.acquire(); E x = item; sync.release(); send.release(); return x; } public void put (E x) throws InterruptedException{ send.acquire(); item = x; recv.release(); sync.acquire(); } }
在多核機器上,上面方法的同步代價仍然較高,作業系統排程器需要上千個時間片來阻塞或喚醒執行緒,而上面的實現即使在生產者put()時已經有一個消費者在等待的情況下,阻塞和喚醒的呼叫仍然需要。
Java 5實現
public class Java5SynchronousQueue<E> { ReentrantLock qlock = new ReentrantLock(); Queue waitingProducers = new Queue(); Queue waitingConsumers = new Queue(); static class Node extends AbstractQueuedSynchronizer { E item; Node next; Node(Object x) { item = x; } void waitForTake() { /* (uses AQS) */ } E waitForPut() { /* (uses AQS) */ } } public E take() { Node node; boolean mustWait; qlock.lock(); node = waitingProducers.pop(); if(mustWait = (node == null)) node = waitingConsumers.push(null); qlock.unlock(); if (mustWait) return node.waitForPut(); else return node.item; } public void put(E e) { Node node; boolean mustWait; qlock.lock(); node = waitingConsumers.pop(); if (mustWait = (node == null)) node = waitingProducers.push(e); qlock.unlock(); if (mustWait) node.waitForTake(); else node.item = e; } }
Java 5的實現相對來說做了一些優化,只使用了一個鎖,使用佇列代替訊號量也可以允許釋出者直接釋出資料,而不是要首先從阻塞在訊號量處被喚醒。
Java6實現
Java 6的SynchronousQueue的實現採用了一種效能更好的無鎖演算法 — 擴充套件的“Dual stack and Dual queue”演算法。效能比Java5的實現有較大提升。競爭機制支援公平和非公平兩種:非公平競爭模式使用的資料結構是後進先出棧(Lifo Stack);公平競爭模式則使用先進先出佇列(Fifo Queue),效能上兩者是相當的,一般情況下,Fifo通常可以支援更大的吞吐量,但Lifo可以更大程度的保持執行緒的本地化。
程式碼實現裡的Dual Queue或Stack內部是用連結串列(LinkedList)來實現的,其節點狀態為以下三種情況:
- 持有資料 – put()方法的元素
- 持有請求 – take()方法
- 空
這個演算法的特點就是任何操作都可以根據節點的狀態判斷執行,而不需要用到鎖。
其核心介面是Transfer,生產者的put或消費者的take都使用這個介面,根據第一個引數來區別是入列(棧)還是出列(棧)。
/** * Shared internal API for dual stacks and queues. */ static abstract class Transferer { /** * Performs a put or take. * * @param e if non-null, the item to be handed to a consumer; * if null, requests that transfer return an item * offered by producer. * @param timed if this operation should timeout * @param nanos the timeout, in nanoseconds * @return if non-null, the item provided or received; if null, * the operation failed due to timeout or interrupt -- * the caller can distinguish which of these occurred * by checking Thread.interrupted. */ abstract Object transfer(Object e, boolean timed, long nanos); }
TransferQueue實現如下(摘自Java 6原始碼),入列和出列都基於Spin和CAS方法:
/** * Puts or takes an item. */ Object transfer(Object e, boolean timed, long nanos) { /* Basic algorithm is to loop trying to take either of * two actions: * * 1. If queue apparently empty or holding same-mode nodes, * try to add node to queue of waiters, wait to be * fulfilled (or cancelled) and return matching item. * * 2. If queue apparently contains waiting items, and this * call is of complementary mode, try to fulfill by CAS'ing * item field of waiting node and dequeuing it, and then * returning matching item. * * In each case, along the way, check for and try to help * advance head and tail on behalf of other stalled/slow * threads. * * The loop starts off with a null check guarding against * seeing uninitialized head or tail values. This never * happens in current SynchronousQueue, but could if * callers held non-volatile/final ref to the * transferer. The check is here anyway because it places * null checks at top of loop, which is usually faster * than having them implicitly interspersed. */ QNode s = null; // constructed/reused as needed boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null) // saw uninitialized value continue; // spin if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // can't wait return null; if (s == null) s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null)? x : e; } else { // complementary-mode QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } advanceHead(h, m); // successfully fulfilled LockSupport.unpark(m.waiter); return (x != null)? x : e; } } }