1. 程式人生 > >多線程編程-- part 8 CyclicBarrier

多線程編程-- part 8 CyclicBarrier

復制代碼 給定 循環 div new cnblogs public 就會 leg

CyclicBarrier簡介

  cuclicBarrier允許一組線程互相等待,直到到達某個公共屏障點(common barrier point)。因為該barrier在釋放等待線程後可以重用,所以稱它為循環的barrier。

CyclicBarrier函數列表

CyclicBarrier(int parties)
創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作。
CyclicBarrier(int parties, Runnable barrierAction)
創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的線程執行。

int await() 在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。 int await(long timeout, TimeUnit unit) 在所有參與者都已經在此屏障上調用 await 方法之前將一直等待,或者超出了指定的等待時間。 int getNumberWaiting() 返回當前在屏障處等待的參與者數目。 int getParties() 返回要求啟動此 barrier 的參與者數目。 boolean isBroken() 查詢此屏障是否處於損壞狀態。 void reset() 將屏障重置為其初始狀態。

CyclicBarrier數據結構

技術分享

可見其包含的對象:

(1)parties:定義多少個等待線程可以啟動屏障

(2)count:處於等待狀態的線程數量

(3)lock:是ReentrantLock獨占鎖

(4)trip:condition條件控制,控制線程的等待和激活

(5)barrierCommand:表示當parties個處於等待狀態的線程到達屏障時,要執行的動作

(6)generation:記錄線程屬於那一代,當有parties個線程到達barrier時,generation會被換代

CyclicBarrier核心函數

1.構造函數

(1)CyclicBarrier(int parties), CyclicBarrier(int parties, Runnable barrierAction)

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties表示“必須同時到達barrier的線程個數”。
    this.parties = parties;
    // count表示“處在等待狀態的線程個數”。
    this.count = parties;
    // barrierCommand表示“parties個線程到達barrier時,會執行的動作”。
    this.barrierCommand = barrierAction;
}

(2)等待函數

  

CyclicBarrier.java中await()方法如下:

技術分享
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}
技術分享

說明:await()是通過dowait()實現的。

技術分享
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 獲取“獨占鎖(lock)”
    lock.lock();
    try {
        // 保存“當前的generation”
        final Generation g = generation;

        // 若“當前generation已損壞”,則拋出異常。
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果當前線程被中斷,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       // 將“count計數器”-1
       int index = --count;
       // 如果index=0,則意味著“有parties個線程到達barrier”。
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不為null,則執行該動作。
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 喚醒所有等待線程,並更新generation。
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // 當前線程一直阻塞,直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生,
        // 當前線程才繼續執行。
        for (;;) {
            try {
                // 如果不是“超時等待”,則調用awati()進行等待;否則,調用awaitNanos()進行等待。
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待過程中,線程被中斷,則執行下面的函數。
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 如果“當前generation已經損壞”,則拋出異常。
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果“generation已經換代”,則返回index。
            if (g != generation)
                return index;

            // 如果是“超時等待”,並且時間已到,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放“獨占鎖(lock)”
        lock.unlock();
    }
}
技術分享

說明:dowait()的作用就是讓當前線程阻塞,直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生,當前線程才繼續執行。
(01) generation是CyclicBarrier的一個成員遍歷,它的定義如下:

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

在CyclicBarrier中,同一批的線程屬於同一代,即同一個Generation;CyclicBarrier中通過generation對象,記錄屬於哪一代。
當有parties個線程到達barrier,generation就會被更新換代。

(02) 如果當前線程被中斷,即Thread.interrupted()為true;則通過breakBarrier()終止CyclicBarrier。breakBarrier()的源碼如下:

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

breakBarrier()會設置當前中斷標記broken為true,意味著“將該Generation中斷”;同時,設置count=parties,即重新初始化count;最後,通過signalAll()喚醒CyclicBarrier上所有的等待線程。

(03) 將“count計數器”-1,即--count;然後判斷是不是“有parties個線程到達barrier”,即index是不是為0。
當index=0時,如果barrierCommand不為null,則執行該barrierCommand,barrierCommand就是我們創建CyclicBarrier時,傳入的Runnable對象。然後,調用nextGeneration()進行換代工作,nextGeneration()的源碼如下:

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

首先,它會調用signalAll()喚醒CyclicBarrier上所有的等待線程;接著,重新初始化count;最後,更新generation的值。

(04) 在for(;;)循環中。timed是用來表示當前是不是“超時等待”線程。如果不是,則通過trip.await()進行等待;否則,調用awaitNanos()進行超時等待

CyclicBarrier示例

(1)新建5個線程,都調用await()等待,這些線程都到達屏障,繼續往後執行

public class testHello {

    private static int SIZE = 5;
    private static CyclicBarrier cb;
    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE);

        // 新建5個任務
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // 將cb的參與者數量加1
                cb.await();

                // cb的參與者數量等於5時,才繼續往後執行
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

技術分享

(2)新建5個線程,都調用await()等待,這些線程都到達屏障,執行runnable中定義的任務

public class testHello {

    private static int SIZE = 5;
    private static CyclicBarrier cb;
    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " runnable start");
            }
        });

        // 新建5個任務
        for(int i=0; i<SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // 將cb的參與者數量加1
                cb.await();

                // cb的參與者數量等於5時,才繼續往後執行
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

技術分享

  可以看出當parties個線程到達屏障時,會執行屏障的Runable定義的任務

多線程編程-- part 8 CyclicBarrier