1. 程式人生 > >25.大白話說java並發工具類-CountDownLatch,CyclicBarrier,Semaphore,Exchanger

25.大白話說java並發工具類-CountDownLatch,CyclicBarrier,Semaphore,Exchanger

java rac second service join方法 -o -s exc tor

1. 倒計時器CountDownLatch

在多線程協作完成業務功能時,有時候需要等待其他多個線程完成任務之後,主線程才能繼續往下執行業務功能,在這種的業務場景下,通常可以使用Thread類的join方法,讓主線程等待被join的線程執行完之後,主線程才能繼續往下執行。當然,使用線程間消息通信機制也可以完成。其實,java並發工具類中為我們提供了類似“倒計時”這樣的工具類,可以十分方便的完成所說的這種業務場景。

為了能夠理解CountDownLatch,舉一個很通俗的例子,運動員進行跑步比賽時,假設有6個運動員參與比賽,裁判員在終點會為這6個運動員分別計時,可以想象每當一個運動員到達終點的時候,對於裁判員來說就少了一個計時任務。直到所有運動員都到達終點了,裁判員的任務也才完成。這6個運動員可以類比成6個線程,當線程調用CountDownLatch.countDown方法時就會對計數器的值減一,直到計數器的值為0的時候,裁判員(調用await方法的線程)才能繼續往下執行。

下面來看些CountDownLatch的一些重要方法。

先從CountDownLatch的構造方法看起:

public CountDownLatch(int count)

構造方法會傳入一個整型數N,之後調用CountDownLatch的countDown方法會對N減一,直到N減到0的時候,當前調用await方法的線程繼續執行。

CountDownLatch的方法不是很多,將它們一個個列舉出來:

  1. await() throws InterruptedException:調用該方法的線程等到構造方法傳入的N減到0的時候,才能繼續往下執行;

  2. await(long timeout, TimeUnit unit):與上面的await方法功能一致,只不過這裏有了時間限制,調用該方法的線程等到指定的timeout時間後,不管N是否減至為0,都會繼續往下執行;

  3. countDown():使CountDownLatch初始值N減1;

  4. long getCount():獲取當前CountDownLatch維護的值;

下面用一個具體的例子來說明CountDownLatch的具體用法:

public class CountDownLatchDemo {
private static CountDownLatch startSignal = new CountDownLatch(1);
//用來表示裁判員需要維護的是6個運動員
private static CountDownLatch endSignal = new CountDownLatch(6);
?
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 運動員等待裁判員響哨!!!");
startSignal.await();
System.out.println(Thread.currentThread().getName() + "正在全力沖刺");
endSignal.countDown();
System.out.println(Thread.currentThread().getName() + " 到達終點");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Thread.sleep(10);
System.out.println("裁判員發號施令啦!!!");
startSignal.countDown();
endSignal.await();
System.out.println("所有運動員到達終點,比賽結束!");
executorService.shutdown();
}
}
輸出結果:
?
pool-1-thread-2 運動員等待裁判員響哨!!!
pool-1-thread-3 運動員等待裁判員響哨!!!
pool-1-thread-1 運動員等待裁判員響哨!!!
pool-1-thread-4 運動員等待裁判員響哨!!!
pool-1-thread-5 運動員等待裁判員響哨!!!
pool-1-thread-6 運動員等待裁判員響哨!!!
裁判員發號施令啦!!!
pool-1-thread-2正在全力沖刺
pool-1-thread-2 到達終點
pool-1-thread-3正在全力沖刺
pool-1-thread-3 到達終點
pool-1-thread-1正在全力沖刺
pool-1-thread-1 到達終點
pool-1-thread-4正在全力沖刺
pool-1-thread-4 到達終點
pool-1-thread-5正在全力沖刺
pool-1-thread-5 到達終點
pool-1-thread-6正在全力沖刺
pool-1-thread-6 到達終點
所有運動員到達終點,比賽結束!

該示例代碼中設置了兩個CountDownLatch,第一個endSignal用於控制讓main線程(裁判員)必須等到其他線程(運動員)讓CountDownLatch維護的數值N減到0為止。另一個startSignal用於讓main線程對其他線程進行“發號施令”,startSignal引用的CountDownLatch初始值為1,而其他線程執行的run方法中都會先通過 startSignal.await()讓這些線程都被阻塞,直到main線程通過調用startSignal.countDown();,將值N減1,CountDownLatch維護的數值N為0後,其他線程才能往下執行,並且,每個線程執行的run方法中都會通過endSignal.countDown();endSignal維護的數值進行減一,由於往線程池提交了6個任務,會被減6次,所以endSignal維護的值最終會變為0,因此main線程在latch.await();阻塞結束,才能繼續往下執行。

