1. 程式人生 > >Java併發——Java中的併發類工具

Java併發——Java中的併發類工具

在JDK的併發包裡面提供了幾個非常有用的工具類:CountDwonLatch、CyclicBarrier、Semaphore、Exchanger。

其中CountDwonLatch、CyclicBarrier、Semaphore工具類提供了一種併發流程控制的手段,Exchanger提供了一種線上程間交換資料的手段。

一、四種併發工具類

1、CountDwonLatch(閉鎖)

CountDwonLatch用來等待一個或者多個執行緒完成操作,作用類似於當前執行緒裡呼叫join()方法,讓當前執行緒等待join()進來的執行緒執行完畢再執行當前執行緒剩下的邏輯,但是CountDownLatch比join()的功能更加強大,使用方法如下:

import java.util.concurrent.CountDownLatch;
class test{
    static CountDownLatch cdl = new CountDownLatch(2);   //①新建一個CountDwonLatch物件並傳入計數器的值
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    System.out.println("thread 1: 1 ");
                    Thread.sleep(1000);
                    System.out.println("thread 1 : 2");
                    cdl.countDown();                   //②在被等待的執行緒或者步驟執行完畢後呼叫countDwon()方法讓計數器減1
                }
                catch(InterruptedException e){
                    e.printStackTrace();
                }

            }
        }).start();
        new Thread(new Runnable(){
            @Override
            public void run(){
                try{
                    System.out.println("thread 2 : 1");
                    Thread.sleep(1000);
                    System.out.println("thread 2 : 2");
                    cdl.countDown();
                }
                catch(InterruptedException e){
                    e.printStackTrace();
                }


            }        }).start();
        cdl.await();                                 //③在等待的主執行緒中呼叫await()方法等待其他執行緒,知道計數器為0 再執行主執行緒接下來的邏輯
        System.out.println("thread : main");

    }

}

輸出結果:

thread 1: 1 
thread 2 : 1
thread 1 : 2
thread 2 : 2
thread : main

CountDwonLatch的使用方法:

在CountDwonLatch中,countDwon()方法和await()方法搭配使用才能起到類似join()的作用:

(1)首先建立一個CountDwonLatch物件並傳入要等待的執行緒的數量,這是個計數器;

(2)在被等待的執行緒或者步驟執行完畢後呼叫countDwon()方法讓計數器減1,countDwon()方法是一個等待的計數器,每次呼叫countDwon()方法,計數器減1,直到計數器為0,countDwon可以用在任何地方,可以是一個步驟的一個點,也可以是一個執行緒。

(3)在等待其他執行緒的主執行緒中,呼叫await()方法來等待其他呼叫了countDwon()的執行緒,直到計數器為0,再執行該執行緒接下來的邏輯,當然,如果某個執行緒執行的時間過久,當前執行緒不可能一直等待,那麼可以呼叫await(long time, TimeUnit unit)方法。

*注意:計數器大於0時才會阻塞當前執行緒,一旦計數器等於0就不會再阻塞呼叫await()的當前執行緒;

            在建立CountDwonLatch時傳入計數器的初始值後,計數器就不能重新初始化了;

2、CyclicBarrier(同步屏障)

讓一組執行緒到達一個屏障(或者同步點)時被阻塞,知道所有執行緒到達同步屏障後,同步屏障開門,所有執行緒繼續執行。

同步屏障的使用:同步屏障有兩種使用方式,從CyclicBarrier的構造方式撒謊個可以看出來

 //構造器 1
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
//構造器 2 
public CyclicBarrier(int parties) {
     this(parties, null);
}

構造器1:傳入要阻塞的執行緒的數量parties和一個Runnable的物件,這個物件的作用是用於處理以下複雜的業務場景情形:當需要在第一個執行緒到達屏障前,前處理一個任務,這個任務可以寫在barrierAction中。

構造器2:runnable物件為null說明只需要簡單的等待其他執行緒到達同步屏障即可。

接下來舉例說明兩種同步屏障的使用:

構造器2:

(1)建立一個CyclicBarrier物件,傳入要阻塞在同步屏障的執行緒數量;

(2)在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;

