1. 程式人生 > >CountDownLatch、CyclicBarrier和Semaphore 使用示例及原理

CountDownLatch、CyclicBarrier和Semaphore 使用示例及原理

CountDownLatch

CountDownLatch使用者監聽某些初始化操作,並且執行緒進行阻塞,等初始化執行完畢後,通知主執行緒繼續工作執行。

CountDownLatch 使用示例

使用示例,執行緒t3 要等待t1和t2執行完畢才執行:

/**
 * @Description: CountDownLatch 等待和喚醒
 * @Author: wangmeng
 * @Date: 2018/12/16-16:38
 */
public class UseCountDownLatch {

    public static void main(String[] args)
{ CountDownLatch countDownLatch = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { System.out.println("進入t1執行緒。。。"); try { TimeUnit.SECONDS.sleep(3); }
catch (InterruptedException e) { e.printStackTrace(); } System.out.println("t1執行緒初始化完畢,通知t3執行緒繼續操作!"); countDownLatch.countDown(); } }, "t1"); Thread t2 = new Thread(new Runnable() { @Override
public void run() { System.out.println("進入t2執行緒。。。"); try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("t2執行緒初始化完畢,通知t3執行緒繼續操作!"); countDownLatch.countDown(); } }, "t2"); Thread t3 = new Thread(new Runnable() { @Override public void run() { System.out.println("進入t3 執行緒,並且等待..."); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("t3執行緒進行後續的執行操作..."); } }, "t3"); t1.start(); t2.start(); t3.start(); } }

列印結果:

進入t1執行緒。。。
進入t3 執行緒,並且等待...
進入t2執行緒。。。
t1執行緒初始化完畢,通知t3執行緒繼續操作!
t2執行緒初始化完畢,通知t3執行緒繼續操作!
t3執行緒進行後續的執行操作...

CountDownLatch 原始碼解讀

其實CountDownLatch用的底層原理就是AQS, 可以參考:(AQS原理詳解)。AQS全域性維護的有一個volatile修飾的state欄位,當state為0時就會通知countDownLatch等待執行緒執行。
這也就是所以我們在new CountDownLatch(int n) 時指定的引數,n為多少,也就是要呼叫多少次countDown()方法。

public void await() throws InterruptedException { }; //呼叫await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()類似,只不過等待一定的時間後count值還沒變為0的話就會繼續執行
public void countDown() { }; //將count值減1

看看await()方法, 其底層呼叫的就是AQS中的getState方法,通過判斷state是否為0來決定是否喚醒等待的執行緒。
如果不為0則呼叫Unsafe中的park方法進行自旋,直到state==0時才繼續往下執行(喚醒等待的執行緒)。

public void await() throws InterruptedException {
    //呼叫AQS中的方法
     sync.acquireSharedInterruptibly(1);
 }

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

//CountDownLatch中的方法,獲取state值
protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
 }

//當AQS中的state不為0就會執行此方法,這個方法也就是讓執行緒等待。使用直到state==0才結束迴圈。
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

看到上面await方法了,那麼countDown就可以直接猜出來了,無外乎就是使得AQS中的state通過CAS操作進行減一,如下:

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

CyclicBarrier

CyclicBarrier是迴環柵欄的概念,多執行緒來的進行阻塞,等待某一個臨界值條件滿足後,同時執行。
假設有一個場景:每個執行緒代表一個跑步運動員,當運動員都準備好後,才一起出發,只要有一個人沒有準備好,大家都等待。

CyclicBarrier 應用例項

/**
 * @Description: 測試CyclicBarrier
 * @Author: wangmeng
 * @Date: 2018/12/16-17:05
 */
public class UseCyclicBarrier {

    //模擬運動員的類。
    static class Runner implements Runnable {

        private String name;

        private CyclicBarrier cyclicBarrier;

