1. 程式人生 > >CyclicBarrier原始碼探究 (JDK 1.8)

CyclicBarrier原始碼探究 (JDK 1.8)

`CyclicBarrier`也叫回環柵欄,能夠實現讓一組執行緒執行到柵欄處並阻塞,等到所有執行緒都到達柵欄時再一起執行的功能。“迴環”意味著`CyclicBarrier`可以多次重複使用,相比於`CountDownLatch`只能使用一次,`CyclicBarrier`可以節省許多資源,並且還可以在構造器中傳入任務,當柵欄條件滿足時執行這個任務。`CyclicBarrier`是使用了`ReentrantLock`,主要方法在執行時都會加鎖,因此併發效能不是很高。 ## 1.相關欄位 ``` //重入鎖,CyclicBarrier內部通過重入鎖實現執行緒安全 private final ReentrantLock lock = new ReentrantLock(); //執行緒阻塞時的等待條件 private final Condition trip = lock.newCondition(); //需要等待的執行緒數 private final int parties; //柵欄開啟之後首先執行的任務 private final Runnable barrierCommand; //記錄當前的分代標記 private Generation generation = new Generation(); //當前還需要等待多少個執行緒執行到柵欄位置 private int count; ``` 需要注意的是`generation`欄位,用於標記柵欄當前處在哪一代。當滿足一定的條件時(例如呼叫了`reset`方法,或者柵欄開啟等),柵欄狀態會切換到下一代,實際就是`new`一個新的`Generation`物件,這是`CyclicBarrier`的內部類,程式碼非常簡單,如下: ``` private static class Generation { boolean broken = false; //標記柵欄是否被破壞 } ``` 實際使用的過程中,會利用`generation`欄位判斷當前是否在同一個分代,而使用`broker`欄位判斷柵欄是否被破壞。 ## 2.建構函式 `CyclicBarrier`有兩個過載的建構函式,建構函式只是對上述的相關欄位進行初始化,如下: ``` public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } ``` ## 3.核心方法 - `await` `await`是開發時最常用到的方法了,同`CountDownLatch`一樣,`CyclicBarrier`也提供了兩個`await`方法,一個不帶引數,一個帶有超時引數,其內部只是簡單呼叫了一下`dowait`方法: ``` public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } ``` 接下來看看至關重要的`dowait`方法: ``` private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加重入鎖 lock.lock(); try { //首先獲取年齡代資訊 final Generation g = generation; //如果柵欄狀態被破壞,丟擲異常,例如先啟動的執行緒呼叫了breakBarrier方法,後啟動的執行緒就能夠看到g.broker=true if (g.broken) throw new BrokenBarrierException(); //檢測執行緒的中斷狀態,如果執行緒設定了中斷狀態,則通過breakBarrier設定柵欄為已破壞狀態,並喚醒其他執行緒 //如果這裡能夠檢測到中斷狀態,那隻可能是在await方法外部設定的 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //每呼叫一次await,就將需要等待的執行緒數減1 int index = --count; //index=0表示這是最後一個到達的執行緒,由該執行緒執行下面的邏輯 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; //如果在構造器中傳入了第二個任務引數,就在放開柵欄前先執行這個任務 if (command != null) command.run(); ranAction = true; //正常結束,需要喚醒阻塞的執行緒,並換代 nextGeneration(); return 0; } finally { //try程式碼塊如果正常執行,ranAction就一定等於true,而try程式碼塊唯一可能發生異常的地方就是command.run(), //因此這裡為了保證在任務執行失敗時,將柵欄標記為已破壞,喚醒阻塞執行緒 if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out ////柵欄沒被破壞,執行緒沒有被中斷,且不是最後一個到達柵欄的執行緒,就會執行下面的自旋,排隊等待 for (;;) { try { //沒有設定超時標記,就加入等待佇列 if (!timed) trip.await(); //設定了超時標記,但目前還沒有超時,則繼續等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //如果執行緒等待的過程中被中斷,會執行到這裡 //g == generation表示當前還在同一個年齡分代中,!g.broker表示當前柵欄狀態沒有被破壞 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //上面的條件不滿足,說明:1)g!=generation,說明執行緒執行到這裡時已經換代了 //2)沒有換代,但是柵欄被破壞了 //無論哪種情況,都只是簡單地設定一下當前執行緒的中斷狀態 Thread.currentThread().interrupt(); } } //柵欄被破壞,丟擲異常 //注意,在breakBarrier方法中會喚醒所有等待條件的執行緒,這些執行緒會執行到這裡,判斷柵欄已經被破壞,都會丟擲異常 if (g.broken) throw new BrokenBarrierException(); //距離上一次設定g變數的值已經過去很長時間了,在執行過程中generation可能已經發生改變, //當前執行緒還是前幾代的,不需要再迴圈阻塞了,直接返回上一代剩餘需要等待的執行緒數 //注意:程式碼中breakBarrier方法和nextGeneration方法都會喚醒阻塞的執行緒,但是breakBarrier在上一個判斷就被攔截了, //因此走到這裡的有三種情況: //a)最後一個執行緒正常執行,柵欄開啟導致其他執行緒被喚醒;不屬於當前代的執行緒直接返回, //屬於當前代的則可能因為沒到柵欄開放條件要繼續迴圈阻塞 //b)柵欄被重置(呼叫了reset方法),此時g!=negeration,全都直接返回 //c)執行緒等待超時了,不屬於當前代的返回就可以了,屬於當前代的則要設定generation.broken = true if (g != generation) return index; //如果執行緒等待超時,標記柵欄為破壞狀態並丟擲異常,如果還沒超時,則自旋後又重新阻塞 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //別忘了解鎖 lock.unlock(); } } ``` `dowait`的方法邏輯是:每一個呼叫`await`方法的執行緒都會將計數`count`減`1`,最後一個執行緒將`count`減為`0`時,順帶還要執行`barrierCommand`指定的任務,並將`generation`切換到下一代,當然,最重要的還是要喚醒之前在柵欄處阻塞的執行緒。由於`trip`對應的`Condition`物件沒有任何地方會修改,因此`trip.signalAll()`會喚醒所有在該條件上等待的執行緒,如果執行緒在等待的過程中,其他執行緒將`generation`更新到下一代,就會出現被喚醒的執行緒中有部分還屬於之前那一代的情況。 接下來將會對`dowait`用到的一些方法進行簡單介紹。 - `breakBarrier` `dowait`方法有四個地方呼叫了`breakBarrier`,從名字可以看出,該方法會將`generation.broken`設定為`true`,除此之外,還會還原`count`的值,並且喚醒所有被阻塞的執行緒: ``` private void breakBarrier() { generation.broken = true; count = parties; //喚醒所有的阻塞執行緒 trip.signalAll(); } ``` 縱觀`CyclicBarrier`原始碼,`generation.broken`統一在`breakBarrier`方法中被設定為`true`,而一旦將`generation.broken`設定為`true`之後,程式碼中檢查到這個狀態之後都會丟擲異常,柵欄就沒辦法再使用了(可以手動呼叫`reset`進行重置),而原始碼中會在以下幾種情況呼叫`breakBarrier`方法: 1) 當前執行緒被中斷 2)通過構造器傳入的任務執行失敗 3) 條件等待時被中斷 4) 執行緒等待超時 5) 顯式呼叫`reset`方法 - `nextGeneration` ``` private void nextGeneration() { // 喚醒所有的阻塞執行緒 trip.signalAll(); // 開啟下一代 count = parties; generation = new Generation(); } ``` - `reset` `reset`方法主要是結束這一代,並切換到下一代 ``` public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } ``` 介紹到這裡,整個`CyclicBarrier`已經差不多介紹完了,但是內部的流程遠遠沒有這麼簡單,因為很大一部分邏輯封裝在`AbstractQueuedSynchronizer`中,這個類定義了阻塞的執行緒如何加入等待佇列,又如何被喚醒,因此如果想要深入瞭解執行緒等待的邏輯,還需要仔細研究`AbstractQueuedSynchronizer`才行。本文不會對這部分內容進行介紹,後面有時間的話將會專門對其進行