1. 程式人生 > >原始碼分析:CyclicBarrier 之迴圈柵欄

原始碼分析:CyclicBarrier 之迴圈柵欄

## 簡介 CyclicBarrier 是一個同步輔助工具,允許一組執行緒全部等待彼此達到共同屏障點,且等待的執行緒被釋放後還可以重新使用,所以叫做Cyclic(迴圈的)。 ### 應用場景 比如出去旅行時,導遊需要等待所有的客人到齊後,導遊才會給大家講解注意事項等 ### 官方示例 在JDK的原始碼註釋中,提供了一個簡單的示例demo,稍加修改後就可以執行 ```java public class Solver { AtomicInteger sum = new AtomicInteger(0); // 自己新增的一個標識,true代表所有的計算完成了 volatile boolean done = false; final int N; final int[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } @Override public void run() { while (!done()) { int rowSum = Arrays.stream(data[myRow]).sum(); // 計算行的和 System.out.println("processRow(myRow):" + rowSum); sum.addAndGet(rowSum); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } private boolean done(){ return done; } public Solver(int[][] matrix) throws InterruptedException{ data = matrix; N = matrix.length; Runnable barrierAction = () -> { System.out.println("mergeRows(...):"+sum.get()); // 輸出二維陣列的總和 done = true; }; barrier = new CyclicBarrier(N, barrierAction); List threads = new ArrayList(N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads){ thread.join(); } } public static void main(String[] args) throws InterruptedException{ int[][] matrix = {{1,2,3},{4,5,6}}; Solver solver = new Solver(matrix); } } ``` ## 原始碼分析 ### 主要的屬性 ```java /** 防護柵欄入口的鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 等待直到跳閘的條件 */ private final Condition trip = lock.newCondition(); /** 構造方法引數,在障礙被釋放之前必須呼叫等待的執行緒數 */ private final int parties; /* 越過柵欄時執行的命令 */ private final Runnable barrierCommand; /** 當前的一代,控制CyclicBarrier的迴圈 */ private Generation generation = new Generation(); /** 記錄仍在等待的參與方執行緒數量,初始值等於parties */ private int count; ``` ### 主要內部類 ```java /** 代:屏障的每次使用都表示為一個生成例項 */ private static class Generation { boolean broken = false; // 標識當前的柵欄已破壞或喚醒,jinglingwang.cn } ``` ### 構造方法 一共有兩個構造方法,第一個構造方法僅需要傳入一個int值,表示呼叫等待的執行緒數;第二個構造方法多了一個runnable介面,當所有的執行緒越過柵欄時執行的命令,沒有則為null; ```java public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; // Runnable 命令執行緒 } ``` ### await() 方法 每個需要在柵欄處等待的執行緒都需要顯式地呼叫這個方法。 ```java public int await() throws InterruptedException, BrokenBarrierException { try { // 呼叫await方法,0:不超時 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } ``` ### dowait() 方法 主要的障礙程式碼 ```java private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 當前鎖 final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 當前代 final Generation g = generation; // 檢查當前代的狀態,是否要丟擲BrokenBarrierException異常 if (g.broken) throw new BrokenBarrierException(); // 當前執行緒被中斷了 if (Thread.interrupted()) { // 屏障被打破 breakBarrier(); throw new InterruptedException(); } // count減一 int index = --count; // index等於0,說明最後一個執行緒到達了屏障處 if (index == 0) { // tripped boolean ranAction = false; // 標識Runnable 命令執行緒是否有執行 try { final Runnable command = barrierCommand; // 第二個構造方法的入參,需要執行的命令執行緒 if (command != null) command.run(); // 執行命令執行緒。by:jinglingwang.cn ranAction = true; nextGeneration(); // 更新重置整個屏障 return 0; } finally { if (!ranAction) // ranAction 沒有被設定成true;被中斷了 breakBarrier(); } } // 迴圈直到跳閘,斷開,中斷或超時 for (;;) { try { if (!timed) // 沒有設超時時間,直接呼叫條件鎖的await方法阻塞等待 trip.await(); else if (nanos > 0L) // 有超時時間 nanos = trip.awaitNanos(nanos); //呼叫條件鎖的await方法阻塞等待一段時間 } catch (InterruptedException ie) { // 捕獲中斷異常 if (g == generation && ! g.broken) { breakBarrier(); //被中斷,當前代會被標識成已被破壞 throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 如果上面程式碼沒有異常,理論上只有被喚醒後才會執行到下面的程式碼 // 再次檢查當前代是否已經被破壞 if (g.broken) throw new BrokenBarrierException(); // 正常來說,最後一個執行緒在執行上面的程式碼時,會呼叫nextGeneration,重新生成generation // 所以執行緒被喚醒後,這裡條件會成立 if (g != generation) return index; // 超時檢查 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); //丟擲超時異常 } } } finally { // 釋放鎖 lock.unlock(); } } /** 重置屏障,回到初始狀態,說明可以重複使用*/ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; // 重置等的參與方執行緒數量計數,回到最初的狀態 generation = new Generation(); } private void breakBarrier() { // 標識當前的柵欄狀態 generation.broken = true; count = parties; // 條件鎖,喚醒所有等待的執行緒,jinglingwang.cn trip.signalAll(); } ``` **dowait() 方法過程總結:** 1. 參與方的多個執行緒執行邏輯程式碼後,分別呼叫`await`方法 2. 執行緒分別拿到當前鎖,最先獲得鎖的N-1個執行緒,呼叫條件鎖`Condition`的`await`方法,根據前面[條件鎖的原始碼分析](https://jinglingwang.cn/archives/reentrantlock-condition)我們知道,呼叫條件鎖的await方法會釋放當前鎖,然後再呼叫Unsafa類底層 `park` 阻塞執行緒。 3. 當最後一個執行緒呼叫await方法時(也就是上面的 if (index == 0) 分支邏輯,count減為0,屏障打破),會執行命令執行緒(構造方法的第二個入參Runnable),然後呼叫`nextGeneration`方法,喚醒所有的條件鎖等待的N-1個執行緒(喚醒並不一定馬上執行),然後重置計數與當前代,也就是一個新的屏障了,這也就是為什麼可以重複使用的原因。 4. 最後一個執行緒釋放鎖,N-1執行緒中的執行緒陸續獲得鎖,釋放鎖,完成整個流程 ## CyclicBarrier 總結 1. 支援兩個構造引數:執行緒數和需要執行的命令執行緒 2. CyclicBarrier 是基於ReentrantLock和Condition來實現屏障邏輯的 3. 先搶到鎖的N-1個執行緒會呼叫條件鎖的await方法從而被阻塞 4. 最後一個獲得鎖的執行緒來喚醒之前的N-1個執行緒以及來呼叫命令執行緒的run方法 5. 最後一個獲得鎖的執行緒會生成一個新的屏障(new Generation()),也就是可以重複使用的屏障 6. 如果執行緒中有一個執行緒被中斷,整個屏障被破壞後,所有執行緒都可能丟擲BrokenBarrierException異常 7. 原文首發地址:[https://jinglingwang.cn/archives/cyclicbarrier](https://jinglingwang.cn/archives/cyclicbarrier) ## CyclicBarrier 與CountDownLatch的區別 1. CyclicBarrier 是基於重入鎖和條件鎖來實現的 2. CountDownLatch 是基於AQS的同步功能來實現的 3. CyclicBarrier 不允許0個執行緒,會丟擲異常 4. CountDownLatch 允許0個執行緒,雖然沒什麼*用 5. CyclicBarrier 阻塞的是N-1個執行緒,需要每個執行緒呼叫await,之後由最後一個執行緒來喚醒所有的等待執行緒,這也就是屏障的意思 6. CountDownLatch 是計數為N,阻塞的不一定是N個執行緒(可以是一個或多個),由執行緒顯示呼叫countDown方法來減計數,計數為0時,喚醒阻塞的一個執行緒或多個執行緒 7. CyclicBarrier 最後一個執行緒會重置屏障的引數,生成一個新的Generation,可以重複使用,不需要重新new CyclicBarrier 8. CountDownLatch 沒有重置計數的地方,計數為0後不可以重複使用,需要重新new CountDownLatch 才可以再