1. 程式人生 > >J.U.C並發框架源碼閱讀(七)CyclicBarrier

J.U.C並發框架源碼閱讀(七)CyclicBarrier

sse 核心 exception row new t shared ati processes str

基於版本jdk1.7.0_80

java.util.concurrent.CyclicBarrier

代碼如下

技術分享
/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

/*
 *
 *
 *
 *
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * 
http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; import java.util.concurrent.locks.*; /** * A synchronization aid that allows a set of threads to all wait for * each other to reach a common barrier point. CyclicBarriers are * useful in programs involving a fixed sized party of threads that * must occasionally wait for each other. The barrier is called * <em>cyclic</em> because it can be re-used after the waiting threads * are released. * * <p>A <tt>CyclicBarrier</tt> supports an optional {
@link Runnable} command * that is run once per barrier point, after the last thread in the party * arrives, but before any threads are released. * This <em>barrier action</em> is useful * for updating shared-state before any of the parties continue. * * <p><b>Sample usage:</b> Here is an example of * using a barrier in a parallel decomposition design: * <pre> * class Solver { * final int N; * final float[][] data; * final CyclicBarrier barrier; * * class Worker implements Runnable { * int myRow; * Worker(int row) { myRow = row; } * public void run() { * while (!done()) { * processRow(myRow); * * try { * barrier.await(); * } catch (InterruptedException ex) { * return; * } catch (BrokenBarrierException ex) { * return; * } * } * } * } * * public Solver(float[][] matrix) { * data = matrix; * N = matrix.length; * barrier = new CyclicBarrier(N, * new Runnable() { * public void run() { * mergeRows(...); * } * }); * for (int i = 0; i < N; ++i) * new Thread(new Worker(i)).start(); * * waitUntilDone(); * } * } * </pre> * Here, each worker thread processes a row of the matrix then waits at the * barrier until all rows have been processed. When all rows are processed * the supplied {
@link Runnable} barrier action is executed and merges the * rows. If the merger * determines that a solution has been found then <tt>done()</tt> will return * <tt>true</tt> and each worker will terminate. * * <p>If the barrier action does not rely on the parties being suspended when * it is executed, then any of the threads in the party could execute that * action when it is released. To facilitate this, each invocation of * {@link #await} returns the arrival index of that thread at the barrier. * You can then choose which thread should execute the barrier action, for * example: * <pre> if (barrier.await() == 0) { * // log the completion of this iteration * }</pre> * * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model * for failed synchronization attempts: If a thread leaves a barrier * point prematurely because of interruption, failure, or timeout, all * other threads waiting at that barrier point will also leave * abnormally via {@link BrokenBarrierException} (or * {@link InterruptedException} if they too were interrupted at about * the same time). * * <p>Memory consistency effects: Actions in a thread prior to calling * {@code await()} * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> * actions that are part of the barrier action, which in turn * <i>happen-before</i> actions following a successful return from the * corresponding {@code await()} in other threads. * * @since 1.5 * @see CountDownLatch * * @author Doug Lea */ public class CyclicBarrier { /** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which <tt>count</tt> applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count; /** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } /** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We‘re about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } /** * Creates a new <tt>CyclicBarrier</tt> that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * Creates a new <tt>CyclicBarrier</tt> that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties) { this(parties, null); } /** * Returns the number of parties required to trip this barrier. * * @return the number of parties required to trip this barrier */ public int getParties() { return parties; } /** * Waits until all {@linkplain #getParties parties} have invoked * <tt>await</tt> on this barrier. * * <p>If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until * one of the following things happens: * <ul> * <li>The last thread arrives; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * one of the other waiting threads; or * <li>Some other thread times out while waiting for barrier; or * <li>Some other thread invokes {@link #reset} on this barrier. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * </ul> * then {@link InterruptedException} is thrown and the current thread‘s * interrupted status is cleared. * * <p>If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when * <tt>await</tt> is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting, * then all other waiting threads will throw * {@link BrokenBarrierException} and the barrier is placed in the broken * state. * * <p>If the current thread is the last thread to arrive, and a * non-null barrier action was supplied in the constructor, then the * current thread runs the action before allowing the other threads to * continue. * If an exception occurs during the barrier action then that exception * will be propagated in the current thread and the barrier is placed in * the broken state. * * @return the arrival index of the current thread, where index * <tt>{@link #getParties()} - 1</tt> indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting * @throws BrokenBarrierException if <em>another</em> thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was * broken when {@code await} was called, or the barrier * action (if present) failed due an exception. */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } /** * Waits until all {@linkplain #getParties parties} have invoked * <tt>await</tt> on this barrier, or the specified waiting time elapses. * * <p>If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until * one of the following things happens: * <ul> * <li>The last thread arrives; or * <li>The specified timeout elapses; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * one of the other waiting threads; or * <li>Some other thread times out while waiting for barrier; or * <li>Some other thread invokes {@link #reset} on this barrier. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * </ul> * then {@link InterruptedException} is thrown and the current thread‘s * interrupted status is cleared. * * <p>If the specified waiting time elapses then {@link TimeoutException} * is thrown. If the time is less than or equal to zero, the * method will not wait at all. * * <p>If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when * <tt>await</tt> is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * * <p>If any thread is {@linkplain Thread#interrupt interrupted} while * waiting, then all other waiting threads will throw {@link * BrokenBarrierException} and the barrier is placed in the broken * state. * * <p>If the current thread is the last thread to arrive, and a * non-null barrier action was supplied in the constructor, then the * current thread runs the action before allowing the other threads to * continue. * If an exception occurs during the barrier action then that exception * will be propagated in the current thread and the barrier is placed in * the broken state. * * @param timeout the time to wait for the barrier * @param unit the time unit of the timeout parameter * @return the arrival index of the current thread, where index * <tt>{@link #getParties()} - 1</tt> indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the specified timeout elapses * @throws BrokenBarrierException if <em>another</em> thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was broken * when {@code await} was called, or the barrier action (if * present) failed due an exception */ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } /** * Queries if this barrier is in a broken state. * * @return {@code true} if one or more parties broke out of this * barrier due to interruption or timeout since * construction or the last reset, or a barrier action * failed due to an exception; {@code false} otherwise. */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } /** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets <em>after</em> * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } /** * Returns the number of parties currently waiting at the barrier. * This method is primarily useful for debugging and assertions. * * @return the number of parties currently blocked in {@link #await} */ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
View Code

