1. 程式人生 > >Java併發(十三):同步屏障CyclicBarrier

Java併發(十三):同步屏障CyclicBarrier

CyclicBarrier 的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。

一、應用舉例

public class CyclicBarrierTest {
    private static CyclicBarrier cyclicBarrier;

    static class CyclicBarrierThread extends Thread {
        public void run() {
            System.out.println(Thread.currentThread().getName() 
+ "到了"); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { cyclicBarrier = new CyclicBarrier(5, new Runnable() { @Override
public void run() { System.out.println("人到齊了,開會吧...."); } }); for (int i = 0; i < 5; i++) { new CyclicBarrierThread().start(); } } }

二、類結構

public class CyclicBarrier {
    private static class Generation { // 內部類,當有parties個執行緒到達barrier,就會更新換代
        
boolean broken = false; // 是否損壞 } private final ReentrantLock lock = new ReentrantLock(); // 重入鎖 private final Condition trip = lock.newCondition(); private final int parties; // 等待執行緒總數量 private final Runnable barrierCommand; // 達到等待執行緒數量後執行的執行緒 private Generation generation = new Generation(); // 當有parties個執行緒到達barrier,就會更新換代 private int count; // 記錄當前執行緒數量 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); } }

三、原理解析

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier(); // 代失效,喚醒所有執行緒
                throw new InterruptedException();
            }

            int index = --count; // 計數
            if (index == 0) { // 達到要求數量
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run(); // 達到等待執行緒數量後執行barrierCommand
                    ranAction = true;
                    nextGeneration(); // 喚醒本代所有執行緒,生成新一代,重置count
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 執行緒數量未達到要求數量,將執行緒掛起等待
            for (;;) {
                try {
                    if (!timed)
                        trip.await(); // 將執行緒加入condition佇列掛起
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                // 特殊情況處理
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    // 代失效,喚醒所有執行緒
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    // 喚醒本代所有執行緒,生成新一代,重置count
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }