1. 程式人生 > >CountDownLatch、ReentrantLock原始碼解析

CountDownLatch、ReentrantLock原始碼解析

1.AQS

因業務中在用多執行緒並行執行程式碼塊中會用到CountDownLatch來控制多執行緒之間任務是否完成的通知,最近突然想去看一下CountDownLatch在await及喚醒是如何實現的,便開始了閱讀原始碼及查閱資料,然後打開了一個新大門。發現它是基於AbstractQueuedSynchronizer(下文簡稱AQS)框架實現的。

所以我們先了解AQS是幹什麼的。它提供的功能可以概括為兩點:獲取資源,如果獲取失敗加入等待佇列並且休眠該執行緒;釋放資源,同時檢查是否符合喚醒等待佇列中執行緒的條件,如符合就喚醒執行緒繼續執行。

ReentrantLock,CountDownLatch,ReentrantReadWriteLock等都是基於這個類做的。
它提供了幾個方法讓子類進行復寫以實現各自的功能:

tryAcquire(int arg)//嘗試獲取獨佔資源
tryRelease(int arg)//嘗試釋放獨佔資源
isHeldExclusively()//判斷當前執行緒是否獲取獨佔資源

tryAcquireShared(int arg)//嘗試獲取共享資源
tryReleaseShared(int arg)//嘗試釋放共享資源

這5個方法可以分為兩類獨佔類介面(前3個)和共享類介面(後兩個),因為一個子類中一般只需要(也應該如此,ReentrantReadWriteLock同時需要獨佔和共享,但也是分成兩個類來實現的)實現其中一類方法簇,所以作者並沒有把他們寫成抽象方法,這樣對AQS的子類更友好。

這裡對AQS獨佔和共享概念解釋一下,這是對AQS的資源(也就是state欄位)的描述。即在滿足可以獲取資源的條件後,在佇列中的等待的執行緒是喚醒一個(一個執行緒獨佔)還是說等待的執行緒都可以喚醒(共享)。

對於AQS的state欄位,也就是執行緒搶奪的資源,不同的子類有不同的定義,標題中提到的CountDownLatch和ReentrantLock正好是對state有不同的概念,看到對這兩個類的分析,大家就自然清楚了。AQS內部實現了對資源的獲取,釋放邏輯,讓子類實現的主要是嘗試獲取和釋放資源的場景邏輯。

下面對AQS內部邏輯進行分析,不過其子類根本不用關注這些邏輯,這些方法主要實現了:獲取資源,獲取失敗後加入佇列並且讓執行緒阻塞,釋放資源滿足執行緒監視的條件後,從等待佇列中剔除並喚醒相應的執行緒。,所以在看CountDownLatch,ReentrantLock程式碼時可以不細看這個方法的實現,先了解每個方法的作用,搞清楚子類的邏輯後,可以在慢慢研究AQS內部控制。

1.1 獲取共享資源

//AQS原始碼
public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

獲取資源的程式碼,首先呼叫tryAcquireShared 這個方法來判斷是否可以獲取到鎖(這個方法交友子類實現其判斷是否可以獲取的邏輯),如果可以獲取任何程式碼都不執行,如果不可以獲取就進入if執行doAcquireShared(arg)

 /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//1
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {//2
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//3
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

因為不能獲取到資源,所有執行緒要加入到等待佇列中並且執行緒進入阻塞狀態。這裡要說一下AQS的等待佇列是由一個雙向連結串列實現的,節點Node有前後節點,當前執行緒,等待狀態值這幾個欄位組成。所以1處就是在連結串列中加入一個共享模式的節點。這個方法我就不寫了,裡面用了CAS+自旋的無鎖的方式 確保在多執行緒下可以正確插入。

在2處判斷當前節點是不是當前佇列中第一個節點(head頭節點就是一個空節點,head指向的下一個節點,才是等待佇列中的第一個節點執行緒),如果是第一個,在次檢測一下是否可以獲取到鎖,如果可以獲取鎖了,該執行緒就直接從佇列裡剔除,繼續執行了(可能有人問之前不是嘗試獲取過一次嗎,這裡幹嘛還在嘗試,我認為是為了減少執行緒無價值的狀態變更吧),如果依舊不能獲取資源,進行3處,shouldParkAfterFailedAcquire這個方法是判斷該執行緒該不該進行休眠

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {//2
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

可以看到通過LockSupport.park把執行緒置成阻塞狀態。

1.2 釋放共享資源

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {//1
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

可以看到1處是一個死迴圈,如果當前節點釋放後,會一直便利直到佇列為空或者進入不可喚醒狀態

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

釋放共享節點AQS從佇列中剔除並且uppark該執行緒。

2.CountDownLatch實現

CountDownLatch的核心就是實現AQS的Sync內部類

 private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {//1
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {//2
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

可以看到1,2處實現了tryAcquireShared和tryReleaseShared方法,結合上面AQS原始碼分析應該很容易明白了。

    public CountDownLatch(int count) {//CDL類構造方法
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public void countDown() {
        sync.releaseShared(1);
    }

可以看到await方法呼叫的AQS的方法,該方法 if (tryAcquireShared(arg) < 0) 執行嘗試呼叫的方法,這個由Sync實現,其實現就判斷當前state是否等於0,等於說明coutnDown呼叫初始化的次數,不用阻塞。countDown也就會對state-1在Sync使用CAS進行了處理。

3.ReentrantLock

可重入鎖提供了公平與非公平鎖兩種模式,他們的唯一區別就是在搶佔鎖的順序,所以在內部實現的公平與非公平也是在lock()方法有所區別,公平模式下,有一個FIFO的佇列進行排序。

這裡以非公平鎖為列

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

這裡state為0表示當前沒有執行緒持有該鎖,>0表示有執行緒持有該鎖,因為是可重入,所以當前執行緒lock一次,state就會+1,如下程式碼1處所示

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;//1
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

可重入鎖使用的是AQS的獨佔模式,在細節上有些不同,AQS也是有兩套方法,整個邏輯是一致的,不同點

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

在喚醒等待程序這塊,只會喚醒一次AQS佇列的第一個節點。在共享方法裡是一個迴圈。