1. 程式人生 > >Java 併發工具類 CountDownLatch、CyclicBarrier、Semaphore、Exchanger

Java 併發工具類 CountDownLatch、CyclicBarrier、Semaphore、Exchanger


> 本文部分摘自《Java 併發程式設計的藝術》
## CountDownLatch CountDownLatch 允許一個或多個執行緒等待其他執行緒完成操作。假設現有一個需求:我們需要解析一個 Excel 裡多個 sheet 的資料,此時可以考慮使用多執行緒,每個執行緒解析一個 sheet 的資料,等到所有的 sheet 都解析完之後,程式需要提示解析完成。在這個需求中,要實現主執行緒等待所有執行緒完成 sheet 的解析操作,最簡單的做法就是使用 join() 方法 ```java public class JoinCountDownLatchTest { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish"); } }); Thread parser2 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish"); } }); parser1.start(); parser2.start(); parser1.join(); parser2.join(); System.out.println("all parser finish"); } } ``` 在 JDK5 之後的併發包中提供的 CountDownLatch 也可以實現 join 的功能,並且比 join 的功能更多 ```java public class CountDownLatchTest { // CountDown 的建構函式接收一個 int 型別的引數作為計數器 // 假設想等待 N 個點完成,就傳入 N static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { System.out.println(1); // 每當呼叫 countDown 方法時,N 就會減一 c.countDown(); System.out.println(2); c.countDown(); } }).start(); // await 會阻塞當前執行緒,直到 N 變成零 c.await(); System.out.println(3); } } ```
## CyclicBarrier CyclicBarrier 可以讓一組執行緒到達一個屏障(同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會放行,所有被屏障攔截的執行緒才會繼續執行 ```java public class CyclicBarrierTest { // 傳入引數為2 static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { //呼叫await方法,計數減一,並阻塞,直到計數為零才放行 c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(1); } }).start(); try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(2); } } ``` 如果把 new CyclicBarrier(2) 修改為 new CyclicBarrier(3),則主執行緒和子執行緒會永遠等待,因為沒有第三個執行緒執行 await 方法 CyclicBarrier 還提供一個更高階的建構函式 CyclicBarrier(int parties, Runnable barrierAction),用於線上程到達屏障時,優先執行 barrierAction 方法 ```java public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } static class A implements Runnable { @Override public void run() { System.out.println(3); } } } ``` 最終輸出結果一定先是 3 開頭
## Semaphore Semaphore(訊號量)用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源 #### 1. 應用場景 Semaphore 可以用於做流量控制,特別是公用資源有限的應用場景,比如資料庫連線 ```java public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } } ``` 程式碼中,雖然有 30 個執行緒執行,但只允許 10 個執行緒併發執行。Semaphore 的構造方法 Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。Semaphore 的用法也很簡單,首先執行緒使用 Semaphore 的 acquire() 方法獲取一個許可證,使用完之後呼叫 release() 方法歸還即可,還可以使用 tryAcquire() 方法嘗試獲取許可證
## Exchanger Exchanger(交換者)是一個用於執行緒間協作的工具類,用於執行緒間的資料交換。它提供一個同步點,在這個同步點,兩個執行緒可以彼此交換資料。這兩個執行緒通過 exchange 方法交換資料,如果第一個執行緒先執行 exchange 方法,它會一直等待第二個執行緒執行 exchange 方法,當兩個執行緒都到達同步點時,兩個執行緒就可以交換資料了 假設現在有一個需求:我們需要將紙質銀行流水通過人工的方式錄入電子銀行流水,為了避免錯誤,採用 AB 崗兩人進行錄入,錄入完成後,系統需載入這兩人錄入的資料進行比較,看看是否錄入一致 ```java public class ExchangerTest { private static final Exchanger exchanger = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { try { // A錄入銀行流水資料 String A = "銀行流水A"; exchanger.exchange(A); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.execute(new Runnable() { @Override public void run() { try { // B錄入銀行流水資料 String B = "銀行流水B"; String A = exchanger.exchange(B); System.out.println("A 和 B 資料是否一致:" + A.equals(B) + ", A 錄入的是:" + A + ", B 錄入的是:" + B); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.shutdown(); } } ``