        @Override
        public void run() {
            try {
                System.out.println("運動員:" + this.name + "進行準備工作!");
                TimeUnit.SECONDS.sleep((new Random().nextInt(5)));
                System.out.println("運動員:" + this.name + "準備完成!");
                this.cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println("運動員" + this.name + "開始起跑!!!");
        }

        public Runner(String name, CyclicBarrier cyclicBarrier) {
            this.name = name;
            this.cyclicBarrier = cyclicBarrier;
        }
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        ExecutorService executorPools = Executors.newFixedThreadPool(3);

        executorPools.submit(new Thread(new Runner("張三", cyclicBarrier)));
        executorPools.submit(new Thread(new Runner("李四", cyclicBarrier)));
        executorPools.submit(new Thread(new Runner("王五", cyclicBarrier)));

        executorPools.shutdown();
    }
}

列印的結果:

運動員:張三進行準備工作!
運動員:李四進行準備工作!
運動員:王五進行準備工作!
運動員:張三準備完成!
運動員:王五準備完成!
運動員:李四準備完成!
運動員李四開始起跑!!!
運動員張三開始起跑!!!
運動員王五開始起跑!!!

可以看到三個執行緒都是先執行完初始化操作,然後才一起喚醒執行後續的操作。

CyclicBarrier 原始碼解讀

CyclicBarrier是通過ReentrantLock和Condition來實現的。

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 鎖住
    lock.lock();
    try {
        // 當前代
        final Generation g = generation;
        // 如果這代損壞了,丟擲異常
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果執行緒中斷了,丟擲異常
        if (Thread.interrupted()) {
            // 將損壞狀態設定為 true
            // 並通知其他阻塞在此柵欄上的執行緒
            breakBarrier();
            throw new InterruptedException();
        }
        // 獲取下標    
        int index = --count;
        // 如果是 0 ,說明到頭了
        if (index == 0) { // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 執行柵欄任務
                if (command != null)
                    command.run();
                ranAction = true;
                // 更新一代,將 count 重置,將 generation 重置.
                // 喚醒之前等待的執行緒
                nextGeneration();
                // 結束
                return 0;
            } finally {
                // 如果執行柵欄任務的時候失敗了,就將柵欄失效
                if (!ranAction)
                    breakBarrier();
            }
        }

