1. 程式人生 > >ReentrantLock加鎖(lock())、釋放鎖(unlock())的實現

ReentrantLock加鎖(lock())、釋放鎖(unlock())的實現

一、簡介

ReentrantLock是JUC包下比較重要同樣也是比較常用的一個類,從類名可以看出它的功能:Lock是鎖,reentrant可重入,可重入鎖。在功能上ReentrantLock和synchronized是相似的,它可以通過這種方式實現加鎖和釋放鎖:

class X {
    private final ReentrantLock lock = new ReentrantLock();
    // ..
    public void m() {
      lock.lock();  // block until condition holds
      try {
        // ... method body
      } finally {
        lock.unlock()
      }
    }
  }}

要注意的是在加鎖和釋放鎖的時候一定要通過try-finally的方式來操作,防止在加鎖之後程式異常退出沒有呼叫 lock.unlock() 方法釋放鎖。
 ReentrantLock還可用通過Condition條件來實現等待和通知,不過這不是本文討論的重點,這裡就不再贅述。

二、實現原理

我們先來看一下ReentrantLock的lock()和unlock()方法的實現:

public void lock() {
    sync.lock();
}

public void unlock() {
    sync.release(1);
}

可以看到,加鎖和釋放鎖是通過sync物件的lock()方法和release()方法實現的。sync物件是什麼可以從它的定義和構造方法看出來:

/** Synchronizer providing all implementation mechanics */
private final Sync sync;

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

從定義的註釋我們可以看得出ReentrantLock的功能都是通過Sync實現的,而且Sync分為公平鎖和非公平鎖。

這裡我們先貼出FairSync和NonfairSync的類圖:

 

類圖

1.AbstractOwnableSynchronizer

我們先來看一下AbstractOwnableSynchronizer這個抽象類:

/**
 * A synchronizer that may be exclusively owned by a thread.  This
 * class provides a basis for creating locks and related synchronizers
 * that may entail a notion of ownership.  The
 * {@code AbstractOwnableSynchronizer} class itself does not manage or
 * use this information. However, subclasses and tools may use
 * appropriately maintained values to help control and monitor access
 * and provide diagnostics.
 *一個允許一個執行緒排他擁有的同步器,這個類提供了建立需要所有權概念的鎖和相關同步器的基礎。
 *AbstractOwnableSynchronizer類自己並不管理或使用任何資訊,然而,子類和使用工具類可能會適當
 *的使用它管理的值來控制監控獲取和提供診斷。
 * @since 1.6
 * @author Doug Lea
 */
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    /**
     * Empty constructor for use by subclasses.
     */
    protected AbstractOwnableSynchronizer() { }

    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * Returns the thread last set by {@code setExclusiveOwnerThread},
     * or {@code null} if never set.  This method does not otherwise
     * impose any synchronization or {@code volatile} field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

setExclusiveOwnerThread(Thread thread) 這個方法設定當前擁有獨佔鎖的執行緒。
getExclusiveOwnerThread()這個方法返回當前擁有獨佔鎖的執行緒。
AbstractOwnableSynchronizer這個類通過setter/getter方法實現設定、獲取當前擁有獨佔鎖的執行緒,這個是一個非常簡單的類,我們就不再多說了。

2.AbstractQueuedSynchronizer簡單介紹

AbstractQueuedSynchronizer(以下簡寫為AQS)這個抽象類是ReentrantLock或者說是整個併發程式設計包下面鎖實現的關鍵所在,它定義了實現鎖的步驟(具體會在下面介紹),提供了幾個方法由子類實現,我們可以看到,雖然這個類中沒有抽象方法,但是有些方法是直接丟擲UnsupportedOperationException異常的。這些方法就是要由子類實現的:

protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
protected boolean isHeldExclusively()

ReentrantLock是獨佔鎖,它的內部類ReentrantLock.Sync和他的子類ReentrantLock.NonfairSync、ReentrantLock.FairSync並沒有實現tryAcquireShared(int arg)、tryReleaseShared(int arg)方法。
<p>到這裡我們先做一個小小的總結:ReentrantLock實現加鎖和釋放鎖是通過內部類ReentrantLock.NonfairSync、ReentrantLock.FairSync的方法實現的,這兩個類都繼承了AQS。接下來我們將分析ReentrantLock.NonfairSync、ReentrantLock.FairSync的具體實現。

3.FairSync、NonfairSync的實現

我們先來看一下具體實現程式碼:
非公平鎖:

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

公平鎖:

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

我們可以先比較一下公平鎖和非公平鎖的異同點:我們先看一下lock()方法,這裡再公平鎖和非公平鎖的lock()方法貼出來:

