1. 程式人生 > >【搞定Java併發程式設計】第25篇:Java中的併發工具類之同步屏障 CyclicBarrier

【搞定Java併發程式設計】第25篇:Java中的併發工具類之同步屏障 CyclicBarrier

上一篇:Java中的併發工具類之CountDownLatch

本文目錄:

1、CyclicBarrier的簡單概述

2、CyclicBarrier 的原始碼分析

3、CyclicBarrier與CountDownLatch的區別


1、CyclicBarrier的簡單概述

現實生活中我們經常會遇到這樣的情景,在進行某個活動前需要等待人全部都齊了才開始。例如吃飯時要等全家人都上座了才動筷子,旅遊時要等全部人都到齊了才出發,比賽時要等運動員都上場後才開始。

在JUC包中為我們提供了一個同步工具類能夠很好的模擬這類場景,它就是CyclicBarrier類。利用CyclicBarrier類可以實現一組執行緒相互等待,當所有執行緒都到達某個屏障點後再進行後續的操作。下圖演示了這一過程。

CyclicBarrier字面意思是“可重複使用的柵欄”,CyclicBarrier 相比 CountDownLatch 來說,要簡單很多,其原始碼沒有什麼高深的地方,它是 ReentrantLock 和 Condition 的組合使用。

看如下示意圖,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一個柵欄,因為它的柵欄(Barrier)可以重複使用(Cyclic)。

cyclicbarrier-2

首先,CyclicBarrier 的原始碼實現和 CountDownLatch 大相徑庭,CountDownLatch 基於 AQS 的共享模式的使用,而 CyclicBarrier 基於 Condition 來實現的。

因為 CyclicBarrier 的原始碼相對來說簡單許多,讀者只要熟悉了前面關於 Condition 的分析,那麼這裡的原始碼是毫無壓力的,就是幾個特殊概念罷了。


2、CyclicBarrier 的原始碼分析

下面先看下CyclicBarrier類中的基本屬性和構造方法:

public class CyclicBarrier {
    // 我們說了,CyclicBarrier 是可以重複使用的,我們把每次從開始使用到穿過柵欄當做"一代"
    private static class Generation {
        boolean broken = false;
    }
 
    private final ReentrantLock lock = new ReentrantLock();
    // CyclicBarrier 是基於 Condition 的
    // Condition 是“條件”的意思,CyclicBarrier 的等待執行緒通過 barrier 的“條件”是大家都到了柵欄上
    private final Condition trip = lock.newCondition();
 
    // 參與的執行緒數
    private final int parties;
 
    // 如果設定了這個,代表越過柵欄之前,要執行相應的操作
    private final Runnable barrierCommand;
 
    // 當前所處的“代”
    private Generation generation = new Generation();
 
    // 還沒有到柵欄的執行緒數,這個值初始為 parties,然後遞減
    // 還沒有到柵欄的執行緒數 = parties - 已經到柵欄的數量
    private int count;
 
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
 
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    ...
}

上面貼出了CyclicBarrier所有的成員變數,可以看到CyclicBarrier內部是通過條件佇列trip來對執行緒進行阻塞的,並且其內部維護了兩個int型的變數parties和count,parties表示每次攔截的執行緒數,該值在構造時進行賦值。count是內部計數器,它的初始值和parties相同,以後隨著每次await方法的呼叫而減1,直到減為0就將所有執行緒喚醒。

CyclicBarrier有一個靜態內部類Generation,該類的物件代表柵欄的當前代,就像玩遊戲時代表的本局遊戲,利用它可以實現迴圈等待。barrierCommand表示換代前執行的任務,當count減為0時表示本局遊戲結束,需要轉到下一局。在轉到下一局遊戲之前會將所有阻塞的執行緒喚醒,在喚醒所有執行緒之前你可以通過指定barrierCommand來執行自己的任務。

我用一圖來描繪下 CyclicBarrier 裡面的一些概念:

cyclicbarrier-3

看圖我們也知道了,CyclicBarrier 的原始碼最重要的就是 await() 方法了。

首先,先看怎麼開啟新的一代:

// 開啟新的一代,當最後一個執行緒到達柵欄上的時候,呼叫這個方法來喚醒其他執行緒,同時初始化“下一代”
private void nextGeneration() {
    // 首先,需要喚醒所有的在柵欄上等待的執行緒
    trip.signalAll();
    // 更新 count 的值
    count = parties;
    // 重新生成“新一代”
    generation = new Generation();
}

