1. 程式人生 > >Java鎖--CyclicBarrier

Java鎖--CyclicBarrier

fin title ann 返回 inner illegal -c 計數器 set

轉載請註明出處:http://www.cnblogs.com/skywang12345/p/3533995.html

CyclicBarrier簡介

CyclicBarrier是一個同步輔助類,允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。因為該 barrier 在釋放等待線程後可以重用,所以稱它為循環 的 barrier。

註意比較CountDownLatch和CyclicBarrier:
(01) CountDownLatch的作用是允許1或N個線程等待其他線程完成執行;而CyclicBarrier則是允許N個線程相互等待。
(02) CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置後使用,因此它被稱為是循環的barrier。


CyclicBarrier函數列表

技術分享圖片
CyclicBarrier(int parties)
創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作。
CyclicBarrier(int parties, Runnable barrierAction)
創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的線程執行。

int await()
在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。
int await(long timeout, TimeUnit unit)
在所有參與者都已經在此屏障上調用 await 方法之前將一直等待,或者超出了指定的等待時間。
int getNumberWaiting()
返回當前在屏障處等待的參與者數目。
int getParties()
返回要求啟動此 barrier 的參與者數目。
boolean isBroken()
查詢此屏障是否處於損壞狀態。
void reset()
將屏障重置為其初始狀態。
技術分享圖片

CyclicBarrier數據結構

CyclicBarrier的UML類圖如下:

技術分享圖片

CyclicBarrier是包含了"ReentrantLock對象lock"和"Condition對象trip",它是通過獨占鎖實現的。下面通過源碼去分析到底是如何實現的。

CyclicBarrier源碼分析(基於JDK1.7.0_40)

CyclicBarrier完整源碼(基於JDK1.7.0_40)

技術分享圖片 View Code

CyclicBarrier是通過ReentrantLock(獨占鎖)和Condition來實現的。下面,我們分析CyclicBarrier中3個核心函數: 構造函數, await()作出分析。

1. 構造函數

CyclicBarrier的構造函數共2個:CyclicBarrier 和 CyclicBarrier(int parties, Runnable barrierAction)。第1個構造函數是調用第2個構造函數來實現的,下面第2個構造函數的源碼。

技術分享圖片
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties表示“必須同時到達barrier的線程個數”。
    this.parties = parties;
    // count表示“處在等待狀態的線程個數”。
    this.count = parties;
    // barrierCommand表示“parties個線程到達barrier時,會執行的動作”。
    this.barrierCommand = barrierAction;
}
技術分享圖片

2. 等待函數

CyclicBarrier.java中await()方法如下:

技術分享圖片
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}
技術分享圖片

說明:await()是通過dowait()實現的。

