1. 程式人生 > >非阻塞同步演算法實戰(二)-BoundlessCyclicBarrier

非阻塞同步演算法實戰(二)-BoundlessCyclicBarrier

感謝網友trytocatch的投稿

前言

相比上一 篇而言,本文不需要太多的準備知識,但技巧性更強一些。因為分析、設計的過程比較複雜繁瑣,也限於篇幅,所以,主要展示如何解決這些需求,和講解程式碼。另外,所講的內容也是後一篇實戰中需要用到的一個工具類。

需求介紹

我需要編寫一個同步工具,它需要提供這樣幾個方法:await、pass、cancel。某個執行緒呼叫await時,會被阻塞;當呼叫pass方法時,之前因為await而阻塞的執行緒將全部被解除阻塞,之後呼叫await的執行緒繼續被阻塞,直到下一次呼叫pass。

該工具同時還維護一個版本號,await方法可以帶一個目標版本號,如果當前的版本號比目標版本號新或相同,則直接通過,否則,阻塞本執行緒,直到到達或超過目標版本。呼叫pass的時候,更新版本號。

如果停止了版本更新,可使用cancel方法來解除所有因await而阻塞的執行緒,包括指定版本號的。此方法用於避免無謂地等待。若await發生在cancel之後,則仍將被阻塞。

因為CountDownLatch不允許重複使用,CyclicBarrier只支援固定個數的執行緒,並且都沒有維護一個版本號,所以沒有已有的類能實現上面的需求,需要自己實現。

問題分析

簡單分析可知,應該維護一個佇列,來儲存當前被阻塞的執行緒,用於在pass時對它們一一解除阻塞,pass時應該使用一個新的佇列,否則不方便正確處理pass前和pass後呼叫await的執行緒。

至此,問題的關鍵就明瞭了:如何將佇列的替換和版本號的更新這兩個操作做成原子的。

解決方案

以前在《JAVA併發程式設計實踐》曾看到過這樣一個小技巧,如果要原子地更新兩個變數,那麼可以建立一個新的類將它們封裝起來,將這兩個變數當定義成類成員變數,更新時,用CAS更新這個類的引用即可。

因為較為複雜,下面先給出完整的程式碼,再講解其中的關鍵。

注意:上面所說pass,在程式碼中的具體實現為nextCycle,有兩個版本,一個自動維護版本號,一個由呼叫者維護版本號。

/**
 * @author [email protected]
 * @time 2013-1-31
 */
public class BoundlessCyclicBarrier {
    protected final AtomicReference<VersionQueue> waitQueueRef;

    public BoundlessCyclicBarrier() {
        this(0);
    }

    public BoundlessCyclicBarrier(int startVersion) {
        waitQueueRef = new AtomicReference<VersionQueue>(new VersionQueue(startVersion));
    }

    public final void awaitWithAssignedVersion(int myVersion)
            throws InterruptedException {
        awaitImpl(true, myVersion, 0);
    }

    /**
     *
     * @param myVersion
     * @param nanosTimeout
     * @return if timeout, or be canceled and doesn't reach myVersion, returns false
     * @throws InterruptedException
     */
    public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException {
        return awaitImpl(true, myVersion, nanosTimeout);
    }

    public final void await() throws InterruptedException {
        awaitImpl(false, 0, 0);
    }

    /**
     *
     * @param nanosTimeout
     * @return if and only if timeout, returns false
     * @throws InterruptedException
     */
    public final boolean await(long nanosTimeout)
            throws InterruptedException {
        return awaitImpl(false, 0, nanosTimeout);
    }

    /**
     * pass and version++(some threads may not be unparked when awaitImpl is in process, but it's OK in this Barrier)
     * @return old queue version
     */
    public int nextCycle() {
        VersionQueue oldQueue = waitQueueRef.get();
        VersionQueue newQueue = new VersionQueue(oldQueue.version + 1);
        for(;;){
            if (waitQueueRef.compareAndSet(oldQueue, newQueue)) {
                for (Thread t : oldQueue.queue)
                    LockSupport.unpark(t);
                break;
            }
            oldQueue = waitQueueRef.get();
            newQueue.version = oldQueue.version + 1;
        }
        return oldQueue.version;
    }