再看下如何怎麼打破一個柵欄:

private void breakBarrier() {
    // 設定狀態 broken 為 true
    generation.broken = true;
    // 重置 count 為初始值 parties
    count = parties;
    // 喚醒所有已經在等待的執行緒
    trip.signalAll();
}

這兩個方法之後用得到,現在開始分析最重要的等待通過柵欄方法 await 方法:

// 不帶超時機制
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
// 帶超時機制,如果超時丟擲 TimeoutException 異常
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

繼續往裡看:

// 核心等待方法
private int dowait(boolean timed, long nanos) 
throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        // 檢查當前柵欄是否被打翻
        if (g.broken) {
            throw new BrokenBarrierException();
        }
        // 檢查當前執行緒是否被中斷
        if (Thread.interrupted()) {
            // 如果當前執行緒被中斷會做以下三件事
            // 1.打翻當前柵欄
            // 2.喚醒攔截的所有執行緒
            // 3.丟擲中斷異常
            breakBarrier();
            throw new InterruptedException();
        }
       // 每次都將計數器的值減1
       int index = --count;
       // 計數器的值減為0則需喚醒所有執行緒並轉換到下一代
       if (index == 0) {
           boolean ranAction = false;
           try {
               // 喚醒所有執行緒前先執行指定的任務
               final Runnable command = barrierCommand;
               if (command != null) {
                   command.run();
               }
               ranAction = true;
               // 喚醒所有執行緒並轉到下一代
               nextGeneration();
               return 0;
           } finally {
               // 確保在任務未成功執行時能將所有執行緒喚醒
               if (!ranAction) {
                   breakBarrier();
               }
           }
       }

       // 如果計數器不為0則執行此迴圈
       for (;;) {
           try {
               // 根據傳入的引數來決定是定時等待還是非定時等待
               if (!timed) {
                   trip.await();
               }else if (nanos > 0L) {
                   nanos = trip.awaitNanos(nanos);
               }
           } catch (InterruptedException ie) {
               // 若當前執行緒在等待期間被中斷則打翻柵欄喚醒其他執行緒
               if (g == generation && ! g.broken) {
                   breakBarrier();
                   throw ie;
               } else {
                   // 若在捕獲中斷異常前已經完成在柵欄上的等待, 則直接呼叫中斷操作
                   Thread.currentThread().interrupt();
               }
           }
           // 如果執行緒因為打翻柵欄操作而被喚醒則丟擲異常
           if (g.broken) {
               throw new BrokenBarrierException();
           }
           // 如果執行緒因為換代操作而被喚醒則返回計數器的值
           if (g != generation) {
               return index;
           }
           // 如果執行緒因為時間到了而被喚醒則打翻柵欄並丟擲異常
           if (timed && nanos <= 0L) {
               breakBarrier();
               throw new TimeoutException();
           }
       }
    } finally {
        lock.unlock();
    }
}

上面貼出的程式碼中註釋都比較詳細,我們只挑一些重要的來講。

可以看到在dowait方法中每次都將count減1,減完後立馬進行判斷看看是否等於0,如果等於0的話就會先去執行之前指定好的任務,執行完之後再呼叫nextGeneration方法將柵欄轉到下一代,在該方法中會將所有執行緒喚醒,將計數器的值重新設為parties,最後會重新設定柵欄代次,在執行完nextGeneration方法之後就意味著遊戲進入下一局。

如果計數器此時還不等於0的話就進入for迴圈,根據引數來決定是呼叫trip.awaitNanos(nanos)還是trip.await()方法,這兩方法對應著定時和非定時等待。

如果在等待過程中當前執行緒被中斷就會執行breakBarrier方法,該方法叫做打破柵欄,意味著遊戲在中途被掐斷,設定generation的broken狀態為true並喚醒所有執行緒。同時這也說明在等待過程中有一個執行緒被中斷整盤遊戲就結束,所有之前被阻塞的執行緒都會被喚醒。

執行緒醒來後會執行下面三個判斷,看看是否因為呼叫breakBarrier方法而被喚醒,如果是則丟擲異常;看看是否是正常的換代操作而被喚醒,如果是則返回計數器的值;看看是否因為超時而被喚醒,如果是的話就呼叫breakBarrier打破柵欄並丟擲異常。

下面開始收尾工作:

首先,我們看看怎麼得到有多少個執行緒到了柵欄上,處於等待狀態:

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

