[Java原始碼][併發J.U.C]---併發工具類CyclicBarrier
前言
CyclicBarrier
要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行. 簡單地說就是人到齊了後才可以讓每個人繼續去做自己的事情.
CycliBarrier
是通過 ReentrantLock
和 Condition
實現的一個數據結構.
本文程式碼: JavaBasics/src/com/sourcecode/concurrencytools_CyclicBarrier" target="_blank" rel="nofollow,noindex">程式碼下載
例子1
先通過一個簡單的例子瞭解一下 CyclicBarrier
.
package com.sourcecode.concurrencytools_CyclicBarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest4 { static CyclicBarrier c = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { for (int i = 0; i < 5; i++) { Thread thread = new MyThread(); thread.start(); } } static class MyThread extends Thread { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " tries to wait!"); c.await(); } catch (Exception e) { System.out.println(e); //System.out.println(Thread.currentThread().getName() + "------>" + c.isBroken() + ", interrupted status:" + Thread.currentThread().isInterrupted()); } finally { System.out.println(Thread.currentThread().getName() + " finishes!"); } } } }
執行結果如下: 初始化 CyclicBarrier
的時候引數是 5
,表示需要等待 5
個執行緒達到後才可以開啟屏障,正如結果所示, thread-0
到 thread-3
在等待,到最後一個執行緒 thread-4
到達屏障時,此時屏障開啟,每個執行緒執行各自接下來的模組.
如果初始化引數大於 5
,比如 6
,此程式將一直阻塞,因為沒有第 6
個執行緒到達該屏障.
Thread-0 tries to wait! Thread-1 tries to wait! Thread-2 tries to wait! Thread-3 tries to wait! Thread-4 tries to wait! Thread-4 finishes! Thread-1 finishes! Thread-0 finishes! Thread-2 finishes! Thread-3 finishes!
實現思路分析

cyclicbarrier.png
private static class Generation { boolean broken = false; } /** 重入鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 一個lock物件的Condition例項 */ private final Condition trip = lock.newCondition(); /** 攔截執行緒的總個數 */ private final int parties; /** The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** 攔截執行緒的剩餘需要數量 */ private int count;
從該圖可以看出 CyclicBarrier
有一個重入鎖的變數 lock
並且持有一個該鎖的 Condition
例項 trip
,就可以大概知道該 CyclicBarrier
會讓執行緒嘗試獲取鎖並且在拿到鎖後將屏障個數減減操作,然後根據 count
的數量來決定是否呼叫 trip.await()
操作,比如 count==0
表示最後一個到達屏障的執行緒,那麼就不需要呼叫 trip
的方法了.
構造方法
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
第二個引數 Runnable barrierAction
表示的是當最後一個到達屏障的執行緒先執行完該 barrierAction
的 run
方法後再執行喚醒其他執行緒的操作.簡單地說當到達屏障時,先執行 barrierAction
的業務再執行其他執行緒的業務.
await方法
await
方法有兩個,分別為 await()
和 await(long timeout, TimeUnit unit)
方法,一個沒有超時返回,另外一個有超時返回,但是兩者都是呼叫 dowait(boolean timed, long nanos)
,該方法是整個 CyclicBarrier
的核心實現.
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
所以接下來的看看該方法 dowait
是如何實現的.
/** * @param timed 是否需要超時 * @param nanos 時長 * @return 返回還需要等待多少個執行緒才可以到達屏障 * @throws InterruptedException 當前執行緒中斷 * @throws BrokenBarrierException 有其他執行緒中斷或者其他執行緒超時 * @throws TimeoutException 當前執行緒等待超時 */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 獲取重入鎖 final ReentrantLock lock = this.lock; // 嘗試獲取鎖 lock.lock(); try { //System.out.println(Thread.currentThread().getName() + " get locks."); // 獲得當前代 final Generation g = generation; // 如果有執行緒中斷或者超時 if (g.broken) throw new BrokenBarrierException(); // 如果當前執行緒被中斷 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //System.out.format("index=%d\n", index); if (index == 0) {// 最後一個到達屏障的執行緒 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); //更新下一代 return 0; } finally { // 如果執行command.run發生異常,則breakBarrier if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果等待過程中有被執行緒中斷 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 如果當代的broken為true,表明有執行緒被中斷 if (g.broken) throw new BrokenBarrierException(); // 如果換代了 表示可以返回了 if (g != generation) return index; // 如果超時則先break the current generation // 再丟擲超時異常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放鎖 //System.out.println(Thread.currentThread().getName() + " release locks."); lock.unlock(); } } /** *break the current generation *1. broken設定為true *2. count 重新設定為parties *3. 喚醒所有執行緒 */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** *start a new generation *1. 喚醒所有等待中的執行緒 *2. count 重新設定為parties *3. generation 設定成一個新的Generation物件 */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
該方法的流程大概如下:
1.嘗試獲取鎖
2.如果不是最後一個到達屏障的執行緒,則進入 for
迴圈中一直等待(此時該執行緒會釋放鎖)直到被最後一個執行緒喚醒或者被某個執行緒中斷後呼叫 breakBarrier
方法喚醒. 喚醒後需要競爭再次獲得鎖後才可以繼續執行.
3.如果是最後一個到達屏障的執行緒,如果 barrierCommand
不為空,則需要先執行 barrierCommand.run()
方法,然後通過 nextGeneration
喚醒等待的執行緒.
4.在所有異常退出或者正常退出都需要釋放鎖.
流程圖如下

dowait.png
例子2
設定執行緒屏障為3,啟動兩個執行緒2秒超時等待,讓最後一個執行緒3秒後才到達屏障.
package com.sourcecode.concurrencytools_CyclicBarrier; import java.util.concurrent.TimeUnit; public class CyclicBarrierTest5 { static CyclicBarrier c = new CyclicBarrier(3); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { for (int i = 0; i < 2; i++) { Thread thread = new MyThread(); thread.start(); } TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + "------>" + "tries to wait!"); c.await(); System.out.println(Thread.currentThread().getName() + "------>" + "finishes!"); } static class MyThread extends Thread { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " tries to wait!"); c.await(1, TimeUnit.SECONDS); //c.await(); } catch (Exception e) { System.out.println(Thread.currentThread().getName() + "---->" + e); //System.out.println(Thread.currentThread().getName() + "------>" + c.isBroken() + ", interrupted status:" + Thread.currentThread().isInterrupted()); } finally { System.out.println(Thread.currentThread().getName() + " finishes!"); } } } }
結果如下: 可以看到第一個執行緒出現超時異常後,表示該執行緒已經呼叫了 breakBarrier
方法,所以可以看到後續的兩個執行緒都是丟擲 BrokenBarrierException
異常.
Thread-0 tries to wait! Thread-1 tries to wait! Thread-1---->java.util.concurrent.TimeoutException Thread-0---->com.sourcecode.concurrencytools_CyclicBarrier.BrokenBarrierException Thread-0 finishes! Thread-1 finishes! main------>tries to wait! Exception in thread "main" com.sourcecode.concurrencytools_CyclicBarrier.BrokenBarrierException at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrier.dowait(CyclicBarrier.java:69) at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrier.await(CyclicBarrier.java:39) at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrierTest5.main(CyclicBarrierTest5.java:14)
isBroken方法和reset方法
/** * @return 當前代是否被破壞, 被破壞的兩種情況, 某個執行緒中斷或者等待超時 */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier();// break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
reset留作遇到好的例子後再分析
參考
1.Java併發程式設計的藝術