/**
 *非公平鎖的實現
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

/**
 *公平鎖的實現
 */
final void lock() {
    acquire(1);
}

acquire(int)方法是AQS定義的方法,我們可以先來看看這個方法的實現:

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire(int)我們知道,是由子類實現的(這裡體現了面試繼承的思想,用了模板模式),它的功能是嘗試獲取鎖,我們接下來會分析具體實現。addWaiter(Node.EXCLUSIVE)是生成一個新的節點新增到等待佇列裡,引數表示這個節點獲取鎖是以獨佔方式獲取。acquireQueued()這個方法是實現lock阻塞(其實是等待,上一篇文章講過,ReentrantLock是通過LockSupport.park()實現執行緒等待的,這個時候執行緒的狀態是WAITING)。acquireQueued()這個方法的具體實現接下來會講到。selfInterrupt()方法其實就是終端當前執行緒:Thread.currentThread().interrupt();至於為什麼會要終端當前執行緒,我們接下來也會講。
我們現在先來看一看tryAcquire(int)的實現,公平鎖和非公平鎖對這個方法的實現是不同的。先來看一看公平鎖對tryAcquire(int)的實現:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

getState()可以獲取鎖當前被佔有的狀態(其實是重入次數,0的時候是沒有被任何執行緒佔有,大於0的時候是重入次數)。如果getState()不是0,說明當前鎖被執行緒佔用,如果佔用鎖的執行緒是當前執行緒,就表示是鎖重入,這個時候將狀態增加acquires並返回ture表示嘗試獲取鎖成功。如果getState()是0的話,表示現在沒有執行緒佔有鎖,這個時候通過hasQueuedPredecessors()去判斷等待佇列(等待佇列是什麼,是怎麼產生的我們後面會講到,現在只需要知道等待佇列是一個FIFO的佇列,每個佇列的節點會儲存等待獲取鎖的執行緒,在佇列前面的節點表示先嚐試獲取的執行緒節點)裡是否有先於當前執行緒的執行緒在等待,這個操作是公平鎖和非公平鎖區別所在,接下來講到非公平鎖的時候會詳細講解。如果沒有先於當前執行緒的執行緒在等待佇列裡等待,就通過compareAndSetState(0, acquires))方法來講state設定為acquires,這個步驟是通過CAS(CAS會在以後的文章中討論)實現的,因為state是一個臨界資源,有可能兩個執行緒同時獲取鎖的時候對這個值進行修改,所以通過CAS操作來保證執行緒安全的設定state。如果設定成功,則表示佔有鎖成功,,然後通過AbstractOwnableSynchronizer的setter方法將當前執行緒設定為當前佔有鎖的執行緒。然後返回true表示嘗試獲取鎖成功。這就是公平鎖嘗試獲取鎖的過程。
接下來我們看一下非公平鎖嘗試獲取鎖的過程:

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

我們可以看到非公平鎖和公平鎖在嘗試獲取鎖的唯一區別是:當前沒有執行緒佔有鎖的時候,公平鎖會先去等待佇列判斷有沒有先於當前執行緒的執行緒等待獲取鎖,而非公平鎖不會去判斷有沒有其他等待執行緒,而是直接去嘗試佔有鎖。
當嘗試佔有鎖成功後就會執行使用者自己的程式碼,如果嘗試佔有鎖失敗,這裡回到了AQS的acquire(int)方法,當tryAcquire(arg)返回false的時候,會執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。addWaiter(Node.EXCLUSIVE)是什麼東西呢?上文提到了等待佇列,這個方法就是等待佇列產生的地方。首先要知道AQS中定義了兩個屬性:

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

AQS中定義了佇列的頭節點和尾節點(這裡又涉及到了一個知識點volatile,不清楚的同學可以自行查閱資料,以後的文章也可能會討論),什麼是Node節點呢?我們先把程式碼貼出來:

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

我們可以看到,Node類定義了節點的先去節點和後繼節點(prev、next),至於nextWaiter是ReentrantLock的Condition要使用的屬性,不在本文討論範圍,所以大家可以先忽略這個屬性。Node還定義了thread,表示一個等待鎖的執行緒。waitStatus表示當前節點所處的狀態,我麼可以看到有1、0、-1、-2、-3五種狀態。
1(CANCELLED):表示當前節點被取消,通過lock()加鎖的節點是不會處於這種狀態的,只有通過lockInterruptibly()方法加鎖才會有這種狀態,因為通過lock()加鎖的時候,執行緒是不會響應中斷的,這點我們後面會詳細介紹。
0:表示節點剛被建立,是初始狀態。
-1(SIGNAL):表示一個節點的後繼正在被阻塞,所以這個節點釋放鎖或者被中斷後,必須unpark它的後繼節點。為了避免鎖的競爭,acquire()方法必須先保證節點處於SIGNAL狀態,然後再去自動的獲取鎖,如果失敗就阻塞。
-2(CONDITION):這個是Condition所使用的狀態,我們這裡可以不用關心,AQS所維護的佇列不會處於這個狀態,只有在AQS的內部類ConditionObject所維護的一個佇列的節點才會處於這個狀態。
-3(PROPAGATE):這個狀態我們也可以先不用關心。共享模式下才會用到這個狀態。
我們討論了這麼多關於Node類的內容,現在我們回到addWaiter(Node.EXCLUSIVE)這個方法上來,我們說過,AQS的等待佇列是由這個方法產生的,接下來我們看一下這個方法的實現:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

