CyclicBarrier柵欄
CyclicBarrier有什麼作用?
在現實生活中,在進行某個活動前需要等待人全部都齊了才開始。例如吃飯時要等全家人都上座了才動筷子,旅遊時要等全部人都到齊了才出發,比賽時要等運動員都上場後才開始。
柵欄就可以很好的模擬這類場景,利用CyclicBarrier類可以實現一組執行緒相互等待,當所有執行緒都到達某個屏障點後再進行後續的操作。下圖演示了這一過程。
CyclicBarrier的實現原理
在CyclicBarrier類的內部有一個計數器,每個執行緒在到達屏障點的時候都會呼叫await方法將自己阻塞,此時計數器會減1,當計數器減為0的時候所有因呼叫await方法而被阻塞的執行緒將被喚醒
如果對awai()的呼叫超時,或者await()阻塞的執行緒被中斷,那麼柵欄就被認為是打破了。如果成功地通過柵欄,那麼await()將為每個執行緒返回一個唯一的到達索引號。下面看看CyclicBarrier都有哪些成員變數
//同步操作鎖 private final ReentrantLock lock = new ReentrantLock(); //執行緒攔截器 private final Condition trip = lock.newCondition(); //每次攔截的執行緒數 private final int parties; //換代前的任務 private final Runnable barrierCommand; //柵欄當前代 private Generation generation = new Generation(); //計數器 private int count; //靜態內部類Generation private static class Generation { boolean broken = false; }
可以看到CyclicBarrier內部是通過條件佇列trip來對執行緒進行阻塞的,並且其內部維護了兩個int型的變數parties和count,parties表示每次攔截的執行緒數,該值在構造時進行賦值。
count是內部計數器,它的初始值和parties相同,以後隨著每次await方法的呼叫而減1,直到減為0就將所有執行緒喚醒。
CyclicBarrier有一個靜態內部類Generation,該類的物件代表柵欄的當前代,就像玩遊戲時代表的本局遊戲,利用它可以實現迴圈等待。
barrierCommand表示換代前執行的任務,當count減為0時表示本局遊戲結束,需要轉到下一局。在轉到下一局遊戲之前會將所有阻塞的執行緒喚醒
CyclicBarrier的建構函式
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier有兩個構造器,其中構造器1是它的核心構造器,在這裡你可以指定本局遊戲的參與者數量(要攔截的執行緒數)以及本局結束時要執行的任務,還可以看到計數器count的初始值被設定為parties。CyclicBarrier類最主要的功能就是使先到達屏障點的執行緒阻塞並等待後面的執行緒,其中它提供了兩種等待的方法,分別是定時等待和非定時等待。
//非定時等待
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
//定時等待
public int await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
可以看到不管是定時等待還是非定時等待,它們都呼叫了dowait方法,只不過是傳入的引數不同而已。下面我們就來看看dowait方法都做了些什麼。
//核心等待方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//檢查當前柵欄是否被打翻
if (g.broken) {
throw new BrokenBarrierException();
}
//檢查當前執行緒是否被中斷
if (Thread.interrupted()) {
//如果當前執行緒被中斷會做以下三件事
//1.打翻當前柵欄
//2.喚醒攔截的所有執行緒
//3.丟擲中斷異常
breakBarrier();
throw new InterruptedException();
}
//每次都將計數器的值減1
int index = --count;
//計數器的值減為0則需喚醒所有執行緒並轉換到下一代
if (index == 0) {
boolean ranAction = false;
try {
//喚醒所有執行緒前先執行指定的任務
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
//喚醒所有執行緒並轉到下一代
nextGeneration();
return 0;
} finally {
//確保在任務未成功執行時能將所有執行緒喚醒
if (!ranAction) {
breakBarrier();
}
}
}
//如果計數器不為0則執行此迴圈
for (;;) {
try {
//根據傳入的引數來決定是定時等待還是非定時等待
if (!timed) {
trip.await();
}else if (nanos > 0L) {
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
//若當前執行緒在等待期間被中斷則打翻柵欄喚醒其他執行緒
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//若在捕獲中斷異常前已經完成在柵欄上的等待,
//則直接呼叫中斷操作
Thread.currentThread().interrupt();
}
}
//如果執行緒因為打翻柵欄操作而被喚醒則丟擲異常
if (g.broken) {
throw new BrokenBarrierException();
}
//如果執行緒因為換代操作而被喚醒則返回計數器的值
if (g != generation) {
return index;
}
//如果執行緒因為時間到了而被喚醒則打翻柵欄並丟擲異常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
可以看到在dowait方法中每次都將count減1,減完後立馬進行判斷看看是否等於0,如果等於0的話就會先去執行之前指定好的任務,執行完之後再呼叫nextGeneration方法將柵欄轉到下一代,在該方法中會將所有執行緒喚醒,將計數器的值重新設為parties,最後會重新設定柵欄代次,在執行完nextGeneration方法之後就意味著遊戲進入下一局。
如果計數器此時還不等於0的話就進入for迴圈,根據引數來決定是呼叫trip.awaitNanos(nanos)還是trip.await()方法,這兩方法對應著定時和非定時等待。如果在等待過程中當前執行緒被中斷就會執行breakBarrier方法,該方法叫做打破柵欄,意味著遊戲在中途被掐斷,設定generation的broken狀態為true並喚醒所有執行緒。同時這也說明在等待過程中有一個執行緒被中斷整盤遊戲就結束,所有之前被阻塞的執行緒都會被喚醒。
執行緒醒來後會執行下面三個判斷,看看是否因為呼叫breakBarrier方法而被喚醒,如果是則丟擲異常;看看是否是正常的換代操作而被喚醒,如果是則返回計數器的值;看看是否因為超時而被喚醒,如果是的話就呼叫breakBarrier打破柵欄並丟擲異常。這裡還需要注意的是,如果其中有一個執行緒因為等待超時而退出,那麼整盤遊戲也會結束,其他執行緒都會被喚醒。下面貼出nextGeneration方法和breakBarrier方法的具體程式碼。
//重置柵欄到下一代
private void nextGeneration() {
// 喚醒條件佇列中的所有執行緒
trip.signalAll();
// 設定計數器的值為需要攔截的執行緒數
count = parties;
//重新設定柵欄代次
generation = new Generation();
}
//中斷當前柵欄
private void breakBarrier() {
generation.broken = true;
//設定計數器的值為需要攔截的執行緒數
count = parties;
//喚醒所有執行緒
trip.signalAll();
}
CyclicBarrier和CountDownLatch的區別
這兩個類都可以實現一組執行緒在到達某個條件之前進行等待,它們內部都有一個計數器,當計數器的值不斷的減為0的時候所有阻塞的執行緒將會被喚醒。
柵欄(Barrier)類似於閉鎖,他能阻塞一組執行緒直到某個事件發生。柵欄和閉鎖的關鍵區別在於,所有執行緒必須同時到達柵欄位置,才能繼續執行。閉鎖用於等待事件,而柵欄用於等待其他執行緒。CyclicBarrier的計數器由自己控制,而CountDownLatch的計數器則由使用者來控制,在CyclicBarrier中執行緒呼叫await方法不僅會將自己阻塞還會將計數器減1,而在CountDownLatch中執行緒呼叫await方法只是將自己阻塞而不會減少計數器的值。
閉鎖是一次性物件,一旦進入終止狀態,就不能重置,只能攔截一輪。而柵欄可以多次使用。
應用案例
賽馬程式,CyclicBarrier使得每匹馬都要執行為了向前移動所需要執行的所有動作,然後必須在柵欄處等待其他所有馬都準備完畢。一旦所有的任務都越過柵欄,他會自動為下一回合比賽做好準備
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) { barrier = b; }
@Override
public void run() {
try {
while(!Thread.interrupted()) {
synchronized(this) {
//賽馬每次隨機跑幾步
strides += rand.nextInt(3);
}
barrier.await();
}
} catch(Exception e) {
e.printStackTrace();
}
}
public String tracks() {
StringBuilder s = new StringBuilder();
for(int i = 0; i < getStrides(); i++) {
s.append("*");
}
s.append(id);
return s.toString();
}
public synchronized int getStrides() { return strides; }
public String toString() { return "Horse " + id + " "; }
}
public class HorseRace implements Runnable {
private static final int FINISH_LINE = 75;
private static List<Horse> horses = new ArrayList<Horse>();
private static ExecutorService exec = Executors.newCachedThreadPool();
@Override
public void run() {
StringBuilder s = new StringBuilder();
//列印賽道邊界
for(int i = 0; i < FINISH_LINE; i++) {
s.append("=");
}
System.out.println(s);
//列印賽馬軌跡
for(Horse horse : horses) {
System.out.println(horse.tracks());
}
//判斷是否結束
for(Horse horse : horses) {
if(horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won!");
exec.shutdownNow();
return;
}
}
//休息指定時間再到下一輪
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch(InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(7, new HorseRace());
for(int i = 0; i < 7; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
}
該賽馬程式主要是通過在控制檯不停的列印各賽馬的當前軌跡,以此達到動態顯示的效果。整場比賽有多個輪次,每一輪次各個賽馬都會隨機走上幾步然後呼叫await方法進行等待,當所有賽馬走完一輪的時候將會執行任務將所有賽馬的當前軌跡列印到控制檯上。
這樣每一輪下來各賽馬的軌跡都在不停的增長,當其中某個賽馬的軌跡最先增長到指定的值的時候將會結束整場比賽,該賽馬成為整場比賽的勝利者!程式的執行結果如下: