1. 程式人生 > >併發工具類 countDownLatch、CyclicBarrier與Semaphore

併發工具類 countDownLatch、CyclicBarrier與Semaphore

1、等待多執行緒完成的 CountDownLatch

 CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作。

它的建構函式接受一個int型別的引數作為計數器,如果想等待N個點完成,這裡傳入N即可。

呼叫countDown方法時,N就會減1,await方法會阻塞當前執行緒直到N為0時。N個點可以是N個執行緒,也可以是同一執行緒中的N個點。

比如如下程式碼實現等待N個執行緒執行併發並等待N個執行緒結束。

package cn.zhjw.eurekaclientnode2.common;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Desc:
 * @Author: zhaojiwei
 * @Date: 2018/10/29 22:56
 */
public class ThreadSyn {

    CountDownLatch latchOne = new CountDownLatch(1);
    CountDownLatch latchTen = new CountDownLatch(10);

    public static void main(String[] args) throws IOException {
        ThreadSyn threadSyn = new ThreadSyn();
        threadSyn.testCountDownLatch();

        System.in.read();
        System.out.println("##############");
    }

    public void testCountDownLatch() {

        int m = 10;
        for (int i = 0; i < m; i++) {
            Thread thread = new Thread(new MyRunnable(latchOne, latchTen));
            thread.start();
        }
        System.out.println("開始併發執行。。。");
        latchOne.countDown();
        try {
            latchTen.await();
            System.out.println("############### All Thread Done!!#################");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


    private class MyRunnable implements Runnable {

        private CountDownLatch countDownLathchOne;
        private CountDownLatch countDownLatchTen;

        public MyRunnable(CountDownLatch latchOne, CountDownLatch latchTen) {
            this.countDownLatchTen = latchTen;
            this.countDownLathchOne = latchOne;
        }

        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object's
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see Thread#run()
         */
        @Override
        public void run() {
            try {
                System.out.println("到達等待點 。。。" + Thread.currentThread().getName());
                countDownLathchOne.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("running 。。。" + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            //執行後減去訊號量
            countDownLatchTen.countDown();
            
        }
    }

}

2、同步屏障 CyclicBarrier

顧名思義:可迴圈使用的屏障。它所要做的事情是讓一組執行緒到達一個屏障(同步點)時阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被攔截的執行緒才會繼續執行。

CyclicBarrier 與 CountDownLatch的區別:

  • CountDownLatch 計數器只能使用一次
  • CyclicBarrier 的計數器可通過方法reset() 重置,適合處理更復雜的業務場景;比如計算錯誤可以重置計數器重新計算一次;
  • CyclicBarrier 還有其他可用方法,比如getNumberWating() 可以忽的阻塞的執行緒數,isBorken()用來了解阻塞的執行緒是否被中斷。

比如多執行緒彙總資料後,計算總資料。

package cn.zhjw.eurekaclientnode2.common;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;

/**
 * @Desc:迴圈屏障
 * @Author: zhaojiwei
 * @Date: 2018/11/1 15:52
 */
public class ThreadSynCyclicBarrier {

    static final int COUNT = 10;
    /**
     * 假設需要執行4個任務,開啟4個執行緒去執行
     */
    private Executor executor = Executors.newFixedThreadPool(COUNT);
    private ConcurrentHashMap<String, Integer> bankWaterCount = new ConcurrentHashMap<>();
    /**
     * 迴圈屏障,都到達同步點時,優先執行回撥操作barrierAction
     */
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(COUNT, new BankWaterService(bankWaterCount));



    public static void main(String[] args) {
        ThreadSynCyclicBarrier cb = new ThreadSynCyclicBarrier();
        cb.count();
        System.out.println("$#################");
    }

    /**
     * 模擬多執行緒計算
     */
    public void count() {
        for (int i = 0; i < COUNT; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //計算當前sheet中銀流資料,計算程式碼省略
                    bankWaterCount.put(Thread.currentThread().getName(), Integer.valueOf(1));
                    //銀流計算完後,插入一個屏障
                    try {
                        System.out.println(Thread.currentThread().getName()+" => await...");
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    class BankWaterService implements Runnable {
        private ConcurrentHashMap<String, Integer> bankWaterCount;

        public BankWaterService(ConcurrentHashMap<String, Integer> bankWaterCount) {
            this.bankWaterCount = bankWaterCount;
        }

        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object's
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see Thread#run()
         */
        @Override
        public void run() {
            int result = 0;
            Set<Map.Entry<String, Integer>> entries = bankWaterCount.entrySet();
            for (Map.Entry<String, Integer> entry : entries) {
                result += entry.getValue();
            }

            //將結果輸出
            bankWaterCount.put("result", result);
            System.out.println(result);
        }


    }


}

3、控制併發量的訊號量 Semaphore

Semaphore 訊號量用於控制多執行緒最大併發度(併發數量)。它協調多執行緒合理利用公共資源。

Semaphore可用於流量的控制,特別是公共資源有限的應用場景,比如資料庫連線。

package cn.zhjw.eurekaclientnode2.common;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @Desc: 訊號量併發流量的限制
 * @Author: zhaojiwei
 * @Date: 2018/11/1 16:56
 */
public class ThreadSynSemaphore {

    private final static int THREAD_COUNT = 30;
    private final static int SEMAPHORE_COUNT = 10;

    /**
     * 30個執行緒
     */
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    /**
     * 10個訊號量 控制併發度
     *
     * @param args
     */
    private static Semaphore semaphore = new Semaphore(SEMAPHORE_COUNT);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("save data!!");

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        }

        threadPool.shutdown();
    }
}

Semaphore 建構函式接受一個int型別的引數,表示可用的許可證,通過調研 acquire()獲取一個許可證,用完後通過反覆release() 歸還許可證即可。還可以通過tryAcquire()方法嘗試獲取許可證。

4、執行緒間交換資料的 交換者 Exchanger

Exchanger 是用於執行緒間交換資料的工具類。它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的 資料。

兩個執行緒同步Exchanger交換資料,第一個執行緒如果先執行exchange()方法,它會一直等待第二個執行緒也執行exchange()方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產的資料傳遞給對方。

應用場景:

  • 遺傳演算法
  • 校對工作
package cn.zhjw.eurekaclientnode2.common;

import java.util.concurrent.*;

/**
 * @Desc: 執行緒間交換資料
 * @Author: zhaojiwei
 * @Date: 2018/11/1 17:15
 */
public class ThreadSynExchanger {

    /**
     * 交換字串資料
     */
    private static final Exchanger<String> exchanger = new Exchanger<>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                /**
                 * A 錄入銀行流水資料
                 */
                String A = "銀行流水A";
                try {
                    String exchange = exchanger.exchange(A, 1, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }

            }
        });

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                /**
                 * B 員工錄入的銀行流水資料
                 */
                String B = "銀行流水B";
                try {
                    String A = exchanger.exchange(B, 1, TimeUnit.MINUTES);
                    System.out.println("A和B的錄入資料是否一致:" + A.equals(B) + ",A錄入的是:" + A + ",B 錄入的是:" + B);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

 

參考:《JAVA 併發程式設計的藝術》