1. 程式人生 > >Java併發程式設計的藝術之八----java中的併發工具類

Java併發程式設計的藝術之八----java中的併發工具類

1.等待多執行緒完成的countDownLatch

CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作。

執行緒中,讓一個執行緒等待最簡單的做法是使用join方法,執行緒A中呼叫B.join方法,說明讓執行緒A等待執行緒B完成之後再執行。

實現原理:不停檢查執行緒是否存活,如果join執行緒存活則讓當前執行緒永遠等待。Wait(0)表示永遠等待下去

直到join執行緒終止後,執行緒的this.notifyAll方法被呼叫

CountDownLatch接受一個int型別的引數作為計數器,如果你想等待n個點完成,這裡就傳入n。呼叫countDownLatch的countDown方法時,n就會減1,如果計數器大於0,await方法等待,如果計數器等於零,await方法不等待。countDown方法可以用在任何地方,可以是n個執行緒,也可以1個執行緒裡n個步驟。

ExecutorService service = Executors.newCachedThreadPool();

       final CountDownLatch cdOrder = new CountDownLatch(1);

       final CountDownLatch cdAnswer = new CountDownLatch(3);

       for(int

i = 0; i < 3; i++){

           Runnable runnable = new Runnable(){

              @Override

              public void run() {

                  // TODO Auto-generated method stub

                  try {

                     System.out.println("執行緒" + Thread.currentThread().getName() +

                         "正在準備接受命令");

                     cdOrder.await();

                     System.out.println("執行緒" + Thread.currentThread().getName() +

                            "已接收命令");

                     Thread.sleep((long)Math.random()*1000);

                     System.out.println("執行緒" + Thread.currentThread().getName() +

                            "迴應命令處理結果");

                     cdAnswer.countDown();

                  } catch (InterruptedException e) {

                     // TODO Auto-generated catch block

                     e.printStackTrace();

                  }

              }

           };

           service.execute(runnable);

       }

       try{

           Thread.sleep((long)Math.random()*1000);

           System.out.println("執行緒" + Thread.currentThread().getName() +

                  "即將釋出命令");

           cdOrder.countDown();

           System.out.println("執行緒" + Thread.currentThread().getName() +

                  "已傳送命令,正在等待結果");

           cdAnswer.await();

           System.out.println("執行緒" + Thread.currentThread().getName() +

                  "已收到所有響應結果");

       }catch(Exception e){

           e.printStackTrace();

       }

       service.shutdown();

      

2.同步屏障CyclicBarrier

可迴圈使用的屏障,讓一組執行緒到達一個屏障(同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。

假設CyclicBarrier=3,如果阻塞沒有到達三個,那麼await方法就會一直等待,如果到達三個,那麼就不會阻塞。

應用場景:

接受指令,有三個執行緒,需要到達指定地點集合,第一個執行緒先到,呼叫await方法,第二個執行緒也是一樣,等到第三個執行緒到了,最後一個執行緒到達屏障,則await開始執行。

CyclicBarrier和CountDownLatch的區別

CountDownLatch的計數器只能用一次,而CyclicBarrier計數器可以使用reset重置,其實不用手動重置,await到達最後一個的時候,就會自動將計數器置為初始化個數,CyclicBarrier能處理更為複雜的業務,

CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的執行緒數量。isBroken()方法用來了解阻塞的執行緒是否被中斷

3.控制併發執行緒數的semaphore

控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源

* semaphore實現的功能就類似廁所有5個坑,假如有十個人要上廁所

 * ,那麼同事能有多少個人去上廁所呢,同時只有5個人能夠佔用,當5個人

 * 中的任何一個人讓開後,其中在等待的另外5個人中又有一個人可以佔用了

 *

 * 另外等待的5個人中可以隨機獲得優先機會,可以是按照先來後到的順序獲得機會

 * ,這取決於構造semaphore物件時傳入的引數選項

 * final Semaphore sp = new Semaphore(3, true);true表示按照先來後來順序

 *

 *單個訊號量的semaphore物件可以實現互斥鎖的功能,並且可以由一個執行緒

 *獲得鎖,另一個執行緒釋放鎖,應用於死鎖回覆的場合

 

public class SemaphoreTest {

    public static void main(String[] args) {

       ExecutorService service = Executors.newCachedThreadPool();

       final Semaphore sp = new Semaphore(3);

       for(int i = 0; i < 10; i++){

           Runnable runnable = new Runnable(){

              public void run(){

                  try{

                     sp.acquire();

                  }catch(InterruptedException e1){

                     e1.printStackTrace();

                  }

                  System.out.println("執行緒" + Thread.currentThread().getName() +

                         "進入,當前已有" + (3 - sp.availablePermits()) + "併發");

                  try{

                     Thread.sleep((long)(Math.random() * 1000));

                  }catch(InterruptedException e){

                     e.printStackTrace();

                  }

                  System.out.println("執行緒" + Thread.currentThread().getName() + "即將離開");

                  sp.release();

                  //下面程式碼有時候執行不準確,因為其沒有和上面程式碼合成原子步驟

                  System.out.println("執行緒" + Thread.currentThread().getName() + "已離開,當前已有"

                         + (3-sp.availablePermits()) + "併發");

              }

           };

           service.execute(runnable);

       }

    }

}

首先執行緒使用Semaphore的acquire()方法獲取一個許可證,如果semaphore小於0,就阻塞使用完之後呼叫release()方法歸還許可證。

4.執行緒間交換資料的Exchanger

它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange方法交換資料,如果第一個執行緒先執行exchange()方法,它會一直等待第二個執行緒也執行exchange方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產出來的資料傳遞給對方

public static void main(String[] args) {

       ExecutorService service = Executors.newCachedThreadPool();

       final Exchanger exchanger = new Exchanger();

       service.execute(new Runnable(){

           @Override

           public void run() {

              // TODO Auto-generated method stub

              try{

                  String data1 = "zxx";

                  System.out.println("執行緒" + Thread.currentThread().getName() +

                         "正在把資料" + data1 + "換出去");

                  Thread.sleep((long) (Math.random()*1000));

                  String data2 = (String) exchanger.exchange(data1);

                  System.out.println("執行緒" + Thread.currentThread().getName() +

                         "換回的資料為" + data2);

              }catch(Exception e){

                  e.printStackTrace();

              }

           }

       });

      

       service.execute(new Runnable(){

           @Override

           public void run() {

              // TODO Auto-generated method stub

              try{

                  String data1 = "lhm";

                  System.out.println("執行緒" + Thread.currentThread().getName() +

                         "正在把資料" + data1 + "換出去");

                  Thread.sleep((long) (Math.random()*1000));

                  String data2 = (String) exchanger.exchange(data1);

                  System.out.println("執行緒" + Thread.currentThread().getName() +

                         "換回的資料為" + data2);

              }catch(Exception e){

                  e.printStackTrace();

              }

           }

       });

      

       service.shutdown();

      

    }