0. CyclicBarrier簡介

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。

CyclicBarrier初始時還可帶一個Runnable的參數, 此Runnable任務在CyclicBarrier的數目達到後,所有其它線程被喚醒前被執行。

1. CyclicBarrier和CountDownLatch的區別

CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復雜的業務場景,比如如果計算發生錯誤,可以重置計數器,並讓線程們重新執行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數量。isBroken方法用來知道阻塞的線程是否被中斷。

2. CyclicBarrier原理概述

跟之前講解的幾個並發容器不同,CyclicBarrier沒有用到AQS,而是直接使用了ReentrantLock作為核心同步工具。

CyclicBarrier內部維護了一個ReentrantLock對象lock,當工作線程調用CyclicBarrier.await方法時,會檢查已經到達的線程數是否已經滿足需求,如果沒有,則在lock的一個Condition上等待。如果滿足了需求,則向所有在這個Condition上等待的線程發送喚醒信號。然後重置CyclicBarrier。

3. CyclicBarrier.await方法的調用軌跡

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//加鎖
        try {
            final Generation g = generation;

            if (g.broken)//這一代的CyclicBarrier已經損壞
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {//如果工作線程被中斷,則CyclicBarrier損壞
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;//由於方法一開始就加鎖,所以這裏可以安全的執行自減操作,無需cas
           if (index == 0) {  // tripped//如果計數減到0了,那麽執行收尾工作:執行barrierCommand,重置CyclicBarrier
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();//如果有barrierCommand則執行
                   ranAction = true;
                   nextGeneration();//重置CyclicBarrier
                   return 0;
               } finally {
                   if (!ranAction)//如果barrierCommand拋出異常了,CyclicBarrier不會被重置,其他的等待線程也不會被喚醒
                       breakBarrier();
               }
           }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {//工作線程在lock的Condition上等待
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//實現超時語義
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We‘re about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

邏輯很簡單,主要就是利用內部維護的ReentrantLock對象lock,以及lock關聯的Condition對象trip,完成等待/超時等待/響應中斷的語義

4. 一些輔助方法

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();//喚醒所有等待線程
    }


    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();//喚醒所有等待線程
        // set up next generation
        count = parties;
        generation = new Generation();
    }


    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

總的來說邏輯不算復雜,就不多解釋了。

J.U.C並發框架源碼閱讀(七)CyclicBarrier