首先這個方法會先建立一個持有當前執行緒的Node物件,然後判斷尾節點是不是null,如果不是null,則把新建立的節點的前驅節點設定為尾節點,然後通過CAS操作將尾節點的指向設定為新建立的節點,如果成功,把原來的尾節點的後繼節點設定為新建立的節點。這個說法比較複雜,用一句話概括就是把新節點新增到佇列的尾部,也就是入隊。如果尾節點是null,或者compareAndSetTail(pred, node)這個方法失敗後,會呼叫enq(node)方法,我們看一下這個方法的實現:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

其實這個方法也是一個無鎖程式設計的典型例子,我們可以看到,當尾節點是空,這個時候表示等待佇列還沒有節點進入,也就是之前沒有發生鎖競爭的時候,或者說這是第一次發生鎖競爭,這個時候會新建一個空節點來作為隊首和隊尾。然後會將傳進來的節點新增到隊尾。這個時候我們對addWaiterr(Node mode)方法做個總結:這個方法是建立一個持有當前執行緒的Node物件,然後通過執行緒安全的方法將這個物件入隊並返回這個物件。
當addWaiter(Node.EXCLUSIVE), arg)方法返回節點之後,會呼叫acquireQueued()方法,這個方法也是實現ReentrantLock阻塞的關鍵。我們看一下具體的實現:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

我們傳進來的node引數是剛加入佇列的node,首先進入一個迴圈,為什麼會迴圈操作呢,接下來會討論。我們先看看迴圈內的具體實現:先獲得當前節點的前驅節點p,如果p是頭結點,說明前面並沒有先於當前節點等待的執行緒,這個時候就去嘗試獲取鎖,如果獲取成功就會把頭結點設定為當前節點(從這裡我們可以看出頭結點的兩個語義:頭結點可能是一個新建立的空Node物件,也可能是一個正在持有鎖的執行緒節點)。前驅節點是頭結點,並且 tryAcquire(arg)失敗,說明前驅結點的執行緒正在持有鎖還沒有釋放,或者說是前驅結點的執行緒持有的鎖被釋放了,這個時候有其他執行緒搶佔了鎖。這個時候我們就去執行shouldParkAfterFailedAcquire(p, node) 方法,這個方法的功能是判斷當前節點在獲取鎖失敗後是否需要阻塞。我們看一下這個方法的具體實現:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果當前節點的前驅節點是SIGNAL狀態,說明前驅節點在等待鎖,所以這個時候當前節點需要阻塞。如果前驅節點不是SIGNAL狀態而是CANCELLED狀態,就會把前驅節點以及前驅節點之前的直接關聯的狀態時CANCELLED狀態的節點跳過然後返回false,不讓當前執行緒阻塞,而是讓當前執行緒去執行外部的方法,也就是去繼續迴圈嘗試獲取鎖。如果先去節點的狀態不是SIGNAL、CANCELLED、狀態,在ReentrantLock裡它的狀態就應該是0狀態,也就是初始狀態,這二個時候說明前面沒有先於當前節點的節點等待獲取鎖,所以就把當前節點的狀態設定為SIGNAL狀態,表示當前節點是正在等待競爭鎖的,而不是SIGNAL的節點是會被阻塞的,根本沒有競爭鎖的機會。
當通過呼叫shouldParkAfterFailedAcquire方法判斷當前節點是否需要阻塞後,如果不需要阻塞,當前節點會迴圈呼叫這段程式碼去競爭鎖:

    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
         setHead(node);
         p.next = null; // help GC
         failed = false;
         return interrupted;
 }

如果呼叫shouldParkAfterFailedAcquire判斷當前節點需要被阻塞,就呼叫parkAndCheckInterrupt()方法,我們先看一下這個方法的實現:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

