1. 程式人生 > >JUC鎖——CyclicBarrier

JUC鎖——CyclicBarrier

一、什麼是CyclicBarrier

CyclicBarrier初始化時規定一個數目,然後計算呼叫了CyclicBarrier.await()進入等待的執行緒數。當執行緒數達到了這個數目時,所有進入等待狀態的執行緒被喚醒並繼續。 CyclicBarrier就象它名字的意思一樣,可看成是個障礙, 所有的執行緒必須到齊後才能一起通過這個障礙。 CyclicBarrier初始時還可帶一個Runnable的引數, 此Runnable任務在CyclicBarrier的數目達到後,所有其它執行緒被喚醒前被執行。

方法列表:

CyclicBarrier(int parties)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(執行緒)處於等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作。
CyclicBarrier(int parties, Runnable barrierAction)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(執行緒)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的執行緒執行。

int await()
在所有參與者都已經在此 barrier 上呼叫 await 方法之前,將一直等待。
int await(long timeout, TimeUnit unit)
在所有參與者都已經在此屏障上呼叫 await 方法之前將一直等待,或者超出了指定的等待時間。
int getNumberWaiting()
返回當前在屏障處等待的參與者數目。
int getParties()
返回要求啟動此 barrier 的參與者數目。
boolean isBroken()
查詢此屏障是否處於損壞狀態。
void reset()
將屏障重置為其初始狀態。

二、CyclicBarrier資料結構

CyclicBarrier是包含了"ReentrantLock物件lock"和"Condition物件trip",它是通過獨佔鎖實現的。

三、原始碼分析

3.1 建構函式

CyclicBarrier的建構函式共2個:CyclicBarrier 和 CyclicBarrier(int parties, Runnable barrierAction)。第1個建構函式是呼叫第2個建構函式來實現的,下面第2個建構函式的原始碼。

parties表示“必須同時到達barrier的執行緒個數”。

count表示處在等待狀態的執行緒個數

barrierCommand表示“parties個執行緒到達barrier時,會執行的動作”。

3.2 等待函式

主要功能是dowait實現的:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 獲取“獨佔鎖(lock)”
    lock.lock();
    try {
        // 儲存“當前的generation”
        final Generation g = generation;

        // 若“當前generation已損壞”,則丟擲異常。
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果當前執行緒被中斷,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待執行緒。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       // 將“count計數器”-1
       int index = --count;
       // 如果index=0,則意味著“有parties個執行緒到達barrier”。
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不為null,則執行該動作。
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 喚醒所有等待執行緒,並更新generation。
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // 當前執行緒一直阻塞,直到“有parties個執行緒到達barrier” 或 “當前執行緒被中斷” 或 “超時”這3者之一發生,
        // 當前執行緒才繼續執行。
        for (;;) {
            try {
                // 如果不是“超時等待”,則呼叫awati()進行等待;否則,呼叫awaitNanos()進行等待。
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待過程中,執行緒被中斷,則執行下面的函式。
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 如果“當前generation已經損壞”,則丟擲異常。
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果“generation已經換代”,則返回index。
            if (g != generation)
                return index;

            // 如果是“超時等待”,並且時間已到,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待執行緒。
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放“獨佔鎖(lock)”
        lock.unlock();
    }
}

dowait()的作用就是讓當前執行緒阻塞,直到“有parties個執行緒到達barrier” 或 “當前執行緒被中斷” 或 “超時”這3者之一發生,當前執行緒才繼續執行。

四、CyclicBarrier的使用示例

執行緒數達到5個,才能喚醒所有執行緒繼續往下執行

package com.juc.countdownlatch;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @Author: 98050
 * @Time: 2018-12-20 21:52
 * @Feature: CyclicBarrier的使用
 */
public class Test2 {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            Write write = new Write(cyclicBarrier);
            write.start();
        }
    }
}
class Write extends Thread{

    private CyclicBarrier cyclicBarrier;

    public Write(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println("執行緒"+Thread.currentThread().getName() + ",正在寫入資料");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("執行緒"+Thread.currentThread().getName() + ",資料寫入成功");

        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName()+"繼續執行");
    }
}

 

如果只建立3個執行緒:

結果:一直等待,無法繼續向下執行