1. 程式人生 > >CountDownLatch和CyclicBarrier 傻傻的分不清?超長精美圖文又來了

CountDownLatch和CyclicBarrier 傻傻的分不清?超長精美圖文又來了

> - 你有一個思想,我有一個思想,我們交換後,一個人就有兩個思想 > > - If you can NOT explain it simply, you do NOT understand it well enough 現陸續將Demo程式碼和技術文章整理在一起 [Github實踐精選](https://github.com/FraserYu/learnings) ,方便大家閱讀檢視,本文同樣收錄在此,覺得不錯,還請Star --- ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083743714-1614749688.png) ## 前言 併發程式設計的三大核心是`分工`,`同步`和`互斥`。在日常開發中,經常會碰到需要在主執行緒中開啟多個子執行緒去並行的執行任務,並且主執行緒需要等待所有子執行緒執行完畢再進行彙總的場景,這就涉及到分工與同步的內容了 在講 [有序性可見性,Happens-before來搞定](https://dayarch.top/p/java-concurrency-happens-before-rule.html#join-%E8%A7%84%E5%88%99) 時,提到過 join() 規則,使用 join() 就可以簡單的實現上述場景: ```java @Slf4j public class JoinExample { public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-1 執行完畢"); } }, "Thread-1"); Thread thread2 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-2 執行完畢"); } }, "Thread-2"); thread1.start(); thread2.start(); thread1.join(); thread2.join(); log.info("主執行緒執行完畢"); } } ``` 執行結果: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083743951-211048549.png) 整個過程可以這麼理解 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083744798-605987946.png) 我們來檢視 join() 的實現原始碼: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083745198-725246509.png) 其實現原理是不停的檢查 join 執行緒是否存活,如果 join 執行緒存活,則 wait(0) 永遠的等下去,直至 join 執行緒終止後,執行緒的 this.notifyAll() 方法會被呼叫(該方法是在 JVM 中實現的,JDK 中並不會看到原始碼),退出迴圈恢復主執行緒執行。很顯然這種迴圈檢查的方式比較低效 除此之外,使用 join() 缺少很多靈活性,比如實際專案中很少讓自己單獨建立執行緒(原因在 [我會手動建立執行緒,為什麼要使用執行緒池?](https://dayarch.top/p/why-we-need-to-use-threadpool.html) 中說過)而是使用 Executor, 這進一步減少了 join() 的使用場景,所以 join() 的使用在多數是停留在 demo 演示上 > 那如何實現文中開頭提到的場景呢? ## CountDownLatch CountDownLatch, 直譯過來【數量向下門閂】,那肯定裡面有計數器的存在了。我們將上述程式用 CountDownLatch 實現一下,先讓大家有個直觀印象 ```java @Slf4j public class CountDownLatchExample { private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { // 這裡不推薦這樣建立執行緒池,最好通過 ThreadPoolExecutor 手動建立執行緒池 ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-1 執行完畢"); //計數器減1 countDownLatch.countDown(); } }); executorService.submit(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-2 執行完畢"); //計數器減1 countDownLatch.countDown(); } }); log.info("主執行緒等待子執行緒執行完畢"); log.info("計數器值為:" + countDownLatch.getCount()); countDownLatch.await(); log.info("計數器值為:" + countDownLatch.getCount()); log.info("主執行緒執行完畢"); executorService.shutdown(); } } ``` 執行結果如下: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083745479-2007230018.png) 結合上述示例的執行結果,相信你也能猜出 CountDownLatch 的實現原理了: 1. 初始化計數器數值,比如為2 2. 子執行緒執行完則呼叫 `countDownLatch.countDown()` 方法將計數器數值減1 3. 主執行緒呼叫 await() 方法阻塞自己,直至計數器數值為0(即子執行緒全部執行結束) > 不知道你是否注意,`countDownLatch.countDown();` 這行程式碼可以寫在子執行緒執行的任意位置,不像 join() 要完全等待子執行緒執行完,這也是 CountDownLatch 靈活性的一種體現 上述的例子還是過於簡單,[Oracle 官網 CountDownLatch 說明](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html ) 有兩個非常經典的使用場景,示例很簡單,強烈建議檢視相關示例程式碼,開啟使用思路。我將兩個示例程式碼以圖片的形式展示在此處: ### 官網示例1 - 第一個是開始訊號 `startSignal`,阻止任何工人 `Worker` 繼續工作,直到司機 `Driver` 準備好讓他們繼續工作 - 第二個是完成訊號 `doneSignal`,允許司機 `Driver` 等待,直到所有的工人 `Worker` 完成。 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083745928-814311711.png) ### 官網示例2 另一種典型的用法是將一個問題分成 N 個部分 (比如將一個大的 list 拆分成多分,每個 Worker 幹一部分),Worker 執行完自己所處理的部分後,計數器減1,當所有子部分完成後,Driver 才繼續向下執行 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083746633-1524866131.png) 結合官網示例,相信你已經可以結合你自己的業務場景解,通過 CountDownLatch 解決一些序列瓶頸來提高執行效率了,會用還遠遠不夠,咱得知道 CountDownLatch 的實現原理 ### 原始碼分析 CountDownLatch 是 AQS 實現中的最後一個內容,有了前序文章的知識鋪墊: - [Java AQS佇列同步器以及ReentrantLock的應用](https://dayarch.top/p/java-aqs-and-reentrantlock.html) - [Java AQS共享式獲取同步狀態及Semaphore的應用分析](https://dayarch.top/p/java-aqs-acquireshared-and-semaphore.html) 當你看到 CountDownLatch 的原始碼內容,你會高興的笑起來,內容真是太少了 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083747106-69642271.png) 展開類結構全部內容就這點東西 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083747466-1174435905.png) 既然 CountDownLatch 是基於 AQS 實現的,那肯定也離不開對同步狀態變數 state 的操作,我們在初始化的時候就將計數器的值賦值給了state ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083747756-1385463727.png) 另外,它可以多個執行緒同時獲取,那一定是基於共享式獲取同步變數的用法了,所以它需要通過重寫下面兩個方法控制同步狀態變數 state : - tryAcquireShared() - tryReleaseShared() CountDownLatch 暴露給使用者的只有 `await()` 和 `countDown()` 兩個方法,前者是阻塞自己,因為只有獲取同步狀態才會才會出現阻塞的情況,那自然是在 `await()` 的方法內部會用到 `tryAcquireShared()`;有獲取就要有釋放,那後者 `countDown()` 方法內部自然是要用到 `tryReleaseShared()` 方法了 > PS:如果你對上面這個很自然的推斷理解有困難,強烈建議你看一下前序文章的鋪墊,以防止知識斷層帶來的困擾 #### #### await() 先來看 await() 方法, 從方法簽名上看,該方法會丟擲 InterruptedException, 所以它是可以響應中斷的,這個我們在 [Java多執行緒中斷機制](https://dayarch.top/p/java-concurrency-interrupt-mechnism.html) 中明確說明過 ```java public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } ``` 其內部呼叫了同步器提供的模版方法 `acquireSharedInterruptibly` ```java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果監測到中斷標識為true,會重置標識,然後丟擲 InterruptedException if (Thread.interrupted()) throw new InterruptedException(); // 呼叫重寫的 tryAcquireShared 方法,該方法結果如果大於零則直接返回,程式繼續向下執行,如果小於零,則會阻塞自己 if (tryAcquireShared(arg) < 0) // state不等於0,則嘗試阻塞自己 doAcquireSharedInterruptibly(arg); } ``` 重寫的 `tryAcquireShared` 方法非常簡單, 就是判斷同步狀態變數 state 的值是否為 0, 如果為零 (子執行緒已經全部執行完畢)則返回1, 否則返回 -1 ```java protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } ``` 如果子執行緒沒有全部執行完畢,則會通過 `doAcquireSharedInterruptibly` 方法阻塞自己,這個方法在 [Java AQS共享式獲取同步狀態及Semaphore的應用分析](https://dayarch.top/p/java-aqs-acquireshared-and-semaphore.html) 中已經仔細分析過了,這裡就不再贅述了 ```java private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 再次嘗試獲取同步裝阿嚏,如果大於0,說明子執行緒全部執行完畢,直接返回 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 阻塞自己 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } ``` `await()` 方法的實現就是這麼簡單,接下來看看 `countDown()` 的實現原理 #### countDown() ```java public void countDown() { sync.releaseShared(1); } ``` 同樣是呼叫同步器提供的模版方法 `releaseShared` ```java public final boolean releaseShared(int arg) { // 呼叫自己重寫的同步器方法 if (tryReleaseShared(arg)) { // 喚醒呼叫 await() 被阻塞的執行緒 doReleaseShared(); return true; } return false; } ``` 重寫的 `tryReleaseShared` 同樣很簡單 ```java protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); // 如果當前狀態值為0,則直接返回 (1) if (c == 0) return false; // 使用 CAS 讓計數器的值減1 (2) int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } ``` 程式碼 (1) 判斷當前同步狀態值,如果為0 則直接返回 false;否則執行程式碼 (2),使用 CAS 將計數器減1,如果 CAS 失敗,則迴圈重試,最終返回 `nextc == 0` 的結果值,如果該值返回 true,說明最後一個執行緒已呼叫 countDown() 方法,然後就要喚醒呼叫 await() 方法被阻塞的執行緒,同樣由於分析過 AQS 的模版方法 doReleaseShared 整個釋放同步狀態以及喚醒的過程,所以這裡同樣不再贅述了 仔細看CountDownLatch重寫的 `tryReleaseShared` 方法,有一點需要和大家說明: > 程式碼 (1) `if (c == 0)` 看似沒什麼用處,其實用處大大滴,如果沒有這個判斷,當計數器值已經為零了,其他執行緒再呼叫 countDown 方法會將計數器值變為負值 現在就差 `await(long timeout, TimeUnit unit)` 方法沒介紹了 #### await(long timeout, TimeUnit unit) ```java public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } ``` 該方法簽名同樣丟擲 InterruptedException,意思可響應中斷。它其實就是 await() 更完善的一個版本,簡單來說就是 > 主執行緒設定等待超時時間,如果該時間內子執行緒沒有執行完畢,主執行緒也會**直接返回** 我們將上面的例子稍稍修改一下你就會明白(主執行緒超時時間設定為 2 秒,而子執行緒要 sleep 5 秒) ```java @Slf4j public class CountDownLatchTimeoutExample { private static CountDownLatch countDownLatch = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { // 這裡不推薦這樣建立執行緒池,最好通過 ThreadPoolExecutor 手動建立執行緒池 ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-1 執行完畢"); //計數器減1 countDownLatch.countDown(); } }); executorService.submit(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("Thread-2 執行完畢"); //計數器減1 countDownLatch.countDown(); } }); log.info("主執行緒等待子執行緒執行完畢"); log.info("計數器值為:" + countDownLatch.getCount()); countDownLatch.await(2, TimeUnit.SECONDS); log.info("計數器值為:" + countDownLatch.getCount()); log.info("主執行緒執行完畢"); executorService.shutdown(); } } ``` 執行結果如下: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083748025-473488961.png) 形象化的展示上述示例的執行過程 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083748451-1886349863.png) ### 小結 CountDownLatch 的實現原理就是這麼簡單,瞭解了整個實現過程後,你也許發現了使用 CountDownLatch 的一個問題: > 計數器減 1 操作是**一次性**的,也就是說當計數器減到 0, 再有執行緒呼叫 await() 方法,該執行緒會直接通過,**不會再起到等待其他執行緒執行結果起到同步的作用了** 為了解決這個問題,貼心的 Doug Lea 大師早已給我們準備好相應策略 `CyclicBarrier` ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083748907-396027545.png) > 本來想將 `CyclicBarrier` 的內容放到下一個章節,但是 CountDownLatch 的內容著實有些少,不夠解渴,另外有對比才有傷害,所以內容沒結束,咱得繼續看 `CyclicBarrier`) ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083749184-1548194009.png) ## CyclicBarrier 上面簡單說了一下 CyclicBarrier 被創造出來的理由,這裡先看一下它的字面解釋: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083749563-566821305.png) 概念總是有些抽象,我們將上面的例子用 CyclicBarrier 再做個改動,先讓大家有個直觀的使用概念 ```java @Slf4j public class CyclicBarrierExample { // 建立 CyclicBarrier 例項,計數器的值設定為2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); int breakCount = 0; // 將執行緒提交到執行緒池 executorService.submit(() -> { try { log.info(Thread.currentThread() + "第一回合"); Thread.sleep(1000); cyclicBarrier.await(); log.info(Thread.currentThread() + "第二回合"); Thread.sleep(2000); cyclicBarrier.await(); log.info(Thread.currentThread() + "第三回合"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executorService.submit(() -> { try { log.info(Thread.currentThread() + "第一回合"); Thread.sleep(2000); cyclicBarrier.await(); log.info(Thread.currentThread() + "第二回合"); Thread.sleep(1000); cyclicBarrier.await(); log.info(Thread.currentThread() + "第三回合"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); executorService.shutdown(); } } ``` 執行結果: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083749948-623133941.png) 結合程式程式碼與執行結果,我們可以看出,子執行緒執行完第一回合後(執行回合所需時間不同),都會呼叫 await() 方法,等所有執行緒都到達屏障點後,會突破屏障繼而執行第二回合,同樣的道理最終到達第三回合 形象化的展示上述示例的執行過程 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083750397-692629063.png) 看到這裡,你應該明白 CyclicBarrier 的基本用法,但隨之你內心也應該有了一些疑問: 1. 怎麼判斷所有執行緒都到達屏障點的? 2. 突破某一屏障後,又是怎麼重置 CyclicBarrier 計數器,等待執行緒再一次突破屏障呢? 帶著這些問題我們來看一看原始碼 ### 原始碼分析 同樣先開啟 CyclicBarrier 的類結構,展開類全部內容,其實也沒多少內容 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083750779-1980596660.png) 從類結構中看到有: 1. await() 方法,猜測應該和 CountDownLatch 是類似的,都是獲取同步狀態,阻塞自己 2. ReentrantLock,CyclicBarrier 內部竟然也用到了我們之前講過的 ReentrantLock,猜測這個鎖一定保護 CyclicBarrier 的某個變數,那肯定也是基於 AQS 相關知識了 3. Condition,存在條件,猜測會有等待/通知機制的運用 我們繼續帶著這些猜測,結合上面的例項程式碼一點點來驗證 ```java // 建立 CyclicBarrier 例項,計數器的值設定為2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2); ``` 檢視建構函式 (這裡的英文註釋捨不得刪掉,因為說的太清楚了,我來結合註釋來說明一下): ```java private final int parties; private int count; public CyclicBarrier(int parties) { this(parties, null); } /** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } ``` 根據註釋說明,parties 代表衝破屏障之前要觸發的執行緒總數,count 本身又是計數器,那問題來了 > 直接就用 count 不就可以了嘛?為啥同樣用於初始化計數器,要維護兩個變數呢? 從 parties 和 count 的變數宣告中,你也能看出一些門道,前者有 final 修飾,初始化後就不可以改變了,因為 CyclicBarrier 的設計目的是可以迴圈利用的,所以始終用 parties 來記錄執行緒總數,當 count 計數器變為 0 後,如果沒有 parties 的值賦給它,怎麼進行重新複用再次計數呢,所以這裡維護兩個變數很有必要 接下來就看看 await() 到底是怎麼實現的 ```java // 從方法簽名上可以看出,該方法同樣可以被中斷,另外還有一個 BrokenBarrierException 異常,我們一會看 public int await() throws InterruptedException, BrokenBarrierException { try { // 呼叫內部 dowait 方法, 第一個引數為 false,表示不設定超時時間,第二個引數也就沒了意義 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } ``` 接下來看看 `dowait(false, 0L)` 做了哪些事情 (這個方法內容有點多,別擔心,邏輯並不複雜,請看關鍵程式碼註釋) ```java private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 還記得之前說過的 Lock 標準正規化嗎? JDK 內部都是這麼使用的,你一定也要遵循正規化 lock.lock(); try { final Generation g = generation; // broken 是靜態內部類 Generation唯一的一個成員變數,用於記錄當前屏障是否被打破,如果打破,則丟擲 BrokenBarrierException 異常 // 這裡感覺挺困惑的,我們要【衝破】屏障,這裡【打破】屏障卻丟擲異常,注意我這裡的用詞 if (g.broken) throw new BrokenBarrierException(); // 如果執行緒被中斷,則會通過 breakBarrier 方法將 broken 設定為true,也就是說,如果有執行緒收到中斷通知,直接就打破屏障,停止 CyclicBarrier, 並喚醒所有執行緒 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // ************************************ // 因為 breakBarrier 方法在這裡會被呼叫多次,為了便於大家理解,我直接將 breakBarrier 程式碼插入到這裡 private void breakBarrier() { // 將打破屏障標識 設定為 true generation.broken = true; // 重置計數器 count = parties; // 喚醒所有等待的執行緒 trip.signalAll(); } // ************************************ // 每當一個執行緒呼叫 await 方法,計數器 count 就會減1 int index = --count; // 當 count 值減到 0 時,說明這是最後一個呼叫 await() 的子執行緒,則會突破屏障 if (index == 0) { // tripped boolean ranAction = false; try { // 獲取建構函式中的 barrierCommand,如果有值,則執行該方法 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 啟用其他因呼叫 await 方法而被阻塞的執行緒,並重置 CyclicBarrier nextGeneration(); // ************************************ // 為了便於大家理解,我直接將 nextGeneration 實現插入到這裡 private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } // ************************************ return 0; } finally { if (!ranAction) breakBarrier(); } } // index 不等於0, 說明當前不是最後一個執行緒呼叫 await 方法 // loop until tripped, broken, interrupted, or timed out for (;;) { try { // 沒有設定超時時間 if (!timed) // 進入條件等待 trip.await(); else if (nanos > 0L) // 否則,判斷超時時間,這個我們在 AQS 中有說明過,包括為什麼最後超時閾值 spinForTimeoutThreshold 不再比較的原因,大家會看就好 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 條件等待被中斷,則判斷是否有其他執行緒已經使屏障破壞。若沒有則進行屏障破壞處理,並丟擲異常;否則再次中斷當前執行緒 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); // 如果新一輪迴環結束,會通過 nextGeneration 方法新建 generation 物件 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } ``` doWait 就是 CyclicBarrier 的核心邏輯, 可以看出,該方法入口使用了 ReentrantLock,這也就是為什麼 Generation broken 變數沒有被宣告為 volatile 型別保持可見性,因為對其的更改都是在鎖的內部,同樣在鎖的內部對計數器 count 做更新,也保證了原子性 doWait 方法中,是通過 nextGeneration 方法來重新初始化/重置 CyclicBarrier 狀態的,該類中還有一個 reset() 方法,也是重置 CyclicBarrier 狀態的 ```java public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } ``` 但 reset() 方法並沒有在 CyclicBarrier 內部被呼叫,顯然是給 CyclicBarrier 使用者來呼叫的,那問題來了 > 什麼時候呼叫 reset() 方法呢 正常情況下,CyclicBarrier 是會被自動重置狀態的,從 reset 的方法實現中可以看出呼叫了 breakBarrier 方法,也就是說,呼叫 reset 會使當前處在等待中的執行緒最終丟擲 BrokenBarrierException 並立即被喚醒,所以說 reset() 只會在你想打破屏障時才會使用 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083750972-129608231.png) 上述示例,我們構建 CyclicBarrier 物件時,並沒有傳遞 barrierCommand 物件, 我們修改示例傳入一個 barrierCommand 物件,看看會有什麼結果: ```java // 建立 CyclicBarrier 例項,計數器的值設定為2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> { log.info("全部執行結束"); }); ``` 執行結果: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083751455-1501984510.png) 從執行結果中來看,每次衝破屏障後都會執行 CyclicBarrier 初始化 barrierCommand 的方法, 這與我們對 doWait() 方法的分析完全吻合,從上面的執行結果中可以看出,最後一個執行緒是執行 barrierCommand run() 方法的執行緒,我們再來形象化的展示一下整個過程 ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083752163-387761899.png) 從上圖可以看出,barrierAction 與每次突破屏障是序列化的執行過程,假如 barrierAction 是很耗時的彙總操作,那這就是可以優化的點了,我們繼續修改程式碼 ```java // 建立單執行緒執行緒池 private static Executor executor = Executors.newSingleThreadExecutor(); // 建立 CyclicBarrier 例項,計數器的值設定為2 private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> { executor.execute(() -> gather()); }); private static void gather() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("全部執行結束"); } ``` 我們這裡將 CyclicBarrier 的回撥函式 barrierAction使用單執行緒的執行緒池,這樣最後一個衝破屏障的執行緒就不用等待 barrierAction 的執行,直接分配個執行緒池裡的執行緒非同步執行,進一步提升效率 執行結果如下: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083752607-1483329165.png) 我們再形象化的看一下整個過程: ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083800717-1792792743.png) 這裡使用了單一執行緒池,增加了並行操作,提高了程式執行效率,那問題來了: > 如果 barrierAction 非常非常耗時,衝破屏障的任務就可能堆積在單一執行緒池的等待佇列中,就存在 OOM 的風險,那怎麼辦呢? 這是就要需要一定的限流策略或者使用執行緒池的拒絕的略等 > 那把單一執行緒池換成非單一的固定執行緒池不就可以了嘛?比如 fixed(5) 乍一看確實能緩解單執行緒池可能引起的任務堆積問題,上面程式碼我們看到的 gather() 方法,假如該方法內部沒有使用鎖或者說存在竟態條件,那 CyclicBarrier 的回撥函式 barrierAction 使用多執行緒必定引起結果的不準確 所以在實際使用中還要結合具體的業務場景不斷優化程式碼,使之更加健壯 ## 總結 本文講解了 CountDownLatch 和 CyclicBarrier 的經典使用場景以及實現原理,以及在使用過程中可能會遇到的問題,比如將大的 list 拆分作業就可以用到前者,讀取多個 Excel 的sheet 頁,最後進行結果彙總就可以用到後者 (文中完整示例程式碼已上傳) 最後,再形象化的比喻一下 - **CountDownLatch 主要用來解決一個執行緒等待多個執行緒的場景,可以類比旅遊團團長要等待所有遊客到齊才能去下一個景點** - **而 CyclicBarrier 是一組執行緒之間的相互等待,可以類比幾個驢友之間的不離不棄,共同到達某個地方,再繼續出發,這樣反覆** ## 靈魂追問 1. 怎樣拿到 CyclicBarrier 的彙總結果呢? 2. 執行緒池中的 Future 特性你有使用過嗎? 接下來,咱們就聊聊那些可以使用的 Future 特性 ## 參考 1. Java 併發程式設計實戰 2. Java 併發程式設計的藝術 3. Java 併發程式設計之美 4. [When to reset CyclicBarrier in java multithreading](https://stackoverflow.com/questions/24104642/when-to-reset-cyclicbarrier-in-java-multithreading#:~:text=CyclicBarriers%20are%20useful%20in%20programs,the%20waiting%20threads%20are%20released.) [個人部落格:https://dayarch.top](https://dayarch.top) [加我微信好友](https://mp.weixin.qq.com/s/G7BXuZh0Qh1-mE6ts4LJqQ), 進群娛樂學習交流,備註「進群」 > ### 歡迎持續關注公眾號:「日拱一兵」 > - 前沿 Java 技術乾貨分享 > - 高效工具彙總 | 回覆「工具」 > - 面試問題分析與解答 > - 技術資料領取 | 回覆「資料」 > 以讀偵探小說思維輕鬆趣味學習 Java 技術棧相關知識,本著將複雜問題簡單化,抽象問題具體化和圖形化原則逐步分解技術問題,技術持續更新,請持續關注...... --- ![](https://img2020.cnblogs.com/other/1583165/202007/1583165-20200701083802790-1821816