介紹

當你看到這篇文章的時候需要先了解AQS的原理,因為本文不涉及到AQS內部原理的講解。

CountDownLatch是一種同步輔助,讓我們多個執行緒執行任務時,需要等待執行緒執行完成後,才能執行下面的語句,之前執行緒操作時是使用Thread.join方法進行等待,CountDownLatch內部使用了AQS鎖,前面已經講述過AQS的內部結構,其實內部有一個state欄位,通過該欄位來控制鎖的操作,CountDownLatch是如何控制多個執行緒執行都執行結束?其實CountDownLatch內部是將state作為計數器來使用,比如我們初始化時,state計數器為3,同時開啟三個執行緒當有一個執行緒執行成功,每當有一個執行緒執行完成後就將state值減少1,直到減少到為0時,說明所有執行緒已經執行完畢。

原始碼解析

以一個例子來開始進行原始碼解析,後面的內容會針對例子來進行原始碼的分解過程,我們開啟三個執行緒,主執行緒需要等待三個執行緒都完成後才能進行後續任務處理,原始碼如下所示:

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // 計數器3個。
        CountDownLatch countDownLatch = new CountDownLatch(3);

        for (int i = 0; i < 3; ++i) {
            new Thread(new Worker(countDownLatch, i)).start();
        }
        // 等待三個執行緒都完成
        countDownLatch.await();
        System.out.println("3個執行緒全部執行完成");
    }

    // 搬運工人工作執行緒工作類。
    static class Worker implements Runnable {
        private final CountDownLatch countDown;
        private final Integer id;

        Worker(CountDownLatch countDown, Integer id) {
            this.countDown = countDown;
            this.id = id;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(500);
                doWork();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDown.countDown();
            System.out.println("第" + id + "個執行緒執行完成工作");
        }

        void doWork() {
            System.out.println("第" + id + "個執行緒開始工作");
        }
    }
}

通過一個例子來說明一下CountDownLatch的工作原理,上面例子我們開啟了三個執行緒,每個執行緒分別執行自己的任務,主執行緒等待三個執行緒執行完成,看一下輸出的結果:

等待三個執行緒完成
第1個執行緒開始工作
第0個執行緒開始工作
第0個執行緒執行完成工作
第1個執行緒執行完成工作
第2個執行緒開始工作
第2個執行緒執行完成工作
3個執行緒全部執行完成

這裡我們將三個執行緒想象成搬運工人,將貨物搬運到車上,三個人必須將自己手頭分配的任務都搬運完成後才能觸發,也即是貨車司機需要等待三個人都完成才能發車,貨車司機此時手裡有個小本本,記錄本次搬運的總人數,執行緒未啟動時如下所示


1.png

當搬運工人開始工作時,每個搬運工人各自忙碌自己的任務,假如當工人1完成後,需要跟司機報備一下,說我已經完成任務了,這時候司機會在自己的小本本上記錄,工人1已經完成任務,此時還剩下兩個工人沒有完成任務。


2.png

每當工人完成自己手頭任務時,都會向司機報備,當所有工人都完成之後,此時工人的小本本記錄的完成人數都已完成,司機這時候就可以發車了,因為三個人已經完成了搬運工作。


3.png

通過上面的例子,大致瞭解了CountDownLatch的簡單原理,如何保證司機(state)記錄誰完成了誰沒完成呢?CountDownLatch內部通過AQS的state來完成計數器的功能,接下來通過原始碼來進行詳細分析:

public class CountDownLatch {
    /**
     * 同步控制,
     * 使用 AQS的state來表示計數。
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        // 初始化state值(也就是需要等待幾個執行緒完成任務)
        Sync(int count) {
            setState(count);
        }
        // 獲取state值。
        int getCount() {
            return getState();
        }
        // 獲得鎖。
        protected int tryAcquireShared(int acquires) {
            // 這裡判斷如果state=0的時候才能獲得鎖,反之獲取不到將當前執行緒放入到佇列中阻塞。
            // 這裡是關鍵點。
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // state進行減少,當state減少為0時,阻塞執行緒才能進行處理。
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    // 鎖物件。
    private final Sync sync;

    /**
     * 初始化同步鎖物件。
     */
    public CountDownLatch(int count) { 
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * 導致當前執行緒等待直到閂鎖倒計時到零,除非執行緒是被中斷。如果當前計數為零,則此方法立即返回。如果當前計數大於零,
     * 則當前執行緒將被禁用以進行執行緒排程並處於休眠狀態,直到發生以下兩種情況:
     * 1.計數達到零。
     * 2.如果當前執行緒被中斷。
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 等待計數器清零或被中斷,等待一段時間後如果還是沒有
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 使當前執行緒等待直到閂鎖倒計時到零,除非執行緒被中斷或指定的等待時間已過。
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * 返回state值。
     */
    public long getCount() {
        return sync.getCount();
    }
}

CountDownLatch原始碼看上去很少,通過CountDownLatch原始碼看到內部是基於AQS來進行實現的,內部類Sync類繼承自AbstractQueuedSynchronizer並且實現了tryAcquireSharedtryReleaseShared,通過建構函式看到,會建立一個AQS同步物件,並且將state值進行初始化,如果初始化count小於0則丟擲異常。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 初始化AQS的state值。
    this.sync = new Sync(count);
}

