1. 程式人生 > >Java中的併發工具類

Java中的併發工具類

前言: JDK1.5中增加了幾個併發工具類,CountDownLatch,CyclicBarrier,Semaphore分別提供了一種併發流程控制的手段,Exchanger則提供了線上程間交換資料的一種手段.

1.等待多執行緒完成的CountDownLatch CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作.例如,應用程式的主執行緒希望在負責啟動框架服務的執行緒已經啟動所有的框架服務之後再執行。

package com.h.concurrent;

import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch :閉鎖,在完成某些運算時,只有其他所有執行緒的運算全部完成,當前運算才繼續執行
 * 示例:利用閉鎖計算各個執行緒執行完成所需的時間
 * 讓主執行緒等待計算偶數的5個任務執行緒全部執行完後再統計總的執行時間
 */
public class TestCountDownLatch { public static void main(String[] args) { /** * 任務:列印[0,50000]之間的所有偶數,並計算耗費時間 * 實現:開啟5個執行緒,併發執行任務,在main主執行緒中彙總耗費的時間 */ /** * 1.使用join() * join用於讓當前執行執行緒等待join執行緒執行結束,其實現原理是不停檢查join執行緒是否存活, * 如果是則讓當前執行緒永遠等待 * 程式碼核心: * while(isAlive){ * wait(0); * } * 直到join執行緒終止後,執行緒的this.notifyAll()會被呼叫. */
long start = System.currentTimeMillis(); Runnable task = () -> { for (int i = 0; i <= 50000; i++) { if (i % 2 == 0) { System.out.println(i); } } }; Thread thread = null; for (int i = 0
; i < 5; i++) { thread = new Thread(task); thread.start(); try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } long end = System.currentTimeMillis(); System.out.println("耗費時間為:" + (end - start) + "ms"); //================================================================ /** * 2.使用CountDownLatch */ long start = System.currentTimeMillis(); final CountDownLatch latch = new CountDownLatch(5); // LatchDemo lt = new LatchDemo(latch); for (int i = 0; i < 5; i++) { new Thread(lt).start(); } try { //主執行緒必須在啟動其他執行緒後立即呼叫CountDownLatch.await()方法。 //這樣主執行緒的操作就會在這個方法上阻塞,直到其他執行緒完成各自的任務。 latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("耗費時間為:" + (end - start) + "ms"); } } class LatchDemo implements Runnable { private CountDownLatch latch; public LatchDemo(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { for (int i = 0; i <= 50000; i++) { if (i % 2 == 0) { System.out.println(i); } } } finally { latch.countDown(); } } }

CountDownLatch latch = new CountDownLatch(int N); await()會阻塞當前執行緒,直到傳入的計數器N變為0.由於countDown()方法可以用在任何地方,因此這裡說的計數器N,可以是N個執行緒,也可以是1個執行緒裡的N個步驟.用在多個執行緒時,只需要把這個CountDownLatch的引用傳遞到執行緒裡即可,此時,計數器的初始值就為執行緒的數量,每當一個執行緒完成了自己的任務後,計數器的值就會減1,當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。如果某個執行緒處理的比較慢,不想讓主執行緒一直等待,可以使用await(long time,TimeUnit unit),這個方法在等待特定時間後,就會不再阻塞當前執行緒.join也有類似的方法.另:CountDownLatch的計數器值一旦初始化就不能再改變,一個執行緒呼叫countDown()方法happens-before另外一個執行緒呼叫await()方法. 在這裡插入圖片描述

應用場景:

  • 實現最大的並行性:有時我們想同時啟動多個執行緒,實現最大程度的並行性。例如,我們想測試一個單例類。如果我們建立一個初始計數為1的CountDownLatch,並讓所有執行緒都在這個鎖上等待,那麼我們可以很輕鬆地完成測試。我們只需呼叫 一次countDown()方法就可以讓所有的等待執行緒同時恢復執行。
  • 開始執行前等待n個執行緒完成各自任務:例如應用程式啟動類要確保在處理使用者請求前,所有N個外部系統已經啟動和運行了。

2.同步屏障CyclicBarrier CyclicBarrier:可迴圈使用的屏障,它讓一組執行緒到達一個屏障(同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行. 構造方法 (1)CyclicBarrier cb = new CyclicBarrier(int parties),parties表示屏障攔截的執行緒數量,每個執行緒呼叫await()方法告訴CyclicBarrier我已到達了同步點,然後當前執行緒被阻塞. (2)CyclicBarrier cb = new CyclicBarrier(int parties,Runnable barrierAction)用於在所有執行緒到達屏障時,優先執行barrierAction,方便處理複雜的場景啊. 在這裡插入圖片描述

package com.h.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Created by John on 2018/5/12.
 */
public class TestCyclicBarrier {
    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(2, () -> System.out.println("Thread:" + Thread.currentThread().getName()+ " " +3));
	 
        new Thread(() -> {
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("Thread:" + Thread.currentThread().getName()+ " " + 1);
        },"Child-Thread").start();

        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println("Thread:" + Thread.currentThread().getName()+ " " + 2);
    }
}
列印結果:
Thread:Child-Thread 3
Thread:Child-Thread 1
Thread:main 2
package com.h.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * Created by John on 2018/5/12.
 */
public class TestCyclicBarrier {

   private static final CyclicBarrier barrier = new CyclicBarrier(2, () -> System.out.println("Thread:" + Thread.currentThread().getName()+ " " +3));

    public static void main(String[] args) {
        new Thread(new BarrierTask(barrier,1),"Child-Thread").start();
        new Thread(new BarrierTask(barrier,2),"Child-Thread").start();
    }
}

class BarrierTask implements Runnable{
    private CyclicBarrier barrier;
    private int value;

    public BarrierTask(CyclicBarrier barrier,int value) {
        this.barrier = barrier;
        this.value = value;
    }

    @Override
    public void run() {
        try {
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Thread:" + Thread.currentThread().getName()+ " " + value);
    }
}
列印結果有2種情況:
Thread:Child-Thread 3
Thread:Child-Thread 1
Thread:Child-Thread 2:
Thread:Child-Thread 3
Thread:Child-Thread 2
Thread:Child-Thread 1
不論哪種結果,3一定是第一個被打印出來的

應用場景 CyclicBarrier可以用於多執行緒計算資料,最後合併計算結果的場景.

package com.h.concurrent;

import java.util.OptionalDouble;
import java.util.concurrent.*;

/**
 * Created by John on 2018/5/12.
 * 需求:在[1,500],[501,1000],[1001,1500],[1501,2000]的四個區間內分別產生50個隨機整數,計算出產生的所有隨機整數的平均值
 * 開啟四個字執行緒並行產生每個區間內的隨機整數,並計算總和,最後在統計整體平均值
 */
public class TestCyclicBarrier {

    public static void main(String[] args) {
        BarrierTask barrierTask = new BarrierTask();
        barrierTask.count();
    }
}

class BarrierTask implements Runnable {
    /**
     * 建立4個屏障,處理完之後執行當前類的run方法
     */
    private CyclicBarrier barrier = new CyclicBarrier(4, this);

    private ExecutorService executor = Executors.newFixedThreadPool(4);

    /**
     * 儲存每個執行緒計算的隨機數的和
     */
    private ConcurrentHashMap<String, Integer> sunCountMap = new ConcurrentHashMap<>(4);

    public void count() {
        for (int i = 0; i < 4; i++) {
            final int index = i + 1;
            executor.execute(() -> {
                sunCountMap.put(Thread.currentThread().getName(), getRandomSum((index - 1) * 500 + 1, index * 500));
                try {
                    System.out.println("當前等待執行計算的執行緒數量:" + barrier.getNumberWaiting());
                    //計算完成,插入一個屏障
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }

    @Override
    public void run() {
        OptionalDouble average = sunCountMap.values().stream().mapToDouble(n -> n).average();
        System.out.println(average.getAsDouble());
    }

    private int getRandomSum(int start, int end) {
        int sum = 0;
        for (int i = 0; i < 50; i++) {
            sum += ThreadLocalRandom.current().nextInt(start, end);
        }
        return sum;
    }
}
列印結果:
當前等待執行計算的執行緒數量:0
當前等待執行計算的執行緒數量:0
當前等待執行計算的執行緒數量:1
當前等待執行計算的執行緒數量:1
50740.75

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置,所以CyclicBarrier能處理更為複雜的場景.例如:如果計算髮生錯誤,可以重置計數器,讓執行緒重新執行一次.CyclicBarrier還提供其他有用的方法,如getNUmberWaiting方法可以獲得CyclicBarrier阻塞的執行緒數量;isBroken()方法用來了解阻塞的執行緒是否被中斷.