    /**
     * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right)
     * @param newAssignVersion
     */
    public void nextCycle(int newAssignVersion) {
        VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion));
        for (Thread t : oldQueue.queue)
            LockSupport.unpark(t);
    }

    /**
     * if version update has stopped, invoke this to awake all threads
     */
    public void cancel(){
        VersionQueue oldQueue = waitQueueRef.get();
        if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) {
            for (Thread t : oldQueue.queue)
                LockSupport.unpark(t);
    }

    public final int getVersion() {
        return waitQueueRef.get().version;
    }

    private static final class VersionQueue {
        final private ConcurrentLinkedQueue queue;
        int version;
        final boolean isCancelQueue;

        VersionQueue(int curVersion){
            this(curVersion, false);
        }

        VersionQueue(int curVersion, boolean isCancelQueue) {
            this.version = curVersion;
            this.isCancelQueue = isCancelQueue;
            queue = new ConcurrentLinkedQueue();
        }
    }

    /**
     *
     * @param assignVersion is myVersion available
     * @param myVersion wait for this version
     * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid)      * @return if timeout, or be canceled and doesn't reach myVersion, returns false      * @throws InterruptedException      */     protected boolean awaitImpl(boolean assignVersion, int myVersion,             long nanosTimeout) throws InterruptedException {         boolean timeOutEnable = nanosTimeout > 0;
        long lastTime = System.nanoTime();
        VersionQueue newQueue = waitQueueRef.get();//A
        if (assignVersion && newQueue.version - myVersion >= 0)
            return true;
        while (true) {
            VersionQueue submitQueue = newQueue;//B
            submitQueue.queue.add(Thread.currentThread());//C
            while (true) {
                newQueue = waitQueueRef.get();//D
                if (newQueue != submitQueue){//E: it's a new cycle
                    if(assignVersion == false)
                        return true;
                    else if(newQueue.version - myVersion >= 0)
                        return true;
                    else if (newQueue.isCancelQueue)//F: be canceled
                        return false;
                    else//just like invoking awaitImpl again
                        break;
                }
                if (timeOutEnable) {
                    if (nanosTimeout <= 0)
                        return false;
                    LockSupport.parkNanos(this, nanosTimeout);
                    long now = System.nanoTime();
                    nanosTimeout -= now - lastTime;
                    lastTime = now;
                } else
                    LockSupport.park(this);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        }
    }
}

程式碼分析

先分析一下awaitImpl方法,A和D是該方法的關鍵點,決定著它屬於哪一個批次,對應哪一個版本。這裡有個小細節,在nexeCycle,cancel解除阻塞時,該執行緒可能並不在佇列中,因為插入佇列發生在C處,這在A和D之後(雖然看起來C在D之前,但D取到的queue要在下一次迴圈時才被當作submitQueue),所以,在E處再進行了一次判斷,開始解除阻塞時,舊佇列肯定被新佇列所替換,newQueue != submitQueue一定為真,就會不呼叫park進行阻塞了,也就不需要解除阻塞,所以即使解除阻塞時,該執行緒不在佇列中也是沒問題的。

再看E處,當進入一個新的cycle時(當前佇列與提交的佇列不同),a)如果沒指定版本,或者到達或超過了指定版本,則返回true;b)如果當前呼叫了cancel,則當前佇列的isCancelQueue將為true,則不繼續傻等,返回false;c)或者還未到達指定版本,break,插入到當前佇列中,繼續等待指定版本的到達。

如果沒有進入E處的IF內,則當前執行緒會被阻塞,直到超時,然後返回false;或被中斷,然後丟擲InterruptedException;或被解除阻塞,重新進行E處的判定。

這裡還有個小細節,既然cancel時,把當前的佇列設定了isCancelQueue,那麼之後指定版本的await會不會也直接返回了呢?其實不會的,因為它若要執行F處的判斷,則先必需通過E處的判定,這意味著,當前佇列已經不是提交時的那個設定了isCancelQueue的隊列了。

程式碼中對於cancel的處理,其實並不保證cancel後,之前的await都會被解除阻塞並返回,如果cancel後,緊接著又呼叫了nextCycle,那麼可能某執行緒感知不到cancel的呼叫,喚醒後又繼續等待指定的版本。cancel的目的是在於不讓執行緒傻等,既然恢復版本更新了,那就繼續等待吧。

如果自己維護版本號,則應該保證遞增。另外,版本號的設計,考慮到了int溢位的情況,版本的前後判斷,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,這樣,版本號就相當於迴圈使用了,只要兩個比較的版本號的差不超過int的最大值,那麼都是正確的,int的最大值可是20多億,幾乎不可能出現跨度這麼大的兩個版本號的比較,所以,認為它是正確的。

小結

本文講到了一個非阻塞同步演算法設計時的小技巧,如果多個變數之間要維護某種特定關係,那麼可以將它們封裝到一個類中,再用CAS更新這個類的引用,這樣就達到了:要麼都被更新,要麼都沒被更新,保持了多個變數之間的一致性。同時需要注意的是,每次更新都必需建立新的包裝物件,假如有其它更好的辦法,應該避免使用該方法。