另外,需要註意的是,當調用CountDownLatch的countDown方法時,當前線程是不會被阻塞,會繼續往下執行,比如在該例中會繼續輸出pool-1-thread-4 到達終點

2. 循環柵欄:CyclicBarrier

CyclicBarrier也是一種多線程並發控制的實用工具,和CountDownLatch一樣具有等待計數的功能,但是相比於CountDownLatch功能更加強大。

為了理解CyclicBarrier,這裏舉一個通俗的例子。開運動會時,會有跑步這一項運動,我們來模擬下運動員入場時的情況,假設有6條跑道,在比賽開始時,就需要6個運動員在比賽開始的時候都站在起點了,裁判員吹哨後才能開始跑步。跑道起點就相當於“barrier”,是臨界點,而這6個運動員就類比成線程的話,就是這6個線程都必須到達指定點了,意味著湊齊了一波,然後才能繼續執行,否則每個線程都得阻塞等待,直至湊齊一波即可。cyclic是循環的意思,也就是說CyclicBarrier當多個線程湊齊了一波之後,仍然有效,可以繼續湊齊下一波。CyclicBarrier的執行示意圖如下:

技術分享圖片

當多個線程都達到了指定點後,才能繼續往下繼續執行。這就有點像報數的感覺,假設6個線程就相當於6個運動員,到賽道起點時會報數進行統計,如果剛好是6的話,這一波就湊齊了,才能往下執行。CyclicBarrier在使用一次後,下面依然有效,可以繼續當做計數器使用,這是與CountDownLatch的區別之一。這裏的6個線程,也就是計數器的初始值6,是通過CyclicBarrier的構造方法傳入的。

下面來看下CyclicBarrier的主要方法:

//等到所有的線程都到達指定的臨界點
await() throws InterruptedException, BrokenBarrierException
?
//與上面的await方法功能基本一致,只不過這裏有超時限制,阻塞等待直至到達超時時間為止
await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException, TimeoutException
?
//獲取當前有多少個線程阻塞等待在臨界點上
int getNumberWaiting()
?
//用於查詢阻塞等待的線程是否被中斷
boolean isBroken()
//將屏障重置為初始狀態。如果當前有線程正在臨界點等待的話,將拋出BrokenBarrierException。
void reset()

另外需要註意的是,CyclicBarrier提供了這樣的構造方法:

public CyclicBarrier(int parties, Runnable barrierAction)

可以用來,當指定的線程都到達了指定的臨界點的時,接下來執行的操作可以由barrierAction傳入即可。

一個例子

下面用一個簡單的例子,來看下CyclicBarrier的用法,我們來模擬下上面的運動員的例子。

public class CyclicBarrierDemo {
//指定必須有6個運動員到達才行
private static CyclicBarrier barrier = new CyclicBarrier(6, () -> {
System.out.println("所有運動員入場,裁判員一聲令下!!!!!");
});
public static void main(String[] args) {
System.out.println("運動員準備進場,全場歡呼............");
?
ExecutorService service = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
service.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 運動員,進場");
barrier.await();
System.out.println(Thread.currentThread().getName() + " 運動員出發");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
?
}
?
輸出結果:
運動員準備進場,全場歡呼............
pool-1-thread-2 運動員,進場
pool-1-thread-1 運動員,進場
pool-1-thread-3 運動員,進場
pool-1-thread-4 運動員,進場
pool-1-thread-5 運動員,進場
pool-1-thread-6 運動員,進場
所有運動員入場,裁判員一聲令下!!!!!
pool-1-thread-6 運動員出發
pool-1-thread-1 運動員出發
pool-1-thread-5 運動員出發
pool-1-thread-4 運動員出發
pool-1-thread-3 運動員出發
pool-1-thread-2 運動員出發

從輸出結果可以看出,當6個運動員(線程)都到達了指定的臨界點(barrier)時候,才能繼續往下執行,否則,則會阻塞等待在調用await()

3. CountDownLatch與CyclicBarrier的比較

CountDownLatch與CyclicBarrier都是用於控制並發的工具類,都可以理解成維護的就是一個計數器,但是這兩者還是各有不同側重點的:

  1. CountDownLatch一般用於某個線程A等待若幹個其他線程執行完任務之後,它才執行;而CyclicBarrier一般用於一組線程互相等待至某個狀態,然後這一組線程再同時執行;CountDownLatch強調一個線程等多個線程完成某件事情。CyclicBarrier是多個線程互相等待,等大家都完成,再攜手共進。

  2. 調用CountDownLatch的countDown方法後,當前線程並不會阻塞,會繼續往下執行;而調用CyclicBarrier的await方法,會阻塞當前線程,直到CyclicBarrier指定的線程全部都到達了指定點的時候,才能繼續往下執行;

