1. 程式人生 > >Java粗淺認識-併發程式設計(四)-執行緒間通訊

Java粗淺認識-併發程式設計(四)-執行緒間通訊

執行緒間通訊

 執行緒間通訊,就是對同進程類共享資源的安全訪問,Java中通過AQSjava.util.concurrent.locks.AbstractQueuedSynchronizer)同步器來實現資源安全訪問,常見基礎工具型別,java.util.concurrent.CountDownLatch(java1.5)、java.util.concurrent.Semaphore(java1.5)、java.util.concurrent.CyclicBarrier(java1.5)、java.util.concurrent.Phaser(java7),關於AQS的講解,後續在原始碼解讀的篇章中會講到。

java.util.concurrent.CountDownLatch

需要注意的是,如果執行次數小於了count,會一直阻塞

    /**
     * countDownLatch控制
     * @throws InterruptedException
     */
    public static void countDownLatch() throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        int size = 10;
        CountDownLatch countDownLatch = new CountDownLatch(size);
        for(int i = 0;i<size;i++){
            executorService.execute(new CountDownLatchTask(countDownLatch));
        }
        countDownLatch.await();
        System.out.println("執行完畢");
        executorService.shutdown();
    }

    private static class CountDownLatchTask implements Runnable {
        private CountDownLatch latch;

        public CountDownLatchTask(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "執行任務。");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
        }
    }

java.util.concurrent.Semaphore

同時permits個執行緒執行,實力模擬同時兩個任務執行。

    /**
     * 訊號量限流
     */
    private static void semaphore() {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        int permits = 2;
        Semaphore semaphore = new Semaphore(permits);
        for (int i = 0; i < 10; i++) {
            executorService.execute(new SemaphoreTask(semaphore));
        }
        executorService.shutdown();
    }

    private static class SemaphoreTask implements Runnable {
        Semaphore semaphore;

        public SemaphoreTask(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            //獲取一個訊號
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "執行任務.");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //執行完後釋放
            semaphore.release();
        }
    }

java.util.concurrent.CyclicBarrier

可迴圈使用的屏障,可以理解為可以迴圈使用的計數器,達到parties後繼續往下執行。

/**
     * 需要注意,線上程池中每個執行緒執行完自己的任務後就await()
     * 在一種情況下會出現阻塞,當parties 大於執行緒池最大執行緒數時,所有的執行緒都被阻塞
     * 比如這裡的parties = 5,FixedThreadPool.nThreads=5,如果parties>FixedThreadPool.nThreads會一直阻塞
     * 其實還是容易想到,可執行執行緒都在等待剩餘任務。
     */
    private static void cyclicBarrier() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{
            System.out.println("任務執行完畢");
        });
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            executorService.execute(new CyclicBarrierTask(cyclicBarrier));
        }
        executorService.shutdown();
    }

    private static class CyclicBarrierTask implements Runnable{
        private CyclicBarrier barrier;
        CyclicBarrierTask(CyclicBarrier barrier){
            this.barrier = barrier;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"執行任務。");
                TimeUnit.SECONDS.sleep(3);
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

 

java.util.concurrent.Phaser(java7)

A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage.

可重用的同步屏障,和CyclicBarrier and CountDownLatch功能相似,但支援更多的操作。

模擬,所有人玩一個遊戲,所有人都要完成n階段的任務,每個階段完成後,都有獎勵。

/**
     * 玩一個遊戲,所有人都要完成n階段的任務,每個階段完成後,都有獎勵
     */
    private static void phaser(final int n) {
        Phaser phaser = new Phaser(){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println(Thread.currentThread().getName()+"===============完成"+phase+"階段的事務,派發禮物。");
                return phase >= n || registeredParties == 0;
            }
        };
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 4; i++) {
            executorService.execute(new PhaserTask(phaser));
        }
        executorService.shutdown();
    }

    private static class PhaserTask implements Runnable{
        Phaser phaser;
        public PhaserTask(Phaser phaser){
            this.phaser = phaser;
        }
        @Override
        public void run() {
            phaser.register();
            Random random = new Random();
            do {
                System.out.println(Thread.currentThread().getName() + "進入"+phaser.getPhase()+"階段。");
                try {
                    int elapsed = random.nextInt(10);
                    TimeUnit.SECONDS.sleep(elapsed);
                    System.out.println(Thread.currentThread().getName() + "耗時"+elapsed+"s.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //進入下一個階段
                phaser.arriveAndAwaitAdvance();
            }while(!phaser.isTerminated());

            //執行完畢
            phaser.arriveAndDeregister();
        }
    }

總結

這章節講了4中同步器,java.util.concurrent.CountDownLatch、java.util.concurrent.Semaphore、java.util.concurrent.CyclicBarrier、java.util.concurrent.Phaser,需要自己寫例項思考才能靈活運用,下一章節,執行緒池使用。