1. 程式人生 > >Java併發工具類

Java併發工具類

目錄

在JDK的併發包(java.util.concurrent)裡提供了幾個非常有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類則提供了線上程間交換資料的一種手段。

1.等待多執行緒完成的CountDownLatch

CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作,類似於join方法。join的實現原理是不停的檢查join執行緒是否存活,如果join執行緒存活則讓當前執行緒永遠等待。CountDownLatch經常用於監聽某些初始化操作,等初始化執行完畢後,通知主執行緒繼續工作。

CountDownLatch的建構函式接收一個int型別的引數作為計數器,如果你想等待N個點完成,這裡就傳入N。當呼叫CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前執行緒,直到N變成零。由於CountDownLatch的countDown方法可以用在任何地方,所以這裡說的N個點,可以是N個執行緒,也可以是1個執行緒裡的N個執行步驟。當然,如果一個執行緒執行很慢的話,我們不可能讓主執行緒一直等待下去,所以可以使用另外一個帶有指定時間的await方法,await(long time,TimeUnit unit)這個方法等待特定時間後,就會不再阻塞當前執行緒。

實現的例項程式碼如下:

   public class CountDownLatchTest {
    static CountDownLatch countDownLatch = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
                countDownLatch.countDown();
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(2);
                countDownLatch.countDown();
            }
        }).start();
        countDownLatch.await();
        System.out.println("thread finished");
    }
 }

2.同步屏障CyclicBarrier

CylicBarrier讓一組執行緒到達一個屏障(同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。假設每個執行緒代表一個跑步運動員,當運動員都準備好以後,才一起出發,只要有一個人沒有準備好,大家都等待。CylicBarrier預設的構造方法是CylicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CylicBarrier已經到達屏障,然後當前執行緒被阻塞。

例項程式碼如下:

 public class CyclicBarrierTest {
    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(2);
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("thread finished");
    }
}

如果把建構函式更改為3,則主執行緒和子執行緒永遠都不會得到執行,因為沒有第3個執行緒執行await方法,即沒有第3個執行緒到達屏障,所以之前到達屏障的兩個執行緒永遠不會繼續執行。

CyclicBarrier還提供了一個更加高階的建構函式CyclicBarrier(int parties,Runnable barrierAction),用於線上程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。CylicBarrier可以用於計算資料,最後合併計算結果的場景。例項程式碼:

public class CyclicBarrierTest implements Runnable {
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, this);
    private Executor executor = Executors.newFixedThreadPool(4);
    private ConcurrentMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();

    private void calculate() {
        for (int i = 0; i < 4; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    concurrentMap.put(Thread.currentThread().getName(), 1);
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrierTest cyclicBarrierTest = new CyclicBarrierTest();
        cyclicBarrierTest.calculate();
        Thread.sleep(1000);
        System.out.println(cyclicBarrierTest.concurrentMap.get("result"));
        System.out.println(cyclicBarrierTest.cyclicBarrier.getNumberWaiting());
        System.out.println(cyclicBarrierTest.cyclicBarrier.isBroken());
    }

    @Override
    public void run() {
        Integer result = 0;
        for (Map.Entry<String, Integer> entry : concurrentMap.entrySet()) {
            result += entry.getValue();
        }
        concurrentMap.put("result", result);
    }
}

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset方法重置。所以CyclicBarrier可以處理更加複雜的業務場景。例如,如果計算髮生錯誤,可以重置計數器,並讓執行緒重新執行一次。

3.控制併發執行緒數的Semaphore

Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒以保證合理的使用公共資源。Semaphore可以用於做流量控制,特別是公共資源有限的應用場景,比如資料庫的連線。

例項程式碼:

   public class SemaphoreTest {
    private Executor executors = Executors.newFixedThreadPool(20);
    private Semaphore semaphore = new Semaphore(5);

    private void linkDataBase() {
        for (int i = 0; i < 20; i++) {
            executors.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("link data base");
                    semaphore.release();
                }
            });
        }
    }

    public static void main(String[] args) {
        SemaphoreTest semaphoreTest = new SemaphoreTest();
        semaphoreTest.linkDataBase();
    }
}

雖然有20個執行緒在執行,但是隻允許5個併發執行。Semaphore的用法非常的簡答,首先執行緒使用Semaphore的acquire()方法獲得一個許可證,使用完之後呼叫release()方法歸還許可證,還可以使用tryAcquire()方法嘗試獲取許可證。Semaphore還提供了一些其他的方法:

4.執行緒間交換資料的Exchanger

Exchanger是一個用於執行緒間協作的工具類。Exchanger用於進行執行緒間的資料交換。它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchanger()方法交換資料,如果一個執行緒先執行exchanger()方法交換資料,它會一直等待第二個執行緒也執行exchanger方法,當兩個執行緒到達同步點的時候,兩個執行緒可以將本執行緒生產出來的資料傳遞給對方,交換資料。

public class ExchangerTest {
    private Executor executor = Executors.newFixedThreadPool(2);
    private Exchanger exchanger = new Exchanger();

    private void exchangeData() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                String A = "A";
                try {
                    String B = (String) exchanger.exchange(A);
                    System.out.println("B執行緒的值:" + B);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        executor.execute(new Runnable() {
            @Override
            public void run() {
                String B = "B";
                try {
                    String A = (String) exchanger.exchange(B);
                    System.out.println("A執行緒的值:" + A);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    public static void main(String[] args) {
        ExchangerTest exchangerTest = new ExchangerTest();
        exchangerTest.exchangeData();
    }
}

如果兩個執行緒有一個沒有執行exchange()方法,則會一直等待,為了避免一直等待,可以使用exchange(V x,long timeout,TimeUnit unit)設定最大等待時長。