根據上面的例子我們來看一下初始化情況下的AQS內部情況:


5.png

awit方法

當呼叫awit方法時,其實內部呼叫的AQS的acquireSharedInterruptibly方法,這個方法會呼叫Sync中tryAcquireShared的方法,通過上面例子,我們初始化時將state值初始化2,但是Sync中判斷(getState() == 0) ? 1 : -1;此時state值為2,判定為false,則返回-1,當返回負數時,內部會將當前執行緒掛起,並且放入AQS的佇列中,直到AQS的state值減少到0會喚醒當前執行緒,或者是當前執行緒被中斷,執行緒會丟擲InterruptedException異常,然後返回。

/**
 * 導致當前執行緒等待直到閂鎖倒計時到零,除非執行緒是被中斷。如果當前計數為零,則此方法立即返回。如果當前計數大於零,
 * 則當前執行緒將被禁用以進行執行緒排程並處於休眠狀態,直到發生以下兩種情況:
 * 1.計數達到零。
 * 2.如果當前執行緒被中斷。
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

當執行緒呼叫await方法時,其實內部呼叫的是AQS的acquireSharedInterruptibly,我們來看一下AQS內部acquireSharedInterruptibly的方法

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 響應中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        // 呼叫tryAcquireShared 方法。
        if (tryAcquireShared(arg) < 0)
            // 阻塞執行緒,將執行緒加入到阻塞佇列等到其他執行緒恢復執行緒。
            doAcquireSharedInterruptibly(arg);
    }
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireSharedInterruptibly內部呼叫的是CountDownLatch內部類Sync實現的tryAcquireShared方法,tryAcquireShared判斷state是否已經清空,也就是計數器是否已經清零了,清零時才能進行執行,此時並沒有進行清空,則會將當前執行緒掛起,並且將掛起的執行緒放入到AQS的阻塞佇列,等待其他執行緒喚醒動作。


6.png

coutDown方法

當執行緒執行完成後,會呼叫CountDownLatchcountDowncountDown方法內部呼叫的AQS的releaseSharedreleaseShared方法實現在Sync類中,該方法主要作用是將state計數器中的值,進行減1操作,先進行判斷是否已經將state值修改為0,如果修改為則不進行下面的操作,防止狀態已經修改為0時,其他執行緒還呼叫了countDown操作導致state值變為負數,當state值減少1時,會通知阻塞佇列中的等待執行緒,假設上面的例子其中一個執行緒先執行了countDown方法,則此時state=1,並且喚醒阻塞佇列中的執行緒,執行緒還是會去呼叫tryAcquireShared方法,發現還是返回-1,則還會將當前執行緒進行掛起阻塞並且加入到阻塞佇列中。此時佇列狀態如下所示:


7.png

當另外一個執行緒也執行完成,呼叫countDown時,state減少1則變為state=0,當這時候喚醒等待的執行緒時,tryAcquireShared返回的結果是1,則會直接返回成功。

總結

CountDownLatch是利用AQS的state來做計數器功能,當初始化CountDownLatch時,會將state值進行初始化,讓呼叫CountDownLatch的awit時,會判斷state計數器是否已經變為0,如果沒有變為0則掛起當前執行緒,並加入到AQS的阻塞佇列中,如果有執行緒呼叫了CountDownLatch的countDown時,這時的操作是將state計數器進行減少1,每當減少操作時都會喚醒阻塞佇列中的執行緒,執行緒會判斷此時state計數器是否已經都執行完了,如果還沒有執行完則繼續掛起當前執行緒,直到state計數器清零或執行緒被中斷為止。

喜歡的朋友可以關注我的微信公眾號,不定時推送文章


123.png