1. 程式人生 > >java多執行緒併發系列之閉鎖(Latch)和柵欄(CyclicBarrier)

java多執行緒併發系列之閉鎖(Latch)和柵欄(CyclicBarrier)

-閉鎖(Latch)
閉鎖(Latch):一種同步方法,可以延遲執行緒的進度直到執行緒到達某個終點狀態。通俗的講就是,一個閉鎖相當於一扇大門,在大門開啟之前所有執行緒都被阻斷,一旦大門開啟所有執行緒都將通過,但是一旦大門開啟,所有執行緒都通過了,那麼這個閉鎖的狀態就失效了,門的狀態也就不能變了,只能是開啟狀態。也就是說閉鎖的狀態是一次性的,它確保在閉鎖開啟之前所有特定的活動都需要在閉鎖開啟之後才能完成。
應用場景:
確保某個計算在其需要的所有資源都被初始化之後才繼續執行。二元閉鎖(包括兩個狀態)可以用來表示“資源R已經被初始化”,而所有需要R的操作都必須先在這個閉鎖上等待。
確保某個服務在其依賴的所有其他服務都已經啟動之後才啟動。
等待直到某個操作的所有參與者都就緒在繼續執行。(例如:多人遊戲中需要所有玩家準備才能開始)
CountDownLatch是JDK 5+裡面閉鎖的一個實現,允許一個或者多個執行緒等待某個事件的發生。CountDownLatch有一個正數計數器,countDown方法對計數器做減操作,await方法等待計數器達到0。所有await的執行緒都會阻塞直到計數器為0或者等待執行緒中斷或者超時。
-柵欄(CyclicBarrier)
柵欄類似於閉鎖,它能阻塞一組執行緒直到某個事件發生。 柵欄與閉鎖的關鍵區別在於,所有的執行緒必須同時到達柵欄位置,才能繼續執行。閉鎖用於等待事件,而柵欄用於等待其他執行緒。
場景: 應用一些協議,比如幾個家庭成員決定在某個地方集合,所有人在6:00在某地集合,到了以後要等待其他人,之後才能討論去哪裡吃飯。 並行迭代,將一個問題分成很多子問題,當一系列的子問題都解決之後(所有子問題執行緒都已經await()),此時將柵欄開啟,所有子問題執行緒被釋放,而柵欄位置可以留著下次使用。
-例子:兩個分別關於CountDownlatch和CyclicBarrier的例子
1、CountDownLatch

有三個工人在為老闆幹活,這個老闆有一個習慣,就是當三個工人把一天的活都幹完了的時候,他就來檢查所有工人所幹的活。記住這個條件:三個工人先全部幹完活,老闆才檢查。所以在這裡用Java程式碼設計兩個類,Worker代表工人,Boss代表老闆,具體的程式碼實現如下:
工人:

package LatchAndCyclicBarrier;
 
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
 
public class Work implements Runnable{
 
 
        private CountDownLatch downLatch;
        private String name;
 
        public Work(CountDownLatch downLatch, String name){
            this.downLatch = downLatch;
            this.name = name;
        }
 
        public void run() {
            this.doWork();
            try{
                TimeUnit.SECONDS.sleep(new Random().nextInt(10));
            }catch(InterruptedException ie){
            }
            System.out.println(this.name + "活幹完了!");
            this.downLatch.countDown();
 
        }
 
        private void doWork(){
            System.out.println(this.name + "正在幹活!");
        }
 
    }
 
老闆:

 
package LatchAndCyclicBarrier;
 
import java.util.concurrent.CountDownLatch;
 
public class Boss implements Runnable{
 
        private CountDownLatch downLatch;
 
        public Boss(CountDownLatch downLatch){
            this.downLatch = downLatch;
        }
 
        public void run() {
            System.out.println("老闆正在等所有的工人幹完活......");
            try {
                this.downLatch.await();
            } catch (InterruptedException e) {
            }
            System.out.println("工人活都幹完了,老闆開始檢查了!");
        }
 
    }
 
 
 
測試程式碼:

package LatchAndCyclicBarrier;
 
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class TestLatch {
 
    public static void main(String[] args) {
            ExecutorService executor = Executors.newCachedThreadPool();
 
            CountDownLatch latch = new CountDownLatch(3);
 
            Work w1 = new Work(latch,"張三");
            Work w2 = new Work(latch,"李四");
            Work w3 = new Work(latch,"王二");
 
            Boss boss = new Boss(latch);
 
            executor.execute(w3);
            executor.execute(w2);
            executor.execute(w1);
            executor.execute(boss);
 
            executor.shutdown();
        }
 
    }
 
 
 
 