這個方法會呼叫LockSupport.park(this);來讓當前執行緒進入等待狀態。LockSupport.park(this);這個方法會響應執行緒中斷而結束,所以在這個方法結束後要判斷當前執行緒的中斷狀態,並返回當前執行緒的中斷狀態。我們之前說過,ReentrantLock的lock()方法是不會響應執行緒中斷的,原因就是在這:當執行緒被中斷後parkAndCheckInterrupt()返回true,這個時候只會把interrupted中斷標誌設定為true,然後還會迴圈去執行上面提到的程式碼判斷當前執行緒是不是有資格去競爭鎖。這個操作是和通過呼叫lockInterruptibly()實現中斷是不同的,lockInterruptibly()會響應執行緒中斷,他是通過呼叫AQS的doAcquireInterruptibly(int arg)方法實現的,有興趣的同學可以看一下這個方法的實現:

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這個方法和acquireQueued基本是一樣的,只是在 parkAndCheckInterrupt()判斷執行緒被中斷後丟擲了一個異常來推出迴圈操作。
繼續回到acquireQueued這個方法,什麼時候parkAndCheckInterrupt()方法呼叫LockSupport.park(this);會正常推出呢?一個獲得鎖的執行緒執行加鎖的程式碼塊後,會呼叫unlock()方法來實現鎖的釋放,這個時候unlock()內部的方法會呼叫LockSupport.unpark(Thread)方法,引數內傳入的執行緒物件是頭節點的後繼節點持有的執行緒,用來使阻塞在 parkAndCheckInterrupt()方法上的頭節點的後繼節點釋放。(因為頭節點代表著正在持有鎖的執行緒,頭節點的後繼節點表示等待鎖釋放的執行緒)。當acquireQueued方法結束後,我們再回到AQS的acquire方法,我們可以知道如果在acquireQueued方法阻塞過程中,如果中間中斷了執行緒,雖然不會實現執行緒中斷,但是acquireQueued方法會返回ture,這個時候selfInterrupt()方法會被執行,我們上文提到過,selfInterrupt()方法會調Thread.currentThread().interrupt();。為什麼要執行這個操作呢?因為方法acquireQueued在阻塞過程中如果執行緒中斷,這個時候依然會判斷當前這個執行緒所在等待佇列中的前驅結點是否是頭節點,如果不是還是會呼叫LockSupport.park()再次阻塞執行緒,在parkAndCheckInterrupt()返回是否是因為中斷結束的時候,使用的是Thread.interrupted()方法,這個方法在呼叫結束後會清除中斷狀態。所以如果不呼叫selfInterrupt()這個方法,當前執行緒的中斷狀態就是false。這裡呼叫selfInterrupt()來設定中斷狀態為true。
到此為止,我們已經討論完了ReentrantLock通過lock()加鎖的整個過程。看似比較複雜,但是如果能慢慢理一下,就會發現其實原理是蠻簡單的。

4、unlock()的實現

我們先來看看具體實現:

    public void unlock() {
        sync.release(1);
    }

unlock()方法是通過AQS的release(int)方法實現的,我們可以看一下:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

這個方法和acquire的思路一樣,也用了模板模式,tryRelease()是由子類實現的,我們來看一下ReentrantLock中的Sync對它的實現:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

這個操作和tryAcquire的操作是一個相反的過程。先通過getState獲得狀態標識,如果這個標識和要釋放的數量相等,就會把當前佔有鎖的執行緒設定為null,實現鎖的釋放,然後返回true,否則把狀態標識減去releases再返回false。
釋放鎖之後,判斷頭結點是不是空,如果是空表示沒有等待的鎖的執行緒。如果不是空,再判斷頭結點的狀態是不是0,頭結點是0表示什麼呢?我們從前面的討論中可以知道,頭結點一般表示當前正在持有鎖的執行緒的節點,但是當頭結點是0時,頭結點表示獲取鎖第一次發生競爭後,初始化等待佇列被設定的節點,這個節點的後繼節點沒有被阻塞。所以這個時候我們也不需要做後續操作。如果頭結點不是0,就呼叫unparkSuccessor(Node)方法。我們來看一下這個方法:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

首先會先獲取該節點的狀態(這個時候是頭結點)。如果頭結點是負的,就說明後繼節點有可能等待被釋放。這個時候呼叫 compareAndSetWaitStatus(node, ws, 0)把節點的狀態設定為0,不過這個操作的結果不會影響後續的操作。然後獲取頭結點的後繼節點,判斷後繼節點的狀態不是CANCELLED狀態,如果是,這找到第一個不是CANNCELLED狀態的結點,然後使這個執行緒取消阻塞狀態。至此為止unlock()結束。



作者:醜星星
連結:https://www.jianshu.com/p/4c0a6744fc3f
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。