1. 程式人生 > >J.U.C工具類中的CountDownLatch和CyclicBarrier

J.U.C工具類中的CountDownLatch和CyclicBarrier

內部 inter 同步 結果 異常 輔助 ber 計數 損壞

講解CyclicBarrier

      API文檔是這樣介紹的:一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點(common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此使CyclicBarrier很有用。因為該barrier在釋放等待線程後可以重用,所以稱它為循環的barrier。
      CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最後一個線程到達之後(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作 很有用。

    通俗點講:讓一組線程到達一個屏障時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有別屏障攔截的線程才會幹活。
類圖:
技術分享圖片
上圖可知:CyclicBarrier的內部是使用重入鎖ReetrantLock和Condition。
構造函數
技術分享圖片
源碼

  /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

Parties:表示攔截線程的數量。
barrierAction:為CyclicBarrier接受的Runnable命令,用於線程到達屏障時,優先執行barrierAction,用於處理復雜的業務場景.
<br/>
現在我們來看看CyclicBarrier最重要的函數await():
代碼:

await()方法:
  public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
dowait()方法:  
 private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        //獲取鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //分代
            final Generation g = generation;

            //當前generation“已損壞”,拋出BrokenBarrierException異常
            //拋出該異常一般都是某個線程在等待某個處於“斷開”狀態的CyclicBarrie
            if (g.broken)
                //當某個線程試圖等待處於斷開狀態的 barrier 時,或者 barrier 進入斷開狀態而線程處於等待狀態時,拋出該異常
                throw new BrokenBarrierException();

            //如果線程中斷,終止CyclicBarrier
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            //進來一個線程 count - 1
            int index = --count;
            //count == 0 表示所有線程均已到位,觸發Runnable任務
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //觸發任務
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //喚醒所有等待線程,並更新generation
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            for (;;) {
                try {
                    //如果不是超時等待,則調用Condition.await()方法等待
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        //超時等待,調用Condition.awaitNanos()方法等待
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We‘re about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                //generation已經更新,返回index
                if (g != generation)
                    return index;

                //“超時等待”,並且時間已到,終止CyclicBarrier,並拋出異常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

我們再看看jdkAPI文檔描述:

public int await()
          throws InterruptedException,
                 BrokenBarrierException
在所有 參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。
如果當前線程不是將到達的最後一個線程,出於調度目的,將禁用它,且在發生以下情況之一前,該線程將一直處於休眠狀態:

最後一個線程到達;或者
其他某個線程中斷當前線程;或者
其他某個線程中斷另一個等待線程;或者
其他某個線程在等待 barrier 時超時;或者
其他某個線程在此 barrier 上調用 reset()。
如果當前線程:

在進入此方法時已經設置了該線程的中斷狀態;或者
在等待時被中斷
則拋出 InterruptedException,並且清除當前線程的已中斷狀態。
如果在線程處於等待狀態時 barrier 被 reset(),或者在調用 await 時 barrier 被損壞,抑或任意一個線程正處於等待狀態,則拋出 BrokenBarrierException 異常。

如果任何線程在等待時被 中斷,則其他所有等待線程都將拋出 BrokenBarrierException 異常,並將 barrier 置於損壞狀態。

如果當前線程是最後一個將要到達的線程,並且構造方法中提供了一個非空的屏障操作,則在允許其他線程繼續運行之前,當前線程將運行該操作。如果在執行屏障操作過程中發生異常,則該異常將傳播到當前線程中,並將 barrier 置於損壞狀態。

返回:
到達的當前線程的索引,其中,索引 getParties() - 1 指示將到達的第一個線程,零指示最後一個到達的線程
拋出:
InterruptedException - 如果當前線程在等待時被中斷
BrokenBarrierException - 如果 另一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者在調用 await 時 barrier 被損壞,抑或由於異常而導致屏障操作(如果存在)失敗。

示例:

public class CyclicBarrierTest {
    public static CyclicBarrier cyclicBarrier;
    public static class WorkThread extends Thread
    {
        public void run()
        {
            try {
                System.out.println(Thread.currentThread().getName()+"已進入");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName()+"離開");

            } catch (InterruptedException | BrokenBarrierException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }
    public static void main(String[] args) {
        cyclicBarrier=new CyclicBarrier(4,new Runnable() {
            public void run()
            {
                System.out.println("人已經到齊");
            }

        });
        for(int i=0;i<4;i++)
        {
            new WorkThread().start();
        }

    }
}
運行結果: 
Thread-0已進入
Thread-2已進入
Thread-1已進入
Thread-3已進入
人已經到齊
Thread-3離開
Thread-0離開
Thread-1離開
Thread-2離開

講解CountDownLatch

看一下JDK文檔的描述:

  •     一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。

  • 用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的線程,await 的所有後續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。

  • CountDownLatch 是一個通用同步工具,它有很多用途。將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口:在通過調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。

  • CountDownLatch 的一個有用特性是,它不要求調用 countDown 方法的線程等到計數到達零時才繼續,而在所有線程都能通過之前,它只是阻止任何線程繼續通過一個 await。

構造函數

 /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

從上述可以看出:count :在線程能通過 await() 之前,必須調用 countDown() 的次數
說到這我們來看看CountDownLatch中的連個方法:countDown()和await();
await()方法:

  public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
       private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

描述:
使當前線程在鎖存器倒計數至零之前一直等待,除非線程被 中斷。
如果當前計數為零,則此方法立即返回。

如果當前計數大於零,則出於線程調度目的,將禁用當前線程,且在發生以下兩種情況之一前,該線程將一直處於休眠狀態:

由於調用 countDown() 方法,計數到達零;或者
其他某個線程中斷當前線程。
如果當前線程:

在進入此方法時已經設置了該線程的中斷狀態;或者
在等待時被中斷,
則拋出 InterruptedException,並且清除當前線程的已中斷狀態。

countDown()方法:

   public void countDown() {
        sync.releaseShared(1);
    }

描述:遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。
如果當前計數大於零,則將計數減少。如果新的計數為零,出於線程調度目的,將重新啟用所有的等待線程。
如果當前計數等於零,則不發生任何操作。
示例:

public class CountDownLatchTest {
    private static class WorkThread extends Thread
    {
        private CountDownLatch cdl;
        private int sleepSecond;

        public WorkThread(String name, CountDownLatch cdl, int sleepSecond)
        {
            super(name);
            this.cdl = cdl;
            this.sleepSecond = sleepSecond;
        }

        public void run()
        {
            try
            {
                System.out.println(this.getName() + "啟動了,時間為" + System.currentTimeMillis());
                Thread.sleep(sleepSecond * 1000);
                cdl.countDown();
                System.out.println(this.getName() + "執行完了,時間為" + System.currentTimeMillis());
            } 
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }

    private static class DoneThread extends Thread
    {
        private CountDownLatch cdl;

        public DoneThread(String name, CountDownLatch cdl)
        {
            super(name);
            this.cdl = cdl;
        }

        public void run()
        {
            try
            {
                System.out.println(this.getName() + "要等待了, 時間為" + System.currentTimeMillis());
                cdl.await();
                System.out.println(this.getName() + "等待完了, 時間為" + System.currentTimeMillis());
            } 
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        CountDownLatch cdl = new CountDownLatch(3);
        DoneThread dt0 = new DoneThread("DoneThread1", cdl);
        DoneThread dt1 = new DoneThread("DoneThread2", cdl);
        dt0.start();
        dt1.start();
        WorkThread wt0 = new WorkThread("WorkThread1", cdl, 2);
        WorkThread wt1 = new WorkThread("WorkThread2", cdl, 3);
        WorkThread wt2 = new WorkThread("WorkThread3", cdl, 4);
        wt0.start();
        wt1.start();
        wt2.start();
    }

}
運行結果:
DoneThread2要等待了, 時間為1529917959491
DoneThread1要等待了, 時間為1529917959491
WorkThread1啟動了,時間為1529917959492
WorkThread2啟動了,時間為1529917959492
WorkThread3啟動了,時間為1529917959492
WorkThread1執行完了,時間為1529917961493
WorkThread2執行完了,時間為1529917962492
WorkThread3執行完了,時間為1529917963492
DoneThread1等待完了, 時間為1529917963492
DoneThread2等待完了, 時間為1529917963492

兩者之間的比較

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。

J.U.C工具類中的CountDownLatch和CyclicBarrier