import java.util.concurrent.CyclicBarrier;
class test{
    static CyclicBarrier cb = new CyclicBarrier(2);  //建立一個CyclicBarrier物件,傳入要阻塞在同步屏障的執行緒數量;
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    cb.await();    //在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;
                    System.out.println(1);
                }
                catch(Exception e){
                    System.out.println("thread 1");
                }
            }
        }).start();
        try{
            cb.await();       //在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;
            System.out.println(3);
        }
        catch(Exception e){
            System.out.println("main");
        }
    }

}

構造器1:

(1)建立一個CyclicBarrier物件,傳入要阻塞在同步屏障的執行緒數量,和barrierAction物件;

(2)寫一個實現Runnable介面的class,用於實現第一個執行緒到達同步屏障前的業務邏輯;

(3)在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;

import java.util.concurrent.CyclicBarrier;  
class test{
    static CyclicBarrier cb = new CyclicBarrier(2,new DoSomeThing()); //建立一個CyclicBarrier物件,傳入要阻塞在同步屏障的執行緒數量,和barrierAction物件;
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    cb.await();             // 在每個要阻塞在同步屏障的子執行緒中呼叫cb.await( )方法;
                    System.out.println(1);
                }
                catch(Exception e){
                    System.out.println("thread 1");
                }
            }
        }).start();
        try{
            cb.await();
            System.out.println(2);
        }
        catch(Exception e){
            System.out.println("main");
        }
    }
    public static class DoSomeThing implements Runnable{  //寫一個實現Runnable介面的class,用於實現第一個執行緒到達同步屏障前的業務邏輯;
        @Override
        public void run(){
            System.out.println("happen-before CyclicBarrier");
        }
    }
}

*注意:

因為cb.await( )方法會丟擲InterruptedException和BrokenBarrierException異常,因此在子執行緒中要使用try-catch方法來捕捉這兩種異常;

CyclicBarrier中的執行緒計數器可以使用reset()方法重置;

在所有執行緒到達同步屏障後,並不是所有執行緒“同時”開始執行,而是使各個執行緒的啟動時間降到最低;

3、Semaphore(訊號量)

Semaphore用於流量控制,用於控制訪問特定資源的執行緒數量,通過協調各個執行緒,以保證合理的使用公共資源。Semaphore就像公路上的交通訊號燈或者收費站,用於控制車的流量,如果一條公路只允許10 個車通行,相當於在路口設定一個頒發通行證的收費站,一輛車只有收到了通行證才能進入這條公路,如果通行證頒發完畢了,那麼其他的車就先等在收費站外面,如果公路上的車出了這條公路,那麼他把通行證還給收費站,收費站又可以繼續把通行證頒發給等在收費站外面的車。

Semaphore的使用方法:

(1)建立一個Semaphore物件,並傳入一個int型別的引數,初始化通行證數量;

(2)在要佔用公共資源的子執行緒業務邏輯前,呼叫s.acquire( )方法獲得通行證,在實現業務邏輯後,呼叫release()方法釋放通行證;

import java.util.concurrent.Semaphore;
class test{
    static Semaphore s = new Semaphore(5);  //建立一個Semaphore物件,並傳入一個int型別的引數,初始化通行證數量;
    public static void main(String[] args) throws InterruptedException {
        for(int i = 0; i<10; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        s.acquire(); //在要佔用公共資源的子執行緒業務邏輯前,呼叫s.acquire( )方法獲得通行證
                        System.out.println(Thread.currentThread() + "is saving data    " + "availablePermits:" + s.availablePermits() + "   getQueueLength:" + s.getQueueLength());
                        Thread.sleep(1000);
                        s.release();  //在實現業務邏輯後,呼叫release()方法釋放通行證;
                    }
                    catch(Exception e){
                        System.out.println("thread 1");
                    }

                }
            }).start();
        }

    }

}

輸出結果:

Thread[Thread-3,5,main]is saving data    availablePermits:4   getQueueLength:0
Thread[Thread-7,5,main]is saving data    availablePermits:3   getQueueLength:0
Thread[Thread-0,5,main]is saving data    availablePermits:2   getQueueLength:0
Thread[Thread-4,5,main]is saving data    availablePermits:1   getQueueLength:0
Thread[Thread-8,5,main]is saving data    availablePermits:0   getQueueLength:0
Thread[Thread-2,5,main]is saving data    availablePermits:0   getQueueLength:4
Thread[Thread-6,5,main]is saving data    availablePermits:0   getQueueLength:3
Thread[Thread-1,5,main]is saving data    availablePermits:0   getQueueLength:2
Thread[Thread-5,5,main]is saving data    availablePermits:0   getQueueLength:1
Thread[Thread-9,5,main]is saving data    availablePermits:0   getQueueLength:0