技術分享圖片
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 獲取“獨占鎖(lock)”
    lock.lock();
    try {
        // 保存“當前的generation”
        final Generation g = generation;

        // 若“當前generation已損壞”,則拋出異常。
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果當前線程被中斷,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       // 將“count計數器”-1
       int index = --count;
       // 如果index=0,則意味著“有parties個線程到達barrier”。
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不為null,則執行該動作。
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 喚醒所有等待線程,並更新generation。
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // 當前線程一直阻塞,直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生,
        // 當前線程才繼續執行。
        for (;;) {
            try {
                // 如果不是“超時等待”,則調用awati()進行等待;否則,調用awaitNanos()進行等待。
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待過程中,線程被中斷,則執行下面的函數。
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 如果“當前generation已經損壞”,則拋出異常。
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果“generation已經換代”,則返回index。
            if (g != generation)
                return index;

            // 如果是“超時等待”,並且時間已到,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放“獨占鎖(lock)”
        lock.unlock();
    }
}
技術分享圖片

說明:dowait()的作用就是讓當前線程阻塞,直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生,當前線程才繼續執行。
(01) generation是CyclicBarrier的一個成員遍歷,它的定義如下:

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

在CyclicBarrier中,同一批的線程屬於同一代,即同一個Generation;CyclicBarrier中通過generation對象,記錄屬於哪一代。
當有parties個線程到達barrier,generation就會被更新換代。

(02) 如果當前線程被中斷,即Thread.interrupted()為true;則通過breakBarrier()終止CyclicBarrier。breakBarrier()的源碼如下:

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

breakBarrier()會設置當前中斷標記broken為true,意味著“將該Generation中斷”;同時,設置count=parties,即重新初始化count;最後,通過signalAll()喚醒CyclicBarrier上所有的等待線程。

(03) 將“count計數器”-1,即--count;然後判斷是不是“有parties個線程到達barrier”,即index是不是為0。
當index=0時,如果barrierCommand不為null,則執行該barrierCommand,barrierCommand就是我們創建CyclicBarrier時,傳入的Runnable對象。然後,調用nextGeneration()進行換代工作,nextGeneration()的源碼如下:

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

首先,它會調用signalAll()喚醒CyclicBarrier上所有的等待線程;接著,重新初始化count;最後,更新generation的值。

(04) 在for(;;)循環中。timed是用來表示當前是不是“超時等待”線程。如果不是,則通過trip.await()進行等待;否則,調用awaitNanos()進行超時等待。

CyclicBarrier的使用示例

示例1
新建5個線程,這5個線程達到一定的條件時,它們才繼續往後運行。

技術分享圖片
 1 import java.util.concurrent.CyclicBarrier;
 2 import java.util.concurrent.BrokenBarrierException;
 3 
 4 public class CyclicBarrierTest1 {
 5 
 6     private static int SIZE = 5;
 7     private static CyclicBarrier cb;
 8     public static void main(String[] args) {
 9 
10         cb = new CyclicBarrier(SIZE);
11 
12         // 新建5個任務
13         for(int i=0; i<SIZE; i++)
14             new InnerThread().start();
15     }
16 
17     static class InnerThread extends Thread{
18         public void run() {
19             try {
20                 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
21 
22                 // 將cb的參與者數量加1
23                 cb.await();
24 
25                 // cb的參與者數量等於5時,才繼續往後執行
26                 System.out.println(Thread.currentThread().getName() + " continued.");
27             } catch (BrokenBarrierException e) {
28                 e.printStackTrace();
29             } catch (InterruptedException e) {
30                 e.printStackTrace();
31             }
32         }
33     }
34 }
技術分享圖片

運行結果

技術分享圖片
Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.
技術分享圖片

結果說明:主線程中新建了5個線程,所有的這些線程都調用cb.await()等待。所有這些線程一直等待,直到cb中所有線程都達到barrier時,這些線程才繼續運行!

示例2

新建5個線程,當這5個線程達到一定的條件時,執行某項任務。

技術分享圖片
 1 import java.util.concurrent.CyclicBarrier;
 2 import java.util.concurrent.BrokenBarrierException;
 3 
 4 public class CyclicBarrierTest2 {
 5 
 6     private static int SIZE = 5;
 7     private static CyclicBarrier cb;
 8     public static void main(String[] args) {
 9 
10         cb = new CyclicBarrier(SIZE, new Runnable () {
11             public void run() {
12                 System.out.println("CyclicBarrier‘s parties is: "+ cb.getParties());
13             }
14         });
15 
16         // 新建5個任務
17         for(int i=0; i<SIZE; i++)
18             new InnerThread().start();
19     }
20 
21     static class InnerThread extends Thread{
22         public void run() {
23             try {
24                 System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");
25 
26                 // 將cb的參與者數量加1
27                 cb.await();
28 
29                 // cb的參與者數量等於5時,才繼續往後執行
30                 System.out.println(Thread.currentThread().getName() + " continued.");
31             } catch (BrokenBarrierException e) {
32                 e.printStackTrace();
33             } catch (InterruptedException e) {
34                 e.printStackTrace();
35             }
36         }
37     }
38 }
技術分享圖片

運行結果

技術分享圖片
Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
CyclicBarrier‘s parties is: 5
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.
技術分享圖片

結果說明:主線程中新建了5個線程,所有的這些線程都調用cb.await()等待。所有這些線程一直等待,直到cb中所有線程都達到barrier時,執行新建cb時註冊的Runnable任務。

Java鎖--CyclicBarrier