  3. CountDownLatch方法比較少,操作比較簡單,而CyclicBarrier提供的方法更多,比如能夠通過getNumberWaiting(),isBroken()這些方法獲取當前多個線程的狀態,並且CyclicBarrier的構造方法可以傳入barrierAction,指定當所有線程都到達時執行的業務功能;

  4. CountDownLatch是不能復用的,而CyclicLatch是可以復用的。

  5. 1. 控制資源並發訪問--Semaphore

    Semaphore可以理解為信號量,用於控制資源能夠被並發訪問的線程數量,以保證多個線程能夠合理的使用特定資源。Semaphore就相當於一個許可證,線程需要先通過acquire方法獲取該許可證,該線程才能繼續往下執行,否則只能在該方法處阻塞等待。當執行完業務功能後,需要通過release()方法將許可證歸還,以便其他線程能夠獲得許可證繼續執行。

    Semaphore可以用於做流量控制,特別是公共資源有限的應用場景,比如數據庫連接。假如有多個線程讀取數據後,需要將數據保存在數據庫中,而可用的最大數據庫連接只有10個,這時候就需要使用Semaphore來控制能夠並發訪問到數據庫連接資源的線程個數最多只有10個。在限制資源使用的應用場景下,Semaphore是特別合適的。

    下面來看下Semaphore的主要方法:

    //獲取許可,如果無法獲取到,則阻塞等待直至能夠獲取為止
    void acquire() throws InterruptedException
    ?
    //同acquire方法功能基本一樣,只不過該方法可以一次獲取多個許可
    void acquire(int permits) throws InterruptedException
    ?
    //釋放許可
    void release()
    ?
    //釋放指定個數的許可
    void release(int permits)
    ?
    //嘗試獲取許可,如果能夠獲取成功則立即返回true,否則,則返回false
    boolean tryAcquire()
    ?
    //與tryAcquire方法一致,只不過這裏可以指定獲取多個許可
    boolean tryAcquire(int permits)
    ?
    //嘗試獲取許可,如果能夠立即獲取到或者在指定時間內能夠獲取到,則返回true,否則返回false
    boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
    ?
    //與上一個方法一致,只不過這裏能夠獲取多個許可
    boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    //返回當前可用的許可證個數
    int availablePermits()
    ?
    //返回正在等待獲取許可證的線程數
    int getQueueLength()
    ?
    //是否有線程正在等待獲取許可證
    boolean hasQueuedThreads()
    ?
    //獲取所有正在等待許可的線程集合
    Collection<Thread> getQueuedThreads()

    另外,在Semaphore的構造方法中還支持指定是否具有公平性,默認的是非公平性,這樣也是為了保證吞吐量。

    一個例子

    下面用一個簡單的例子來說明Semaphore的具體使用。我們來模擬這樣一樣場景。有一天,班主任需要班上10個同學到講臺上來填寫一個表格,但是老師只準備了5支筆,因此,只能保證同時只有5個同學能夠拿到筆並填寫表格,沒有獲取到筆的同學只能夠等前面的同學用完之後,才能拿到筆去填寫表格。該示例代碼如下:

