1. 程式人生 > >Java多執行緒之JUC包:CyclicBarrier原始碼學習筆記

Java多執行緒之JUC包:CyclicBarrier原始碼學習筆記

若有不正之處請多多諒解,並歡迎批評指正。

請尊重作者勞動成果,轉載請標明原文連結:

CyclicBarrier是java.util.concurrent包中提供的同步工具。通過這個工具我們可以實現n個執行緒相互等待。我們可以通過引數指定達到公共屏障點之後的行為。

先上原始碼:

  1 package java.util.concurrent;
  2 import java.util.concurrent.locks.*;
  3 
  4 public class CyclicBarrier {
  5 
  6     private static class
Generation { 7 boolean broken = false; 8 } 9 10 private final ReentrantLock lock = new ReentrantLock(); 11 private final Condition trip = lock.newCondition(); 12 private final int parties; 13 private final Runnable barrierCommand; 14 private Generation generation = new
Generation(); 15 private int count; 16 17 private void nextGeneration() { 18 // signal completion of last generation 19 trip.signalAll(); 20 // set up next generation 21 count = parties; 22 generation = new Generation(); 23 } 24 25
26 private void breakBarrier() { 27 generation.broken = true; 28 count = parties; 29 trip.signalAll(); 30 } 31 32 private int dowait(boolean timed, long nanos) 33 throws InterruptedException, BrokenBarrierException, TimeoutException { 34 final ReentrantLock lock = this.lock; 35 lock.lock(); 36 try { 37 final Generation g = generation; 38 39 //小概率事件:該執行緒在等待鎖的過程中,barrier被破壞 40 if (g.broken) 41 throw new BrokenBarrierException(); 42 43 //小概率事件:該執行緒在等待鎖的過程中被中斷 44 if (Thread.interrupted()) { 45 breakBarrier(); 46 throw new InterruptedException(); 47 } 48 49 int index = --count; 50 //當有parties個執行緒到達barrier 51 if (index == 0) { // tripped 52 boolean ranAction = false; 53 try { 54 final Runnable command = barrierCommand; 55 //如果設定了barrierCommand,令最後到達的barrier的執行緒執行它 56 if (command != null) 57 command.run(); 58 ranAction = true; 59 nextGeneration(); 60 return 0; 61 } finally { 62 //注意:當執行barrierCommand出現異常時,ranAction派上用場 63 if (!ranAction) 64 breakBarrier(); 65 } 66 } 67 68 // loop until tripped, broken, interrupted, or timed out 69 for (;;) { 70 try { 71 if (!timed) 72 trip.await(); 73 else if (nanos > 0L) 74 //注意:nanos值標識了是否超時,後續用這個nanos值判斷是否breakBarrier 75 nanos = trip.awaitNanos(nanos); 76 } catch (InterruptedException ie) { 77 if (g == generation && ! g.broken) { 78 breakBarrier(); 79 throw ie; 80 } else { 81 //小概率事件:該執行緒被中斷,進入鎖等待佇列 82 //在等待過程中,另一個執行緒更新或破壞了generation 83 //當該執行緒獲取鎖之後,應重置interrupt標誌而不是丟擲異常 84 //原因在於:它中斷的太晚了,generation已更新或破壞,它丟擲InterruptedException的時機已經過去, 85 //兩種情況: 86 //①g被破壞。已經有一個執行緒丟擲了InterruptedException(也只能由第一個拋),與它同時等待的都拋BrokenBarrierException(後續檢查broken標誌會拋)。 87 //②g被更新:此時拋異常沒意義(後續檢查g更新後會return index),這裡重置interrupt標誌,讓執行緒繼續執行,讓這個標誌由上層處理 88 Thread.currentThread().interrupt(); 89 } 90 } 91 92 //barrier被破壞,丟擲異常 93 if (g.broken) 94 throw new BrokenBarrierException(); 95 96 //barrier正常進入下一迴圈,上一代await的執行緒繼續執行 97 if (g != generation) 98 return index; 99 100 //只要有一個超時,就breakBarrier,後續執行緒拋的就是barrier損壞異常 101 if (timed && nanos <= 0L) { 102 breakBarrier(); 103 throw new TimeoutException(); 104 } 105 } 106 } finally { 107 lock.unlock(); 108 } 109 } 110 111 112 public CyclicBarrier(int parties, Runnable barrierAction) { 113 if (parties <= 0) throw new IllegalArgumentException(); 114 this.parties = parties; 115 this.count = parties; 116 this.barrierCommand = barrierAction; 117 } 118 119 public CyclicBarrier(int parties) { 120 this(parties, null); 121 } 122 123 124 public int getParties() { 125 return parties; 126 } 127 128 129 public int await() throws InterruptedException, BrokenBarrierException { 130 try { 131 return dowait(false, 0L); 132 } catch (TimeoutException toe) { 133 throw new Error(toe); // cannot happen; 134 } 135 } 136 137 138 public int await(long timeout, TimeUnit unit) 139 throws InterruptedException, 140 BrokenBarrierException, 141 TimeoutException { 142 return dowait(true, unit.toNanos(timeout)); 143 } 144 145 146 public boolean isBroken() { 147 final ReentrantLock lock = this.lock; 148 lock.lock(); 149 try { 150 return generation.broken; 151 } finally { 152 lock.unlock(); 153 } 154 } 155 156 public void reset() { 157 final ReentrantLock lock = this.lock; 158 lock.lock(); 159 try { 160 breakBarrier(); // break the current generation 161 nextGeneration(); // start a new generation 162 } finally { 163 lock.unlock(); 164 } 165 } 166 167 public int getNumberWaiting() { 168 final ReentrantLock lock = this.lock; 169 lock.lock(); 170 try { 171 return parties - count; 172 } finally { 173 lock.unlock(); 174 } 175 } 176 }
View Code

我們先來看一下CyclicBarrier的成員變數:

1 private final ReentrantLock lock = new ReentrantLock();
2 private final Condition trip = lock.newCondition();
3 private final int parties;
4 private final Runnable barrierCommand;
5 private Generation generation = new Generation();
6 private int count;

CyclicBarrier是通過獨佔鎖lock和Condition物件trip來實現的,成員parties表示必須有parties個執行緒到達barrier,成員barrierCommand表示當parties個執行緒到達之後要執行的程式碼,成員count表示離觸發barrierCommand還差count個執行緒(還有count個執行緒未到達barrier),成員generation表示當前的“代數”,“cyclic”表示可迴圈使用,generation是對一次迴圈的標識。注意:Generation是CyclicBarrier的一個私有內部類,他只有一個成員變數來標識當前的barrier是否已“損壞”:

1 private static class Generation {
2     boolean broken = false;
3 }

建構函式

 1 public CyclicBarrier(int parties, Runnable barrierAction) {
 2     if (parties <= 0) throw new IllegalArgumentException();
 3     this.parties = parties;
 4     this.count = parties;
 5     this.barrierCommand = barrierAction;
 6 }
 7 
 8 public CyclicBarrier(int parties) {
 9     this(parties, null);
10 }

CyclicBarrier提供了兩種建構函式,沒有指定barrierCommand的建構函式是呼叫第二個建構函式實現的。第二個建構函式有兩個引數:parties和barrierAction,分別用來初始化成員parties和barrierCommand。注意,parties必須大於0,否則會丟擲IllegalArgumentException。

await()方法

1 public int await() throws InterruptedException, BrokenBarrierException {
2     try {
3         return dowait(false, 0L);
4     } catch (TimeoutException toe) {
5      throw new Error(toe); // cannot happen;
6     }
7 }

await方法是由呼叫dowait方法實現的,兩個引數分別代表是否定時等待和等待的時長。

doawait()方法

 1     private int dowait(boolean timed, long nanos) 
 2             throws InterruptedException, BrokenBarrierException, TimeoutException {
 3         final ReentrantLock lock = this.lock;
 4         lock.lock();
 5         try {
 6             final Generation g = generation;
 7 
 8             //小概率事件:該執行緒在等待鎖的過程中,barrier被破壞
 9             if (g.broken)
10                 throw new BrokenBarrierException();
11 
12             //小概率事件:該執行緒在等待鎖的過程中被中斷
13             if (Thread.interrupted()) {
14                 breakBarrier();
15                 throw new InterruptedException();
16             }
17 
18            int index = --count;
19            //當有parties個執行緒到達barrier
20            if (index == 0) {  // tripped
21                 boolean ranAction = false;
22                 try {
23                    final Runnable command = barrierCommand;
24                    //如果設定了barrierCommand,令最後到達的barrier的執行緒執行它
25                    if (command != null)
26                         command.run();
27                     ranAction = true;
28                     nextGeneration();
29                     return 0;
30                } finally {
31                     //注意:當執行barrierCommand出現異常時,ranAction派上用場
32                     if (!ranAction)
33                         breakBarrier();
34                }
35            }
36 
37             // loop until tripped, broken, interrupted, or timed out
38             for (;;) {
39                 try {
40                     if (!timed)
41                         trip.await();
42                     else if (nanos > 0L)
43                         //注意:nanos值標識了是否超時,後續用這個nanos值判斷是否breakBarrier
44                         nanos = trip.awaitNanos(nanos);
45                 } catch (InterruptedException ie) {
46                     if (g == generation && ! g.broken) {
47                         breakBarrier();
48                         throw ie;
49                     } else {
50                         //小概率事件:該執行緒被中斷,進入鎖等待佇列
51                         //在等待過程中,另一個執行緒更新或破壞了generation
52                         //當該執行緒獲取鎖之後,應重置interrupt標誌而不是丟擲異常
53                         //原因在於:它中斷的太晚了,generation已更新或破壞,它丟擲InterruptedException的時機已經過去,
54                         //兩種情況:
55                         //①g被破壞:已有一個執行緒丟擲InterruptedException(只能由第一個拋),與它同時等待的都拋BrokenBarrierException(後續檢查broken標誌會拋)。
56                         //②g被更新:此時拋異常沒意義(後續檢查g更新後會return index),這裡重置interrupt標誌,讓執行緒繼續執行,讓這個標誌由上層處理
57                         Thread.currentThread().interrupt();
58                     }
59                 }
60 
61                 //barrier被破壞,丟擲異常
62                 if (g.broken)
63                     throw new BrokenBarrierException();
64                 
65                 //barrier正常進入下一迴圈,上一代await的執行緒繼續執行
66                 if (g != generation)
67                     return index;
68                 
69                 //只要有一個超時,就breakBarrier,後續執行緒拋的就是barrier損壞異常
70                 if (timed && nanos <= 0L) {
71                     breakBarrier();
72                     throw new TimeoutException();
73                 }
74             }
75         } finally {
76             lock.unlock();
77         }
78     }

dowait方法是CyclicBarrier的精華。應該重點來理解。

方法開頭首先申請鎖,然後做了兩個判斷:g.broken和Thread.interrupted(),這兩個判斷是分別處理兩種小概率的事件:①該執行緒在等待鎖的過程中,barrier被破壞②該執行緒在等待鎖的過程中被中斷。這兩個事件應丟擲相應的異常。接下來dowait方法修改了令count減1,如果此時count減為0,說明已經有parties個執行緒到達barrier,這時由最後到達barrier的執行緒去執行barrierCommand。注意,這裡設定了一個布林值ranAction,作用是來標識barrierCommand是否被正確執行完畢,如果執行失敗,finally中會執行breakBarrier操作。如果count尚未減為0,則在Condition物件trip上執行await操作,注意:這裡有一個InterruptedException的catch子句。當前執行緒在await中被中斷時,會丟擲InterruptedException,這時候如果g==generation&&!g.broken的話,我們執行breakBarrier操作,同時丟擲這個異常;如果g!=generation或者g.broken==true的話,我們的操作是重置interrupt標誌而不是丟擲這個異常。這麼做的原因我們分兩種情況討論:

①g被破壞,這也是一個小概率事件,當前執行緒被中斷後進入鎖等待佇列,此時另一個執行緒由於某種原因(超時或者被中斷)在他之前獲取了鎖並執行了breakBarrier方法,那麼當前執行緒持有鎖之後就不應再拋InterruptedException,邏輯上應該處理barrier被破壞事件,事實上在後續g.broken的檢查中,他會丟擲一個BrokenBarrierException。而當前的InterruptedException被我們捕獲卻沒有做出處理,所以執行interrupt方法重置中斷標誌,交由上層程式處理。

②g被更新:說明當前執行緒在即將完成等待之際被中斷,此時拋異常沒意義(後續檢查g更新後會return index),這裡重置interrupt標誌,讓執行緒繼續執行,讓這個標誌由上層處理。

後續對g.broken和g!=generation的判斷,分表代表了被喚醒執行緒(非最後一個到達barrier的執行緒,也不是被中斷或第一個超時的執行緒)的兩種退出方法的方式:第一種是以barrier被破壞告終(然後拋異常),第二個是barrier等到parties個執行緒,壽終正寢(返回該執行緒的到達次序index)。

最後一個if是第一個超時執行緒執行breakBarrier操作並跑出異常。最後finally子句要釋放鎖。

至此,整個doawait方法流程就分析完畢了,我們可以發現,在barrier上等待的執行緒,如果以拋異常結束的話,只有第一個執行緒會拋InterruptedException或TimeoutException並執行breakBarrier操作,其他等待執行緒只能拋BrokenBarrierException,邏輯上這也是合理的:一個barrier只能因超時或中斷被破壞一次。

 1 private void nextGeneration() {
 2     trip.signalAll();
 3     count = parties;
 4     generation = new Generation();
 5 }
 6 
 7 private void breakBarrier() {
 8     generation.broken = true;
 9     count = parties;
10     trip.signalAll();
11 }

doawait方法中用到的nextGeneration方法將所有等待執行緒喚醒,更新generation物件,復位count,進入下一輪任務。breakBarrier方法將generation狀態值為broken,復位count(這個復位看上去沒有用,但實際上,在broken之後reset之前,如果呼叫getNumberWaiting方法檢視等待執行緒數的話,復位count是合理的),並喚醒所有等待執行緒。在呼叫reset更新generation之前,barrier將處於不可用狀態。

reset()方法

 1 public void reset() {
 2     final ReentrantLock lock = this.lock;
 3     lock.lock();
 4     try {
 5         breakBarrier();   // break the current generation
 6         nextGeneration(); // start a new generation
 7     } finally {
 8         lock.unlock();
 9     }
10 }

reset方法先break當執行breakBarrier操作(如果有執行緒在barrier上等待,呼叫reset會導致BrokenBarrierException),再更新generation物件。

相關推薦

Java執行JUCCyclicBarrier原始碼學習筆記

若有不正之處請多多諒解,並歡迎批評指正。 請尊重作者勞動成果,轉載請標明原文連結: CyclicBarrier是java.util.concurrent包中提供的同步工具。通過這個工具我們可以實現n個執行緒相互等待。我們可以通過引數指定達到公共屏障點之後的行為。 先上原始碼:

Java執行JUCCondition原始碼學習筆記

若有不正之處請多多諒解,並歡迎批評指正。 請尊重作者勞動成果,轉載請標明原文連結: Condition在JUC框架下提供了傳統Java監視器風格的wait、notify和notifyAll相似的功能。 Condition必須被繫結到一個獨佔鎖上使用。ReentrantLock中獲取Conditi

Java執行JUCSemaphore原始碼學習筆記

若有不正之處請多多諒解,並歡迎批評指正。 請尊重作者勞動成果,轉載請標明原文連結: Semaphore是JUC包提供的一個共享鎖,一般稱之為訊號量。 Semaphore通過自定義的同步器維護了一個或多個共享資源,執行緒通過呼叫acquire獲取共享資源,通過呼叫release釋放。 原始碼:

Java執行JUCAbstractQueuedSynchronizer(AQS)原始碼學習筆記

若有不正之處請多多諒解,並歡迎批評指正。 請尊重作者勞動成果,轉載請標明原文連結: AbstractQueuedSynchronizer(AQS)是一個同步器框架,在實現鎖的時候,一般會實現一個繼承自AQS的內部類sync,作為我們的自定義同步器。AQS內部維護了一個state成員和一個佇列。其中

Java執行Condition實現原理和原始碼分析(四)

章節概覽、 1、概述 上面的幾個章節我們基於lock(),unlock()方法為入口,深入分析了獨佔鎖的獲取和釋放。這個章節我們在此基礎上,進一步分析AQS是如何實現await,signal功能。其功能上和synchronize的wait,notify一樣。

Java執行ThreadPoolExecutor實現原理和原始碼分析(五)

章節概覽、 1、概述 執行緒池的顧名思義,就是執行緒的一個集合。需要用到執行緒,從集合裡面取出即可。這樣設計主要的作用是優化執行緒的建立和銷燬而造成的資源浪費的情況。Java中的執行緒池的實現主要是JUC下面的ThreadPoolExecutor類完成的。下面

JDK中執行JUC集合的JDK原始碼解讀配合大神的一起看,秒懂。

一、    “JUC集合”01之框架 1)  概要 之前,在"Java 集合系列目錄(Category)"中,講解了Java集合包中的各個類。接下來,將展開對JUC包中的集合進行學習。在學習之前,先溫習一下"Java集合包"。本章內容包括: Java集合包 JUC中的

Java執行ReentrantLock實現原理和原始碼分析(二)

章節概覽、 1、ReentrantLock概述 ReentrantLock字面含義是可重入的互斥鎖,實現了和synchronize關鍵字一樣的獨佔鎖功能。但是ReentrantLock使用的是自旋鎖,通過CAS硬體原語指令實現的輕量級的鎖,不會引起上下文切換

Java執行系列--“JUC原子類”03 AtomicLong原子類

轉自:https://www.cnblogs.com/skywang12345/p/3514593.html(含部分修改) 概要 AtomicInteger, AtomicLong和AtomicBoolean這3個基本型別的原子類的原理和用法相似。本章以AtomicLong對基本型別的原子類進行介紹。內容

Java執行系列---“JUC原子類”04 AtomicLongArray原子類

轉自:https://www.cnblogs.com/skywang12345/p/3514604.html(含部分修改) 概要 AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray這3個數組型別的原子類的原理和用法相似。本章以AtomicLo

Java執行系列---“JUC原子類”05 AtomicReference原子類

轉自:http://www.cnblogs.com/skywang12345/p/3514623.html(部分修改) 概要 本章對AtomicReference引用型別的原子類進行介紹。內容包括: AtomicReference介紹和函式列表 AtomicReference原始碼分析(基於J

Java執行系列---“JUC原子類”06 AtomicLongFieldUpdater原子類

轉自:http://www.cnblogs.com/skywang12345/p/3514635.html (含部分修改) 概要 AtomicIntegerFieldUpdater, AtomicLongFieldUpdater和AtomicReferenceFieldUpdater這3個修改類的成員的原

Java執行系列---“JUC原子類”01 原子類的實現(CAS演算法)

轉自:https://blog.csdn.net/ls5718/article/details/52563959  & https://blog.csdn.net/mmoren/article/details/79185862(含部分修改)   在JDK 5之前Java語言是靠

Java執行系列---“JUC原子類”02 框架

轉自:http://www.cnblogs.com/skywang12345/p/3514589.html   根據修改的資料型別,可以將JUC包中的原子操作類可以分為4類。 1. 基本型別: AtomicInteger, AtomicLong, AtomicBoolean ;2.&

Java執行系列---“JUC鎖”01 框架

轉自:http://www.cnblogs.com/skywang12345/p/3496098.html(含部分修改)   本章,我們介紹鎖的架構;後面的章節將會對它們逐個進行分析介紹。目錄如下: 01. Java多執行緒系列--“JUC鎖”01之 框架 02. 

Java執行系列---“JUC鎖”02 ReentrantLock

轉自:https://www.jianshu.com/p/96c89e6e7e90 & https://blog.csdn.net/Somhu/article/details/78874634 (含部分修改) 一.ReentrantLock鎖 1.Lock介面 Lock,鎖

Java執行系列---“JUC鎖”06 公平鎖(下)

轉自:http://www.cnblogs.com/skywang12345/p/3496609.html 釋放公平鎖(基於JDK1.7.0_40) 1. unlock() unlock()在ReentrantLock.java中實現的,原始碼如下: public void unlock() {

Java執行系列--“JUC執行池”01 執行池架構

概要 前面分別介紹了”Java多執行緒基礎”、”JUC原子類”和”JUC鎖”。本章介紹JUC的最後一部分的內容——執行緒池。內容包括: 執行緒池架構圖 執行緒池示例 執行緒池架構圖 執行緒池的架構圖如下: 1、Executor

Java執行系列--“JUC執行池”05 執行池原理(四)

概要 本章介紹執行緒池的拒絕策略。內容包括: 拒絕策略介紹 拒絕策略對比和示例 拒絕策略介紹 執行緒池的拒絕策略,是指當任務新增到執行緒池中被拒絕,而採取的處理措施。 當任務新增到執行緒池中之所以被拒絕,可能是由於:第一,執行緒池異常關閉。第二,任務數量

讀書筆記java執行執行同步

閱讀的書籍:《java瘋狂講義》 關鍵詞:執行緒安全問題,同步程式碼塊,同步方法,釋放同步監視器的鎖定,同步鎖,死鎖 執行緒安全問題:當使用多個執行緒來訪問同一個資料時,會導致一些錯誤情況的發生 到底什麼是執行緒安全問題呢,先看一個經典的案例:銀行取錢的問題