        for (;;) {
            try {
                // 如果沒有時間限制,則直接等待,直到被喚醒
                if (!timed)
                    trip.await();
                // 如果有時間限制,則等待指定時間
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // g == generation >> 當前代
                // ! g.broken >>> 沒有損壞
                if (g == generation && ! g.broken) {
                    // 讓柵欄失效
                    breakBarrier();
                    throw ie;
                } else {
                    // 上面條件不滿足,說明這個執行緒不是這代的.
                    // 就不會影響當前這代柵欄執行邏輯.所以,就打個標記就好了
                    Thread.currentThread().interrupt();
                }
            }
            // 當有任何一個執行緒中斷了,會呼叫 breakBarrier 方法.
            // 就會喚醒其他的執行緒,其他執行緒醒來後,也要丟擲異常
            if (g.broken)
                throw new BrokenBarrierException();
            // g != generation >>> 正常換代了
            // 一切正常,返回當前執行緒所在柵欄的下標
            // 如果 g == generation,說明還沒有換代,那為什麼會醒了?
            // 因為一個執行緒可以使用多個柵欄,當別的柵欄喚醒了這個執行緒,就會走到這裡,所以需要判斷是否是當前代。
            // 正是因為這個原因,才需要 generation 來保證正確。
            if (g != generation)
                return index;
            // 如果有時間限制,且時間小於等於0,銷燬柵欄,並丟擲異常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

用上面的示例總結一下CyclicBarrier的await方法實現,假設執行緒thread1和執行緒thread2都執行到CyclicBarrier的await(),都進入dowait(boolean timed, long nanos),thread1先獲取到獨佔鎖,執行到–count的時,index等於1,所以進入下面的for迴圈,接著執行trip.await(),進入await()方法,執行Node node = addConditionWaiter()將當前執行緒構造成Node節點並加入到Condition等待佇列中,然後釋放獲取到的獨佔鎖,當前執行緒進入阻塞狀態;此時,執行緒thread2可以獲取獨佔鎖,繼續執行–count,index等於0,所以先執行command.run(),輸出myThread,然後執行nextGeneration(),nextGeneration()中trip.signalAll()只是將Condition等待佇列中的Node節點按之前順序都轉移到了AQS同步佇列中,這裡也就是將thread1對應的Node節點轉移到了AQS同步佇列中,thread2執行完nextGeneration(),返回return 0之前,細看程式碼還需要執行lock.unlock(),這裡會執行到ReentrantLock的unlock()方法,最終執行到AQS的unparkSuccessor(Node node)方法,從AQS同步佇列中的頭結點開始釋放節點,喚醒節點對應的執行緒,即thread1恢復執行。

如果有三個執行緒thread1、thread2和thread3,假設執行緒執行順序是thread1、thread2、thread3,那麼thread1、thread2對應的Node節點會被加入到Condition等待佇列中,當thread3執行的時候,會將thread1、thread2對應的Node節點按thread1、thread2順序轉移到AQS同步佇列中,thread3執行lock.unlock()的時候,會先喚醒thread1,thread1恢復繼續執行,thread1執行到lock.unlock()的時候會喚醒thread2恢復執行。

更多可參考:併發程式設計之 CyclicBarrier 原始碼分析

CountdownLatch和CyclicBarrier的區別

1、CountDownLatch簡單的說就是一個執行緒等待,直到他所等待的其他執行緒都執行完成並且呼叫countDown()方法發出通知後,當前執行緒才可以繼續執行。
2、CyclicBarrier是所有執行緒都進行等待,直到所有執行緒都準備好進入await()方法之後,所有執行緒同時開始執行!
3、CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為複雜的業務場景,比如如果計算髮生錯誤,可以重置計數器,並讓執行緒們重新執行一次。

Semaphore

Semaphore翻譯成字面意思為 訊號量,Semaphore可以控同時訪問的執行緒個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
關於限流的其他方式可以參考我另一篇博文:限流的簡單使用及學習
相關概念:

  • PV(page view)網站的總訪問量,頁面瀏覽量或點選量,使用者每重新整理一次就會被記錄一次。
  • UV(unique Visitor)訪問網站的一臺電腦客戶端為一個訪客。一般來講時間上以00:00-24:00之內相同ip的客戶端只記錄。
  • QPS(query per second)即每秒查詢數,qps很大程度上代表了系統業務上的繁忙程度,每次請求的背後,可能對應著多次磁碟I/O,多次網路請求,多個cpu時間片等。我們通過qps可以非常直觀的瞭解當前系統業務情況,一旦當前qps超過所設定的預警閥值,可以考慮增加機器對叢集擴容,以免壓力過大導致宕機,可以根據前期的壓力測試得到估值,在結合後期綜合運維情況,估算出閥值。
  • RT(response time)請求的響應時間,這個指標非常關鍵,直接說明前端使用者的體驗,任何系統設計師都想降低rt時間。
  • 當然還涉及cpu、記憶體、網路、磁碟等情況,更細節的問題很多,如select、update、delete/ps等資料庫層面的統計。
  • 容量評估:一般來說通過開發、運維、測試、以及業務等相關人員,綜合出系統的一系列閥值,然後我們根據關鍵閥值如qps、rt等,對系統進行有效的變更。
  • 一般來講,我們進行多輪壓力測試以後,可以對系統進行峰值評估,採用所謂的80/20原則,即80%的訪問請求將在20%的時間內達到。這樣我們可以根據系統對應的PV計算出峰值qps。
  • 峰值qps= (總PV × 80%)/ (60 × 60 × 24 × 20%)
  • 然後在將總的峰值qps除以單臺機器所能承受的最高的qps值,就是所需要機器的數量:機器數 = 總的峰值qps / 壓測得出的單機極限qps
  • 當然不排除系統在上線前進行大型促銷活動,或者雙十一、雙十二熱點事件、遭受到DDos攻擊等情況,系統的開發和運維人員急需要了解當前系統執行的狀態和負載情況,一般都會有後臺系統去維護。

Semaphore 使用示例:

/**
 * @Description:使用Semaphore模擬限流操作
 * @Author: wangmeng
 * @Date: 2018/12/16-18:30
 */
public class UseSemaphore {

    public static void main(String[] args) {
        ExecutorService threadPools = Executors.newFixedThreadPool(20);
        //同一時間只能有5個執行緒執行
        Semaphore semaphore = new Semaphore(5);


        for (int i = 0; i < 20; i++) {
            final int token = i;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();

                        //進行業務操作
                        System.out.println("獲得許可,執行操作..." + token);
                        long sleepTime = (long)(Math.random() * 10000);
                        Thread.sleep(sleepTime);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }
            };

            threadPools.execute(run);
        }

        System.out.println("queue length: " + semaphore.getQueueLength());
        threadPools.shutdown();
    }
}

原理也是使用AQS中的state變數。程式碼我就不貼了。
Semaphore 就是一個共享鎖,通過設定 state 變數來實現對這個變數的共享。當呼叫 acquire 方法的時候,state 變數就減去一,當呼叫 release 方法的時候,state 變數就加一。當 state 變數為 0 的時候,別的執行緒就不能進入程式碼塊了,就會在 AQS 中阻塞等待。