1. 程式人生 > >CountDownLatch原始碼探究 (JDK 1.8)

CountDownLatch原始碼探究 (JDK 1.8)

CountDownLatch能夠實現讓執行緒等待某個計數器倒數到零的功能,之前對它的瞭解也僅僅是簡單的使用,對於其內部如何實現執行緒等待卻不是很瞭解,最好的辦法就是通過看原始碼來了解底層的實現細節。CountDownLatch的原始碼並不是很複雜,因為其核心的功能是依賴AbstractQueuedSynchronizer(下文簡稱AQS)來實現的。CountDownLatch常用的方法很少,但是因為涉及到AQS,邏輯有些繞,要理清中間的邏輯稍微要費一些時間。

1.內部類Sync

CountDownLatch的核心功能是通過內部類Sync實現的,這個類繼承了AQS

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        //構造器,根據傳入的整數初始化狀態欄位state
        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        //tryAcquireShared唯一的作用是檢視狀態欄位是不是等於0
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            //自旋,在兩種條件下會退出自旋:a)state欄位已經為0;b)執行緒成功地將state欄位減1
            for (;;) {
                int c = getState();
                //如果state已經為0,就返回false
                if (c == 0)
                    return false;
                int nextc = c-1;
                //從下面的語句可以看到,只有當state=0才會返回true
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

2.構造器

CountDownLatch只有一個構造器,在構造器中會初始化sync欄位,結合Sync類的定義可知,構造器的唯一工作是將state欄位初始化為傳入的引數:

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

3.節點狀態waitStatus

等待的執行緒會構造成節點放在等待佇列中,節點的狀態waitStatus有如下幾種:

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

注意,在CountDownLatch中並沒有用到CONDITION狀態,因此後文將會直接忽略該狀態,當waitStatus > 0時,指的就是CANCELLED狀態。

4.核心方法

  • await()
    當計數器沒不等於0時,await()方法會讓當前執行緒掛起,該方法呼叫了AQS類的acquireSharedInterruptibly方法,如下:
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)  throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //顯然,tryAcquireShared方法只有在state=0時才返回1,表示計數器已歸零,此時方法直接返回,被阻塞的執行緒就可以繼續執行
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