    public class SemaphoreDemo {
    ?
    //表示老師只有5支筆
    private static Semaphore semaphore = new Semaphore(5);
    ?
    public static void main(String[] args) {
    ?
    //表示10個學生
    ExecutorService service = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 10; i++) {
    service.execute(() -> {
    try {
    System.out.println(Thread.currentThread().getName() + " 同學準備獲取筆......");
    semaphore.acquire();
    System.out.println(Thread.currentThread().getName() + " 同學獲取到筆");
    System.out.println(Thread.currentThread().getName() + " 填寫表格ing.....");
    TimeUnit.SECONDS.sleep(3);
    semaphore.release();
    System.out.println(Thread.currentThread().getName() + " 填寫完表格,歸還了筆!!!!!!");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }
    service.shutdown();
    }
    ?
    }
    輸出結果:
    ?
    pool-1-thread-1 同學準備獲取筆......
    pool-1-thread-1 同學獲取到筆
    pool-1-thread-1 填寫表格ing.....
    pool-1-thread-2 同學準備獲取筆......
    pool-1-thread-2 同學獲取到筆
    pool-1-thread-2 填寫表格ing.....
    pool-1-thread-3 同學準備獲取筆......
    pool-1-thread-4 同學準備獲取筆......
    pool-1-thread-3 同學獲取到筆
    pool-1-thread-4 同學獲取到筆
    pool-1-thread-4 填寫表格ing.....
    pool-1-thread-3 填寫表格ing.....
    pool-1-thread-5 同學準備獲取筆......
    pool-1-thread-5 同學獲取到筆
    pool-1-thread-5 填寫表格ing.....
    pool-1-thread-6  同學準備獲取筆......
    pool-1-thread-7 同學準備獲取筆......
    pool-1-thread-8 同學準備獲取筆......
    pool-1-thread-9 同學準備獲取筆......
    pool-1-thread-10 同學準備獲取筆......
    pool-1-thread-4  填寫完表格,歸還了筆!!!!!!
    pool-1-thread-9 同學獲取到筆
    pool-1-thread-9 填寫表格ing.....
    pool-1-thread-5 填寫完表格,歸還了筆!!!!!!
    pool-1-thread-7 同學獲取到筆
    pool-1-thread-7 填寫表格ing.....
    pool-1-thread-8 同學獲取到筆
    pool-1-thread-8 填寫表格ing.....
    pool-1-thread-1 填寫完表格,歸還了筆!!!!!!
    pool-1-thread-6 同學獲取到筆
    pool-1-thread-6 填寫表格ing.....
    pool-1-thread-3 填寫完表格,歸還了筆!!!!!!
    pool-1-thread-2 填寫完表格,歸還了筆!!!!!!
    pool-1-thread-10 同學獲取到筆
    pool-1-thread-10 填寫表格ing.....
    pool-1-thread-7 填寫完表格,歸還了筆!!!!!!
    pool-1-thread-9 填寫完表格,歸還了筆!!!!!!
    pool-1-thread-8 填寫完表格,歸還了筆!!!!!!
    pool-1-thread-6 填寫完表格,歸還了筆!!!!!!
    pool-1-thread-10 填寫完表格,歸還了筆!!!!!!

    根據輸出結果進行分析,Semaphore允許的最大許可數為5,也就是允許的最大並發執行的線程個數為5,可以看出,前5個線程(前5個學生)先獲取到筆,然後填寫表格,而6-10這5個線程,由於獲取不到許可,只能阻塞等待。當線程pool-1-thread-5釋放了許可之後,pool-6-thread-10`就可以獲取到許可,繼續往下執行。對其他線程的執行過程,也是同樣的道理。從這個例子就可以看出,Semaphore用來做特殊資源的並發訪問控制是相當合適的,如果有業務場景需要進行流量控制,可以優先考慮Semaphore。

    2.線程間交換數據的工具--Exchanger

    Exchanger是一個用於線程間協作的工具類,用於兩個線程間能夠交換。它提供了一個交換的同步點,在這個同步點兩個線程能夠交換數據。具體交換數據是通過exchange方法來實現的,如果一個線程先執行exchange方法,那麽它會同步等待另一個線程也執行exchange方法,這個時候兩個線程就都達到了同步點,兩個線程就可以交換數據。

    Exchanger除了一個無參的構造方法外,主要方法也很簡單:

    //當一個線程執行該方法的時候,會等待另一個線程也執行該方法,因此兩個線程就都達到了同步點
    //將數據交換給另一個線程,同時返回獲取的數據
    V exchange(V x) throws InterruptedException
    ?
    //同上一個方法功能基本一樣,只不過這個方法同步等待的時候,增加了超時時間
    V exchange(V x, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException

    一個例子

    Exchanger理解起來很容易,這裏用一個簡單的例子來看下它的具體使用。我們來模擬這樣一個情景,在青春洋溢的中學時代,下課期間,男生經常會給走廊裏為自己喜歡的女孩子送情書,相信大家都做過這樣的事情吧 :)。男孩會先到女孩教室門口,然後等女孩出來,教室那裏就是一個同步點,然後彼此交換信物,也就是彼此交換了數據。現在,就來模擬這個情景。

    public class ExchangerDemo {
    private static Exchanger<String> exchanger = new Exchanger();
    ?
    public static void main(String[] args) {
    ?
    //代表男生和女生
    ExecutorService service = Executors.newFixedThreadPool(2);
    ?
    service.execute(() -> {
    try {
    //男生對女生說的話
    String girl = exchanger.exchange("我其實暗戀你很久了......");
    System.out.println("女孩兒說:" + girl);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    service.execute(() -> {
    try {
    System.out.println("女生慢慢的從教室裏走出來......");
    TimeUnit.SECONDS.sleep(3);
    //女生對男生說的話
    String boy = exchanger.exchange("我也很喜歡你......");
    System.out.println("男孩兒說:" + boy);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    ?
    }
    }
    ?
    輸出結果:
    ?
    女生慢慢的從教室你走出來......
    男孩兒說:我其實暗戀你很久了......
    女孩兒說:我也很喜歡你......

    這個例子很簡單,也很能說明Exchanger的基本使用。當兩個線程都到達調用exchange方法的同步點的時候,兩個線程就能交換彼此的數據。

25.大白話說java並發工具類-CountDownLatch,CyclicBarrier,Semaphore,Exchanger