1. 程式人生 > >Java原始碼解析之可重入鎖ReentrantLock(一)

Java原始碼解析之可重入鎖ReentrantLock(一)

本文基於jdk1.8進行分析。

ReentrantLock是一個可重入鎖,在ConcurrentHashMap中使用了ReentrantLock。

首先看一下原始碼中對ReentrantLock的介紹。如下圖。ReentrantLock是一個可重入的排他鎖,它和synchronized的方法和程式碼有著相同的行為和語義,但有更多的功能。ReentrantLock是被最後一個成功lock鎖並且還沒有unlock的執行緒擁有著。如果鎖沒有被別的執行緒擁有,那麼一個執行緒呼叫lock方法,就會成功獲取鎖並返回。如果當前執行緒已經擁有該鎖,那麼lock方法會立刻返回。這個可以通過isHeldByCurrentThread方法和getHoldCount方法進行驗證。除了這部分介紹外,類前面的javadoc文件很長,就不在這裡全部展開。隨著後面介紹原始碼,會一一涉及到。

/**
 * A reentrant mutual exclusion {@link Lock} with the same basic
 * behavior and semantics as the implicit monitor lock accessed using
 * {@code synchronized} methods and statements, but with extended
 * capabilities.
 *
 * <p>A {@code ReentrantLock} is <em>owned</em> by the thread last
 * successfully locking, but not yet unlocking it. A thread invoking
 * {@code lock} will return, successfully acquiring the lock, when
 * the lock is not owned by another thread. The method will return
 * immediately if the current thread already owns the lock. This can
 * be checked using methods {@link #isHeldByCurrentThread}, and {@link
 * #getHoldCount}.

首先看一下成員變數,如下圖。ReentrantLock只有一個成員變數sync,即同步器,這個同步器提供所有的機制。Sync是AbstractQueuedSynchronizer的子類,同時,Sync有2個子類,NonfairSync和FairSync,分別是非公平鎖和公平鎖。Sync,NonfaireSync和FairSync的具體實現後面再講。

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

下面看一下建構函式。如下圖。可以看到,ReentrantLock預設是非公平鎖,它可以通過引數,指定初始化為公平鎖或非公平鎖。

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

下面看一下ReentrantLock的主要方法。首先是lock方法。如下圖。lock方法的實現很簡單,就是呼叫Sync的lock方法。而Sync的lock方法是個抽象的,具體實現在NonfairSync和FairSync中。這裡我們先不展開講,而是先讀一下lock方法的註釋,看看它的作用。lock方法的作用是獲取該鎖。分為3種情況。

1,如果鎖沒有被別的執行緒佔有,那麼當前執行緒就可以獲取到鎖並立刻返回,並把鎖計數設定為1。

2,如果當前執行緒已經佔有該鎖了,那麼就會把鎖計數加1,立刻返回。

3,如果鎖被另一個執行緒佔有了,那麼當前執行緒就無法再被執行緒排程,並且開始睡眠,直到獲取到鎖,在獲取到到鎖時,會把鎖計數設定為1。

lockInterruptibly方法與lock功能類似,但lockInterruptibly方法在等待的過程中,可以響應中斷。

    /**
     * Acquires the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and returns
     * immediately, setting the lock hold count to one.
     *
     * <p>If the current thread already holds the lock then the hold
     * count is incremented by one and the method returns immediately.
     *
     * <p>If the lock is held by another thread then the
     * current thread becomes disabled for thread scheduling
     * purposes and lies dormant until the lock has been acquired,
     * at which time the lock hold count is set to one.
     */
    public void lock() {
        sync.lock();
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

下面,詳細看一下非公平鎖和公平鎖中對lock函式的實現。如下圖。下圖同時列出了公平鎖和非公平鎖中lock的實現邏輯。從註釋和程式碼邏輯中,都可以看出,非公平鎖進行lock時,先嚐試立刻闖入(搶佔),如果成功,則獲取到鎖,如果失敗,再執行通常的獲取鎖的行為,即acquire(1)。

        /**
         * 非公平鎖中的lock
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        //公平鎖中的lock
        final void lock() {
            acquire(1);
        }

那麼,我們首先了解下,非公平鎖“嘗試立刻闖入”,究竟做了什麼。稍後再繼續講解通常的獲取鎖的行為。下圖是立即闖入行為compareAndSetState(0, 1)的實現。從compareAndSetState函式的註釋中,可以知道,如果同步狀態值與期望值相等,那麼就把它的值設定為updated值。否則同步狀態值與期望值不相等,則返回false。這個操作和volatile有著相同的記憶體語義,也就是說,這個操作對其他執行緒是可見的。compareAndSetState函式註釋裡描述的功能,是通過unsafe.compareAndSwapInt方法實現的,而unsafe.compareAndSwapInt是一個native方法,是用c++實現的。那麼繼續追問,c++底層是怎麼實現的?C++底層是通過CAS指令來實現的。什麼是CAS指令呢?來自維基百科的解釋是,CAS,比較和交換,Compare and Swap,是用用於實現多執行緒原子同步的指令。它將記憶體位置的內容和給定值比較,只有在相同的情況下,將該記憶體的值設定為新的給定值。這個操作是原子操作。那麼繼續追問,CAS指令的原子性,是如何實現的呢?我們都知道指令時CPU來執行的,在多CPU系統中,記憶體是共享的,記憶體和多個cpu都掛在總線上,當一個CPU執行CAS指令時,它會先將匯流排LOCK位點設定為高電平。如果別的CPU也要執行CAS執行,它會發現匯流排LOCK位點已經是高電平了,則無法執行CAS執行。CPU通過LOCK保證了指令的原子執行。

現在來看一下非公平鎖的lock行為,compareAndSetState(0, 1),它期望鎖狀態為0,即沒有別的執行緒佔用,並把新狀態設定為1,即標記為佔用狀態。如果成功,則非公平鎖成功搶到鎖,之後setExclusiveOwnerThread,把自己設定為排他執行緒。非公平鎖這小子太壞了。如果搶佔失敗,則執行與公平鎖相同的操作。

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

下面看一下公平鎖獲取鎖時的行為。如下圖。這部分的邏輯有些多,請閱讀程式碼中的註釋進行理解。

    /**
     * 公平鎖的lock
     */
    final void lock() {
        acquire(1);
    }
    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        /**
         * acquire首先進行tryAcquire()操作。如果tryAcquire()成功時則獲取到鎖,即刻返回。
         * 如果tryAcquire()false時,會執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
         * 操作。如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)true時,則當前執行緒中斷自己。
         * 如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg)false,則返回。
         * 其中tryAcquire()操作在NonfairSync中和FairSync中實現又有所區別。
         */
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * NonfairSync中的tryAcquire。
     * @param acquires
     * @return
     */
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //首先獲取當前同步狀態值
        int c = getState();
        if (c == 0) {
            //c為0,表示目前沒有執行緒佔用鎖。沒有執行緒佔用鎖時,當前執行緒嘗試搶鎖,如果搶鎖成功,則返回true。
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            //c不等於0時表示鎖被執行緒佔用。如果是當前執行緒佔用了,則將鎖計數加上acquires,並返回true。
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        //以上情況都不是時,返回false,表示非公平搶鎖失敗。
        return false;
    }
    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     * 這個是公平版本的tryAcquire
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //c=0時表示鎖未被佔用。這裡是先判斷佇列中前面是否有別的執行緒。沒有別的執行緒時,才進行CAS操作。
            //公平鎖之所以公平,正是因為這裡。它發現鎖未被佔用時,首先判斷等待佇列中是否有別的執行緒已經在等待了。
            //而非公平鎖,發現鎖未被佔用時,根本不管佇列中的排隊情況,上來就搶。
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     * 當搶鎖失敗時,先執行addWaiter(Node.EXCLUSIVE),將當前執行緒加入等待佇列,再執行該方法。
     * 該方法的作用是中斷當前執行緒,並進行檢查,知道當前執行緒是佇列中的第一個執行緒,並且搶鎖成功時,
     * 該方法返回。
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

後續接Java原始碼解析之可重入鎖ReentrantLock(二)