通常,呼叫await()的執行緒在執行到acquireSharedInterruptibly方法時,計數器並不為0,那麼當前執行緒就需要執行doAcquireSharedInterruptibly方法中的阻塞邏輯了。由於該方法內部呼叫了三個主要方法:addWaitershouldParkAfterFailedAcquireparkAndCheckInterrupt,在解析的過程中難免會穿插對這些方法的介紹,從而引入跳躍性。為了避免跳躍性引發的閱讀和理解上的困難,這裡準備先介紹addWaiter方法。

  • addWaiter
    private Node addWaiter(Node mode) {
        //將當前執行緒構造成一個Node節點
        Node node = new Node(Thread.currentThread(), mode);
        //獲取尾節點
        Node pred = tail;
        //尾節點不為空,說明佇列已完成初始化
        if (pred != null) {
            //將node節點放到對尾,這裡的做法是先將node的prev指標指向尾節點,然後通過原子操作將新新增的node更新成尾節點,成功的話addWaiter方法結束
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                //原子操作成功的話,更新原尾節點的next指標
                pred.next = node;
                return node;
            }
        }
        //執行到這裡有兩種情況:1)尾節點為空,即佇列還沒初始化;2)佇列已初始化,但是上文將node節點設定成尾節點失敗,此時node節點還沒有真正新增進佇列
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //如果佇列還沒初始化,則先初始化,做法是將一個空節點作為頭結點,然後讓尾節點也指向這個空節點
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //這裡會一直自旋,直到成功地將node節點更新成尾節點
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter方法的主要作用就是將當前執行緒新增到等待佇列的隊尾,如果佇列還沒初始化,則先初始化,enq方法使用自旋避免入隊失敗。

  • doAcquireSharedInterruptibly
    接下來正式開始介紹doAcquireSharedInterruptibly方法,原始碼如下:
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //將當前執行緒新增到等待佇列,注意引數是Node.SHARED,下文會用到
        final Node node = addWaiter(Node.SHARED);
        //該欄位在state=0時才會被設定為false
        boolean failed = true;
        try {
            //又是自旋,該自旋的終止條件有兩種:1)state=0,計數器正常結束,執行return語句返回;2)執行緒響應中斷異常,跳出自旋
            for (;;) {
                //獲取node的前驅節點
                final Node p = node.predecessor();
                //如果前驅節點是頭結點,則執行if程式碼塊的邏輯
                if (p == head) {
                    //獲取state欄位的狀態,如果state=0則返回1,否則返回-1
                    int r = tryAcquireShared(arg);
                    //r>=0,說明計數器結束了,需要喚醒阻塞的執行緒
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //計數器正常結束時,會將failed設定為false,避免執行finally中的語句
                        failed = false;
                        return;
                    }
                }
                //執行到這裡說明state!=0,真正的阻塞邏輯在parkAndCheckInterrupt方法裡
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            //如果執行緒被中斷,那麼failed=true,執行cancelAcquire方法
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireSharedInterruptibly先通過addWaiter方法將當前執行緒新增到等待佇列尾部,然後開始自旋。如果state欄位不為0,那麼會執行到末尾的條件語句:

    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        throw new InterruptedException();

先來看看shouldParkAfterFailedAcquire幹了些什麼:

    //注意pred是node的前驅節點
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果已經是SIGNAL狀態,則之間返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //ws>0只能是cancelled狀態,此時通過修改指標將這些cancelled的節點從佇列刪除
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //如果前驅節點的狀態既不是SIGNAL,也不是CANCELLED,那麼只可能是0或者PROPAGATE,就把前驅節點的狀態更新為 Node.SIGNAL。注意:1)CONDITION狀態在CountDownLatch中並沒有用到;2)節點新建的時候狀態都是0,是在這裡才被修改成了SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

之前對節點的SIGNAL狀態是怎麼來的一直有點迷糊,看了上面的程式碼才發現是在最後一個else分支中設定的。從shouldParkAfterFailedAcquire原始碼瞭解到,該方法只有在前驅節點狀態是SIGNAL時才返回true,此時才有機會執行parkAndCheckInterrupt方法。parkAndCheckInterrupt是真正讓執行緒掛起的地方,來看看其原始碼:

    private final boolean parkAndCheckInterrupt() {
        //執行緒最終會阻塞在這裡,執行緒恢復之後也將從這裡繼續執行
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt方法藉助LockSupport實現執行緒阻塞,被阻塞的執行緒在被喚醒後會返回當前執行緒的中斷狀態(注意Thread.interrupted()會清除執行緒的中斷狀態)。好了,到這裡整個邏輯就比較清楚了,如果執行緒是正常被喚醒(即state=0),那麼parkAndCheckInterrupt返回falsedoAcquireSharedInterruptibly方法會接著自旋一次,這裡再次將自旋程式碼貼出:

    for (;;) {
        //獲取node的前驅節點
        final Node p = node.predecessor();
        //如果前驅節點是頭結點,則執行if程式碼塊的邏輯
        if (p == head) {
            //獲取state欄位的狀態,如果state=0則返回1,否則返回-1
            int r = tryAcquireShared(arg);
            //r>=0,說明計數器結束了,需要喚醒阻塞的執行緒
            if (r >= 0) {
                setHeadAndPropagate(node, r);
                p.next = null; // help GC
                failed = false;
                return;
            }
        }
        //執行到這裡說明state!=0,真正的阻塞邏輯在parkAndCheckInterrupt方法裡
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            throw new InterruptedException();
    }

那麼setHeadAndPropagate方法做了些什麼事呢,看看它的原始碼(刪掉了原始碼中的註釋):

    //回憶一下,顯然propagate=1,node是當前插入到對尾的新節點
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        //把node設定為頭結點
        setHead(node);
        //此時propagate > 0的條件已經滿足,直接執行if程式碼塊的邏輯
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //如果沒有下一個節點,或者下一個節點的isShared返回true,就釋放。還記得嗎,在構造新節點的時候addWaiter的引數是Node.SHARED,這裡就是判斷這個欄位
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

接下來看一下doReleaseShared是如何實現的:

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            //獲取頭結點
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //如果頭結點的狀態是SIGNAL,那麼會將其狀態修改為0,該步驟一直自旋直到成功為止
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //成功修改頭結點的狀態後,會執行下面這個方法
                    unparkSuccessor(h);
                }
                //如果頭結點狀態已經改成0了,就再次將其狀態更新為Node.PROPAGATE,目的是???
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

頭結點的狀態成功更新為0後,會執行unparkSuccessor方法的邏輯,該方法原始碼如下:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //獲取後繼節點
        Node s = node.next;
        //如果沒有後繼節點,或者後繼節點是CANCELLED狀態,則執行下面的程式碼塊
        if (s == null || s.waitStatus > 0) {
            s = null;
            //從佇列末尾向開頭遍歷,找到靠近頭結點的第一個不為CANCELLED狀態的節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //找到這樣的非CANCELLED節點,就將其喚醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

unparkSuccessor的主要工作是將頭結點後面第一個非CANCELLED狀態的節點所對應的執行緒喚醒。

  • cancelAcquire
    到目前為止,並沒有發現CANCELLED狀態是在哪裡設定,因為還有一個方法沒有分析。doAcquireSharedInterruptibly中的finally語句塊會處理執行緒被中斷的情況,執行的是cancelAcquire方法的邏輯,其原始碼如下:
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        //執行緒中斷後,將其對應的節點中儲存的執行緒清空
        node.thread = null;

        // Skip cancelled predecessors
        //從佇列中刪除狀態為CANCELLED的節點
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        //CANCELLED狀態在這裡設定
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        //如果當前是尾節點,其第一個非CANCELLED狀態的前驅節點設定為新的尾節點,pred後面的節點將會被GC回收。注意,下面的兩個原子操作,不管是否成功,都沒有重試
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                //當前執行緒對應的節點不是尾節點,其有後繼節點並且後繼節點不是CANCELLED狀態,通過修改指標將當前執行緒節點從佇列刪除
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                //根據前面的if條件,在以下幾種情況時會執行到這裡,喚醒node節點的後繼節點
                //1)pred=head,即當前被中斷的執行緒前面的所有執行緒都是CANCELLED狀態
                //2)pred!=head,但是pred節點的狀態不等於SIGNAL,且將pred節點的狀態修改為SIGNAL失敗
                //3)pred節點記錄的執行緒是null,目前已知頭結點的thread欄位確實為null,除此之外還有其他情況嗎???
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

分析到這裡,才剛把await()的邏輯分析完,但是僅僅分析程式碼仍然是不夠的,因為本人分析到這裡的時候,腦袋仍然是蒙的,主要原因是缺少一個全域性的認識。程式碼放在這裡都能看懂,但是程式碼為什麼這樣寫?當計數器結束(即state=0)時,佇列中的等待執行緒是一起全部換新,還是一個一個依次喚醒?執行緒被喚醒後重新執行doAcquireSharedInterruptibly中的自旋時,和第一次執行到底有哪些地方不一樣呢?因此,有必要對以上的邏輯進行整體梳理。
看完這部分原始碼之後,發現核心的邏輯都包含在doAcquireSharedInterruptibly中,現在是時候回過頭來整理一下該方法的邏輯了。
假設有現在有一個執行緒t1執行了await方法,由於等待佇列還沒初始化,因此先構造一個空的頭節點,並且把t1構造成節點加到佇列中,如下圖:

接著,在shouldParkAfterFailedAcquire方法中修改頭結點的狀態:

現在又有新的t2執行緒執行了await,此時佇列的結構將更新為下圖:

即每新增一個節點到等待隊尾,就將其前驅節點的狀態更新為Node.SIGNAL(即-1),然後所有的執行緒都阻塞在parkAndCheckInterrupt方法裡。現在,計數器已經結束,最後一個執行countDown方法的執行緒順帶執行了doReleaseShared方法,將頭結點的waitStatus更新成了0,如下圖:

繼續向下執行到unparkSuccessor方法,喚醒執行緒t1t1parkAndCheckInterrupt方法中醒來,繼續自旋。t1的前置節點就是頭結點head,且state=0t1開始執行setHeadAndPropagate,將自己設定為頭結點,並在setHead方法中將threadprev欄位都設定為空,如下圖:

執行緒t1接著執行doReleaseShared方法,把頭節點(此時t1就是頭結點)狀態更新為0,並喚醒t2,開始執行await之後的邏輯,如下圖:

喚醒t2後,t1退出await方法,此時佇列如下:

t2開始執行後,同樣把自己設定為頭結點,如下:

在執行setHeadAndPropagate方法時,t2沒有後繼節點了,仍然會執行doReleaseShared方法,但是在doReleaseShared方法中,t2即使頭結點也是尾節點,那就什麼也不做,直接結束並退出await方法,此時佇列裡就只剩下一個頭結點了。

  • countDown
    現在,終於可以開始看看countDown方法的邏輯了:
    public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        //之前分析過,該方法會將state的值減1,並且只有在減1後state=0才會返回true,表示計數器結束了
        if (tryReleaseShared(arg)) {
            //喚醒後繼節點中第一個不為CANCELLED狀態的節點
            doReleaseShared();
            return true;
        }
        return false;
    }

當一個執行緒將state修改成0時,順便還要執行doReleaseShared方法,這個方法會將頭結點的後繼節點喚醒。
有一個小細節需要注意,doReleaseShared方法在原始碼中有兩個地方呼叫,一個入口就是剛講的countDown方法,另一個就是從await方法進入,在setHeadAndPropagate中呼叫,但是二者是有先後順序的是,是countDown方法喚醒最前面的執行緒之後,再由該執行緒依次喚醒後面的執行緒