1. 程式人生 > >CyclicBarrier(同步屏障)的簡單使用

CyclicBarrier(同步屏障)的簡單使用

CyclicBarrer簡介

CyclicBarrer,可迴圈使用的屏障,功能是讓多個執行緒到達某個點時被阻塞,直到最後一個執行緒達到這個屏障便釋放所有執行緒,和CountDownLatch的區別即在於執行緒釋放後屏障是否可重用。

例項化:通過帶引數的new CyclicBarrer(N)可例項化CyclicBarrier,N代表需要屏障攔截(阻塞)的執行緒數,也可以使用new CyclicBarrier(N,Runnable)的方式指定當所有阻塞的執行緒都到達屏障點後優先執行的任務barrierAction。

public CyclicBarrier(int parties) {……}
public CyclicBarrier(int parties, Runnable barrierAction) {……}

阻塞執行緒:通過呼叫await方法告訴當前執行緒已到達屏障,進入阻塞等待狀態。也可以指定阻塞時間await(timeout,unit),防止阻塞時間過長,當阻塞超過指定時間,丟擲TimeoutException

public int await() throws InterruptedException, BrokenBarrierException {……}
public int await(long timeout, TimeUnit unit) {……}

測試demo:以 三個執行緒計算任務為例,其中一個執行緒計算時間很長,於是呼叫await(time,unit)來指定等待時間

public class CyclicBarrierService {
    static ExecutorService executorService = new ThreadPoolExecutor(3, 3, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(15));
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

    public static void statistic() throws Exception {
        Future<Integer> task1 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("我是任務一");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 1;
            }
        });
        Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("我是任務二");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 2;
            }
        });
        Future<Integer> task3 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("我是任務三");
                Thread.sleep(5000);   //模擬任務3要執行很長時間
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 3;
            }
        });

        int result1 = task1.get();
        int result2 =  task2.get();
        int result3 = task3.get();
        System.out.println("多執行緒計算結果為");
        System.out.println(result1 + result2 + result3);
        executorService.shutdown();
    }

    public static void main(String[] args) throws Exception {
        statistic();
    }
}

因為我在獲取執行緒計算結果時候未使用FutureTask.isDone()來判斷當前任務是否計算完成(直接呼叫FutureTask.get()可能會阻塞,加了isDone判斷,由於子執行緒任務還被阻塞在屏障點,所以獲取不到計算結果),上述程式碼就丟擲超時異常

我是任務一
我是任務二
我是任務三
Exception in thread "main" java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:188)
	at com.pptv.activityapi.controller.actmodule.CyclicBarrierService.statistic(CyclicBarrierService.java:49)
	at com.pptv.activityapi.controller.actmodule.CyclicBarrierService.main(CyclicBarrierService.java:62)
Caused by: java.util.concurrent.TimeoutException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
	at com.pptv.activityapi.controller.actmodule.CyclicBarrierService$1.call(CyclicBarrierService.java:25)
	at com.pptv.activityapi.controller.actmodule.CyclicBarrierService$1.call(CyclicBarrierService.java:21)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

重置計數器N:reset方法將屏障重置為其初始狀態。 如果任何一方當前正在屏障等待,他們將返回BrokenBarrierException。

測試demo:

public class CyclicBarrierService {
    static ExecutorService executorService = new ThreadPoolExecutor(3, 3, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(15));
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

    public static void statistic() {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                    log.info("我是任務一……");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                    log.info("我是任務二……");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.reset();
                    log.info("我是任務三……");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.shutdown();
    }

    public static void main(String[] args) {
        statistic();
    }
}

第三個執行緒呼叫CyclicBarrier.reset()方法後將導致前面兩個阻塞在屏障點的執行緒顯式的丟擲java.util.concurrent.BrokenBarrierException,輸出結果如下

java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:355)
	at com.pptv.activityapi.controller.actmodule.CyclicBarrierService$1.run(CylicBarrierService.java:25)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.BrokenBarrierException
	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:355)
	at com.pptv.activityapi.controller.actmodule.CyclicBarrierService$2.run(CylicBarrierService.java:37)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
2018-11-14 15:11:39,535[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]我是任務三……

CyclicBarrier使用場景及demo

結合上面說的,CyclicBarrier非常適合多執行緒計算任務,功能還是和CountDownLatch一致的,分組執行任務,最後彙總結果

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.*;

@Slf4j
@Service
public class CyclicBarrierService {
    static ExecutorService executorService = new ThreadPoolExecutor(3, 3, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(15));
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    
    public static void statistic() throws Exception {
        Future<Integer> task1 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("我是任務一");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 1;
            }
        });
        Future<Integer> task2 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("我是任務二");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 2;
            }
        });
        Future<Integer> task3 = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("我是任務三");
                cyclicBarrier.await(1, TimeUnit.SECONDS);
                return 3;
            }
        });
        int result1 =  task1.get();
        int result2 =  task2.get();
        int result3 =  task3.get();
        log.info("多執行緒計算結果為");
        log.info(String.valueOf(result1 + result2 + result3));
        executorService.shutdown();
    }

    public static void main(String[] args) throws Exception {
        statistic();
    }
}

輸出結果為

2018-11-14 16:39:24,127[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]我是任務一
2018-11-14 16:39:24,127[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]我是任務二
2018-11-14 16:39:24,127[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]我是任務三
2018-11-14 16:39:24,128[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]多執行緒計算結果為
2018-11-14 16:39:24,128[com.pptv.activityapi.controller.actmodule.CyclicBarrierService][INFO]6

引申閱讀:

Java中的執行緒池和非同步任務詳解
CountDownLatch(閉鎖)的簡單使用