1. 程式人生 > >java CountDownLatch原理和示例

java CountDownLatch原理和示例

CountDownLatch簡介

CountDownLatch是一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。

CountDownLatch和CyclicBarrier的區別
(01) CountDownLatch的作用是允許1或N個執行緒等待其他執行緒完成執行;而CyclicBarrier則是允許N個執行緒相互等待。
(02) CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置後使用,因此它被稱為是迴圈的barrier。
關於CyclicBarrier的原理,後面一章再來學習。

CountDownLatch函式列表

CountDownLatch(int count)
構造一個用給定計數初始化的 CountDownLatch。

// 使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷。
void await()
// 使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷或超出了指定的等待時間。
boolean await(long timeout, TimeUnit unit)
// 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒。
void countDown()
// 返回當前計數。
long getCount()
// 返回標識此鎖存器及其狀態的字串。
String toString()

CountDownLatch資料結構

CountDownLatch的UML類圖如下:

CountDownLatch的資料結構很簡單,它是通過”共享鎖”實現的。它包含了sync物件,sync是Sync型別。Sync是例項類,它繼承於AQS。

CountDownLatch原始碼分析(基於JDK1.7.0_40)

CountDownLatch是通過“共享鎖”實現的。下面,我們分析CountDownLatch中3個核心函式: CountDownLatch(int count), await(), countDown()。

1. CountDownLatch(int count)

public
CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }

說明:該函式是建立一個Sync物件,而Sync是繼承於AQS類。Sync建構函式如下:

Sync(int count) {
    setState(count);
}

setState()在AQS中實現,原始碼如下:

protected final void setState(long newState) {
    state = newState;
}

說明:在AQS中,state是一個private volatile long型別的物件。對於CountDownLatch而言,state表示的”鎖計數器“。CountDownLatch中的getCount()最終是呼叫AQS中的getState(),返回的state物件,即”鎖計數器“。

2. await()

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

說明:該函式實際上是呼叫的AQS的acquireSharedInterruptibly(1);

AQS中的acquireSharedInterruptibly()的原始碼如下:

public final void acquireSharedInterruptibly(long arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

說明:acquireSharedInterruptibly()的作用是獲取共享鎖。
如果當前執行緒是中斷狀態,則丟擲異常InterruptedException。否則,呼叫tryAcquireShared(arg)嘗試獲取共享鎖;嘗試成功則返回,否則就呼叫doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()會使當前執行緒一直等待,直到當前執行緒獲取到共享鎖(或被中斷)才返回。

tryAcquireShared()在CountDownLatch.java中被重寫,它的原始碼如下:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

說明:tryAcquireShared()的作用是嘗試獲取共享鎖。
如果”鎖計數器=0”,即鎖是可獲取狀態,則返回1;否則,鎖是不可獲取狀態,則返回-1。

private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 建立"當前執行緒"的Node節點,且Node中記錄的鎖是"共享鎖"型別;並將該節點新增到CLH佇列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 獲取上一個節點。
            // 如果上一節點是CLH佇列的表頭,則"嘗試獲取共享鎖"。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // (上一節點不是CLH佇列的表頭) 當前執行緒一直等待,直到獲取到共享鎖。
            // 如果執行緒在等待過程中被中斷過,則再次中斷該執行緒(還原之前的中斷狀態)。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

說明
(01) addWaiter(Node.SHARED)的作用是,建立”當前執行緒“的Node節點,且Node中記錄的鎖的型別是”共享鎖“(Node.SHARED);並將該節點新增到CLH佇列末尾。關於Node和CLH在”Java多執行緒系列–“JUC鎖”03之 公平鎖(一)”已經詳細介紹過,這裡就不再重複說明了。
(02) node.predecessor()的作用是,獲取上一個節點。如果上一節點是CLH佇列的表頭,則”嘗試獲取共享鎖“。
(03) shouldParkAfterFailedAcquire()的作用和它的名稱一樣,如果在嘗試獲取鎖失敗之後,執行緒應該等待,則返回true;否則,返回false。
(04) 當shouldParkAfterFailedAcquire()返回ture時,則呼叫parkAndCheckInterrupt(),當前執行緒會進入等待狀態,直到獲取到共享鎖才繼續執行。
doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函式在”Java多執行緒系列–“JUC鎖”03之 公平鎖(一)”中介紹過,這裡也就不再詳細說明了。

3. countDown()

public void countDown() {
    sync.releaseShared(1);
}

說明:該函式實際上呼叫releaseShared(1)釋放共享鎖。

releaseShared()在AQS中實現,原始碼如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

說明:releaseShared()的目的是讓當前執行緒釋放它所持有的共享鎖。
它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。

tryReleaseShared()在CountDownLatch.java中被重寫,原始碼如下:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        // 獲取“鎖計數器”的狀態
        int c = getState();
        if (c == 0)
            return false;
        // “鎖計數器”-1
        int nextc = c-1;
        // 通過CAS函式進行賦值。
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

說明:tryReleaseShared()的作用是釋放共享鎖,將“鎖計數器”的值-1。

總結CountDownLatch是通過“共享鎖”實現的。在建立CountDownLatch中時,會傳遞一個int型別引數count,該引數是“鎖計數器”的初始狀態,表示該“共享鎖”最多能被count給執行緒同時獲取。當某執行緒呼叫該CountDownLatch物件的await()方法時,該執行緒會等待“共享鎖”可用時,才能獲取“共享鎖”進而繼續執行。而“共享鎖”可用的條件,就是“鎖計數器”的值為0!而“鎖計數器”的初始值為count,每當一個執行緒呼叫該CountDownLatch物件的countDown()方法時,才將“鎖計數器”-1;通過這種方式,必須有count個執行緒呼叫countDown()之後,“鎖計數器”才為0,而前面提到的等待執行緒才能繼續執行!

以上,就是CountDownLatch的實現原理。

CountDownLatch的使用示例

下面通過CountDownLatch實現:”主執行緒”等待”5個子執行緒”全部都完成”指定的工作(休眠1000ms)”之後,再繼續執行。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

public class CountDownLatchTest1 {

    private static int LATCH_SIZE = 5;
    private static CountDownLatch doneSignal;
    public static void main(String[] args) {

        try {
            doneSignal = new CountDownLatch(LATCH_SIZE);

            // 新建5個任務
            for(int i=0; i<LATCH_SIZE; i++)
                new InnerThread().start();

            System.out.println("main await begin.");
            // "主執行緒"等待執行緒池中5個任務的完成
            doneSignal.await();

            System.out.println("main await finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class InnerThread extends Thread{
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
                // 將CountDownLatch的數值減1
                doneSignal.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

執行結果

main await begin.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
main await finished.

結果說明:主執行緒通過doneSignal.await()等待其它執行緒將doneSignal遞減至0。其它的5個InnerThread執行緒,每一個都通過doneSignal.countDown()將doneSignal的值減1;當doneSignal為0時,main被喚醒後繼續執行。