1. 程式人生 > >同步工具類--閉鎖、訊號量、柵欄的總結

同步工具類--閉鎖、訊號量、柵欄的總結

      閉鎖用於一組執行緒等待(阻塞)一個外部事件的發生,這個事件發生之前這些執行緒阻塞,等待控制執行緒開啟閉鎖,然後這些執行緒同時開始執行。閉鎖強調的是阻塞後的同時開始;柵欄則是一組執行緒相互等待,直到所有執行緒都到達某一點時才打開柵欄,然後執行緒可以繼續執行,也就是說控制執行緒先設定一個時間點,然後這些執行緒各自執行,執行完等待(阻塞),直到這組執行緒中的所有執行緒執行完,然後控制執行緒柵欄開啟,這些執行緒同時繼續執行。柵欄強調的是各自執行完後的相互等待以及繼續執行。訊號量根據一個計數器控制一個結果的數量,條件滿足情況下才能進行增加和移除操作,否則進行操作的執行緒阻塞。

工具

作用

主要方法

閉鎖

(CountDownLatch)

類似於門。門初始是關閉的,試圖進門的執行緒掛起等待開門。當負責開門程序將門開啟後,所有等待執行緒被喚醒。

門一旦開啟就不能再關閉了。

CountDownLatch(int n):指定閉鎖計數器

await() :掛起等待閉鎖計數器為0

countDown():閉鎖計數器減1

柵欄

(CyclicBarrier)

和閉鎖有類似之處。閉鎖是等待“開門”事件;柵欄是等待其他執行緒。例如有N個執行緒檢視通過柵欄,此時先到的要等待,直到所有執行緒到到達後,柵欄開啟,所有等待執行緒被喚醒通過柵欄。

CyclicBarrier(int n):需要等待的執行緒數量

await():掛起等待達到執行緒數量

訊號量

(Semaphore)

和鎖的作用類似。區別是鎖只允許被一個執行緒獲取,但是訊號量可以設定資源數量。當沒有可用資源時,才被掛起等待。

Semaphore(int n):指定初始的資源數量

acquire():試圖獲取資源。當沒有可用資源時掛起

release():釋放一個資源

場景對比:

l  閉鎖場景:幾個人相約去公園遊玩,在家做好準備,約定在某一時刻同時出發去公園,準備工作進行的快的不能提前出門,到點出門。

l  柵欄場景:幾個人相約去公園遊玩,幾個人去到公園門口,要等全部到達公園門口後才一起進入公園。

l

  訊號量場景:幾個人相約去公園遊玩,等大家都到公園後,發現來的太遲了,公園遊客飽和,公園限制入場遊客的數量。遊客在門口等待,出來一人,再進入一人,只能一個一個進入。

CountDownLatch:

public class TestHarness{
    public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{
        final CountDownLatch startGate = new CountDownLatch(1);//全部執行緒開始的閉鎖
        final CountDownLatch endGate = new CountDownLatch(nThread);//全部執行緒結束的閉鎖
        for(int i = 0 ; i < nThreads ; i++){
            Thread t = new Thread(){
                public void run(){
                    try{
                        startGate.await();//所有的執行緒都在這裡等待閉鎖開啟
                        try{
                            task.run();//實際要呼叫的方法
                        }finally{
                            endGate.countDown(); //每一個執行緒執行完畢,呼叫countDown()方法,直到全部的執行緒(nThreads個)執行完畢,閉鎖開啟
                        }
                    }catch(InterruptedException e){}
                }
            };
            t.start(); //t.start()不會立即開始執行task.run()方法,執行緒一開始執行startGate.await();會等待閉鎖startGate開啟
        }
        long start = System.nanoTime();//記錄開始時間
        startGate.countDown();//閉鎖startGate初始化為1,執行一次countDown()方法,閉鎖就打開了,所有的執行緒可以繼續執行
        endGate.await();//當前執行緒在這裡等待所有的執行緒執行完畢
        long end = System.nonaTIme();//記錄結束時間
        return end - start;
    }
}

semaphore :

public class BoundedHashSet<T>{
        private final Set<T> set;
        private final Semaphore sem;
        //構造方法,初始化邊界
        public BoundedHashSet(int bound){
            this.set = Collections.synchronizedSet(new HashSet<T>());
            sem = new Semaphore(bound);//初始化許可集的大小,即該有界阻塞佇列的大小
        }
 
        public boolean add(T o) throws InterruptedException{
            sem.acquire();//新增元素時獲取一個許可,如果剩餘許可為0,則說明此有界阻塞佇列已滿
            boolean wasAdded = false; //返回值,預設為新增元素失敗。
            try{
                wasAdded=set.add(o);//向佇列中新增元素,如果成功,則返回成功
                return wasAdded;
            }finally{
                if(!wasAdded) sem.release(); //如果新增失敗,把方法一開始獲取的許可釋放掉
            }
        }
        
        public boolean remove (Object o){
            boolean wasRemoved = set.remove(o);
            if(wasRemoved) sem.release();//如果刪除成功,則釋放一個許可
            return wasRemoved;
        }
    }

barrier:

class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;
   
   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow); //每一個執行緒負責處理一行資料


         try {
           barrier.await(); //處理完行資料之後在這個關卡等待其他執行緒,當所有的執行緒都處理完畢,才繼續向下執行
         } catch (InterruptedException ex) { 
            return; 
         } catch (BrokenBarrierException ex) { 
            return; 
         }
       }
     }
   }


   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     barrier = new CyclicBarrier(N, 
                                 new Runnable() {
                                   public void run() { 
                                     mergeRows(...); //彙總計算的結果語句
                                   }//關卡CyclicBarrier 初始化時引數一為:關卡需要等待的執行緒的數量n,只有當n個執行緒全部到達此關卡,才能順利通過;引數二為:一個關卡行為,當關卡順利通過之後,在一個子任務執行緒中執行這個行為。Runnable型別~
                                 });
     for (int i = 0; i < N; ++i) 
       new Thread(new Worker(i)).start(); //每一行一個執行緒處理,分別啟動


     waitUntilDone();
   }
}