可以發現,我們在程式碼中,有10個執行緒在執行,但是隻有5個執行緒併發的執行,剩餘的執行緒都在等待佇列中。

Semaphore還有一些其他的方法:

boolean  tryAcquire()嘗試獲取通行證;

int  getQueueLength(  )獲取等待佇列(執行緒)的長度;

int  availablePermits()獲取剩餘可用的通行證數量;

boolean  QueuedThreads(  )是否有執行緒在等待獲取通行證;

void  reducePermits(int reduction)減少通行證數量,這是一個protedcted方法;

Cillection  getQueuedThreads(  )獲取等待獲取通行證的執行緒集合,這是個protected方法;


4、Exchanger(交換者)

Exchanger是一個執行緒間提供資料交換功能的寫作工具,他提供了一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。執行緒間通過呼叫excahange()方法交換資料,如果第一個執行緒先到達同步點,執行exchange方法,那麼他會一直在同步點等待第二個執行緒到達同步點,第二個執行緒也執行exchange方法,這時兩個執行緒都到達同步點,可以交換彼此的資料。

使用:

(1)建立一個Exchanger物件;

(2)在要交換(同步)資料的同步點呼叫excr.exchange(  )方法

import java.util.concurrent.Exchanger;
class test{
    static Exchanger<String> exc = new Exchanger<>();  //建立一個Exchanger物件;
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    String A= "銀行流水A";
                    String B = exc.exchange(A); //在要交換(同步)資料的同步點呼叫excr.exchange(  )方法
                    System.out.println("A的視角: A、B流水是否一致:" + A.equals(B) + "   A錄入的是:" + A + "   B錄入的是:" + B);

                }
                catch(Exception e){
                    e.printStackTrace();
                }

            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    String B= "銀行流水B";
                    String A = exc.exchange(B);  //在要交換(同步)資料的同步點呼叫excr.exchange(  )方法
                    System.out.println("B的視角: A、B流水是否一致:" + A.equals(B) + "   A錄入的是:" + A + "   B錄入的是:" + B);
                }
                catch(Exception e){
                    e.printStackTrace();
                }

            }
        }).start();

    }

}

輸出:

B的視角: A、B流水是否一致:false   A錄入的是:銀行流水A   B錄入的是:銀行流水B
A的視角: A、B流水是否一致:false   A錄入的是:銀行流水A   B錄入的是:銀行流水B

可以看出,Exchanger的“交換”更偏向於資料的同步與共享,而不是“你的給我,我的給你”這樣有來有回的交換,是“你知道一個資訊,我知道另一個訊息,我們彼此交換了資訊,那麼我們就都知道了兩個訊息”。如果不願意在同步點一直等待另一個執行緒,那麼可以用設定等待時間的exchange方法:excr.exchange(V x, long timeout, timeUnit unit)。

*注意:

Exchanger交換資料是成對的交換;

Exchanger可以看做雙向的同步佇列,一個執行緒從個佇列頭部進行操作,一個從個佇列尾部進行操作;

二、併發工具類的使用場景

1、CountDwonLatch:一個執行緒等到其它幾個執行緒執行完才能執行的場景;

2、CyclicBarrier:需要多執行緒的計算結果最後對這些結果進行合併的場景;

3、Semaphore:公共資源有限而併發執行緒較多的場景;

4、Exchanger:需要資料交換共享的場景,例如遺傳演算法中,需要選擇兩個人來交配,交換兩人的資料並根據交換規則來得到交配結果,再例如用於校對工作,交換兩個執行緒的資料,用於校對兩個執行緒的資料是否相等;

三、併發工具類CountDwonLatch與CyclicBarrier的區別

1、CountDwonLatch阻塞一個執行緒,CyclicBarrier阻塞多個執行緒;

2、CountDwonLatch的計數器在構造物件時確定了就不能更改,但是CyclicBarrier可以使用reset()方法重置計數器;

3、CyclicBarrier的功能更加豐富,比如int getWaitingNumber()方法返回被阻塞在同步屏障的執行緒數,boolean  isBroken()方法返回是否有阻塞的執行緒被中斷