執行結果:

李四正在幹活!
老闆正在等所有的工人幹完活......
王二正在幹活!
張三正在幹活!
李四活幹完了!
王二活幹完了!
張三活幹完了!
工人活都幹完了,老闆開始檢查了!
2、CyclicBarrier

接著上面的例子,還是這三個工人,不過這一次,這三個工人自由了,老闆不用檢查他們任務了,他們三個合作建橋,有三個樁,每人打一個,同時打完之後才能一起搭橋(搭橋需要三人一起合作)。也就是說三個人都打完樁之後才能繼續工作。
 
package LatchAndCyclicBarrier;
 
import java.util.concurrent.CyclicBarrier;
 
public class CycWork implements Runnable {
 
 
        private CyclicBarrier cyclicBarrier ;
        private String name ;
 
        public CycWork(CyclicBarrier cyclicBarrier,String name)
       {
               this .name =name;
               this .cyclicBarrier =cyclicBarrier;
       }
 
        @Override
        public void run() {
               // TODO Auto-generated method stub
 
              System. out .println(name +"正在打樁,畢竟不輕鬆。。。。。" );
 
               try {
                     Thread. sleep(5000);
                     System. out .println(name +"不容易,終於把樁打完了。。。。" );
                      cyclicBarrier .await();
 
              } catch (Exception e) {
                      // TODO: handle exception
                     e.printStackTrace();
              }
 
              System. out .println(name +":其他逗b把樁都打完了,又得忙活了。。。" );
 
 
       }
 
}
測試程式

package LatchAndCyclicBarrier;
 
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class CycTest {
 
        public static void main(String[] args)
       {
              ExecutorService executorpool=Executors. newFixedThreadPool(3);
              CyclicBarrier cyclicBarrier= new CyclicBarrier(3);
 
              CycWork work1= new CycWork(cyclicBarrier, "張三" );
              CycWork work2= new CycWork(cyclicBarrier, "李四" );
              CycWork work3= new CycWork(cyclicBarrier, "王五" );
 
              executorpool.execute(work1);
              executorpool.execute(work2);
              executorpool.execute(work3);
 
              executorpool.shutdown();
 
       }
 
}
執行結果:

李四正在打樁,畢竟不輕鬆。。。。。
張三正在打樁,畢竟不輕鬆。。。。。
王五正在打樁,畢竟不輕鬆。。。。。
李四不容易,終於把樁打完了。。。。
張三不容易,終於把樁打完了。。。。
王五不容易,終於把樁打完了。。。。
王五:其他逗b把樁都打完了,又得忙活了。。。
李四:其他逗b把樁都打完了,又得忙活了。。。
張三:其他逗b把樁都打完了,又得忙活了。。。
CountDownlatch和CyclicBarrierde 原始碼部分

1、CountDownLatch中的兩個關鍵方法

  public void countDown() {    //對計數器減一 表示有一個事件已經發生了
        sync.releaseShared(1); 
    }
 
 public void await() throws InterruptedException { //等到計數器為0
        sync.acquireSharedInterruptibly(1);
    }
 
await方法呼叫了AbstractQueuedSynchronizer中的acquireSharedInterruptibly
 public final void acquireSharedInterruptibly (int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
 public final boolean releaseShared (int arg) {
        if (tryReleaseShared(arg)) {   
            doReleaseShared();
            return true ;
        }
        return false ;
    }
protected boolean tryReleaseShared (int arg) {
        throw new UnsupportedOperationException();
    }
 
2、CyclicBarrier中的await()方法

  public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
 
            if (g.broken)
                throw new BrokenBarrierException();
 
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
 
           int index = --count;
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }
 
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
 
                if (g.broken)
                    throw new BrokenBarrierException();
 
                if (g != generation)
                    return index;
 
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
 
上面dowait方法中有一個index,index=--count而count的值在原始碼中來自
count = parties;
提到 parties就不得不看看構造函數了

 public CyclicBarrier(int parties) {
        this(parties, null);
    }
 
如上例子,我們構造了CyclicBarrier(3)那麼此時的 count值為3,接著dowait原始碼,當index==0時,後面執行的

final Runnable command = barrierCommand;
其實是可以設定的,這個Runnable可以傳進來,當我們希望所有執行緒都達到某一時刻之後,用什麼執行緒執行接下來的工作,當沒有傳Runnable進來時,就繼續執行(喚醒其他執行緒),否則就runnable.run()(喚醒其他執行緒之前執行)