判斷一個柵欄是否被打破了,這個很簡單,直接看 broken 的值即可:

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

前面我們在說 await 的時候也幾乎說清楚了,什麼時候柵欄會被打破,總結如下:

  1. 中斷,我們說了,如果某個等待的執行緒發生了中斷,那麼會打破柵欄,同時丟擲 InterruptedException 異常;
  2. 超時,打破柵欄,同時丟擲 TimeoutException 異常;
  3. 指定執行的操作丟擲了異常。

最後,我們來看看怎麼重置一個柵欄:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

我們設想一下,如果初始化時,指定了執行緒 parties = 4,前面有 3 個執行緒呼叫了 await 等待,在第 4 個執行緒呼叫 await 之前,我們呼叫 reset 方法,那麼會發生什麼?

首先,打破柵欄,那意味著所有等待的執行緒(3個等待的執行緒)會喚醒,await 方法會通過丟擲 BrokenBarrierException 異常返回。然後開啟新的一代,重置了 count 和 generation,相當於一切歸零了。

上面我們已經通過原始碼將CyclicBarrier的原理基本都講清楚了,下面我們就通過一個賽馬的例子來深入掌握它的使用。

 

class Horse implements Runnable {

    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;

    public Horse(CyclicBarrier b) { barrier = b; }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized(this) {
                    // 賽馬每次隨機跑幾步
                    strides += rand.nextInt(3);
                }
                barrier.await();
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    public String tracks() {
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < getStrides(); i++) {
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }

    public synchronized int getStrides() { return strides; }
    public String toString() { return "Horse " + id + " "; }

}

public class HorseRace implements Runnable {

    private static final int FINISH_LINE = 75;
    private static List<Horse> horses = new ArrayList<Horse>();
    private static ExecutorService exec = Executors.newCachedThreadPool();

    @Override
    public void run() {
        StringBuilder s = new StringBuilder();
        // 列印賽道邊界
        for(int i = 0; i < FINISH_LINE; i++) {
            s.append("=");
        }
        System.out.println(s);
        // 列印賽馬軌跡
        for(Horse horse : horses) {
            System.out.println(horse.tracks());
        }
        // 判斷是否結束
        for(Horse horse : horses) {
            if(horse.getStrides() >= FINISH_LINE) {
                System.out.println(horse + "won!");
                exec.shutdownNow();
                return;
            }
        }
        // 休息指定時間再到下一輪
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch(InterruptedException e) {
            System.out.println("barrier-action sleep interrupted");
        }
    }

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(7, new HorseRace());
        for(int i = 0; i < 7; i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }
}

該賽馬程式主要是通過在控制檯不停的列印各賽馬的當前軌跡,以此達到動態顯示的效果。整場比賽有多個輪次,每一輪次各個賽馬都會隨機走上幾步然後呼叫await方法進行等待,當所有賽馬走完一輪的時候將會執行任務將所有賽馬的當前軌跡列印到控制檯上。這樣每一輪下來各賽馬的軌跡都在不停的增長,當其中某個賽馬的軌跡最先增長到指定的值的時候將會結束整場比賽,該賽馬成為整場比賽的勝利者!程式的執行結果如下:


3、CyclicBarrier與CountDownLatch的區別

至此我們難免會將CyclicBarrier與CountDownLatch進行一番比較。這兩個類都可以實現一組執行緒在到達某個條件之前進行等待,它們內部都有一個計數器,當計數器的值不斷的減為0的時候所有阻塞的執行緒將會被喚醒。

有區別的是CyclicBarrier的計數器由自己控制,而CountDownLatch的計數器則由使用者來控制,在CyclicBarrier中執行緒呼叫await方法不僅會將自己阻塞還會將計數器減1,而在CountDownLatch中執行緒呼叫await方法只是將自己阻塞而不會減少計數器的值。

另外,CountDownLatch只能攔截一輪,而CyclicBarrier可以實現迴圈攔截。一般來說用CyclicBarrier可以實現CountDownLatch的功能,而反之則不能,例如上面的賽馬程式就只能使用CyclicBarrier來實現。總之,這兩個類的異同點大致如此,至於何時使用CyclicBarrier,何時使用CountDownLatch,還需要讀者自己去拿捏。

除此之外,CyclicBarrier還提供了:resert()、getNumberWaiting()、isBroken()等比較有用的方法。

全文完!


上一篇:Java中的併發工具類之CountDownLatch