1. 程式人生 > >Java技術之AQS詳解

Java技術之AQS詳解

AQS是AbstractQueuedSynchronizer的簡稱。AQS提供了一種實現阻塞鎖和一系列依賴FIFO等待佇列的同步器的框架,如下圖所示。AQS為一系列同步器依賴於一個單獨的原子變數(state)的同步器提供了一個非常有用的基礎。子類們必須定義改變state變數的protected方法,這些方法定義了state是如何被獲取或釋放的。鑑於此,本類中的其他方法執行所有的排隊和阻塞機制。子類也可以維護其他的state變數,但是為了保證同步,必須原子地操作這些變數。

image

AbstractQueuedSynchronizer中對state的操作是原子的,且不能被繼承。所有的同步機制的實現均依賴於對改變數的原子操作。為了實現不同的同步機制,我們需要建立一個非共有的(non-public internal)擴充套件了AQS類的內部輔助類來實現相應的同步邏輯。AbstractQueuedSynchronizer並不實現任何同步介面,它提供了一些可以被具體實現類直接呼叫的一些原子操作方法來重寫相應的同步邏輯。AQS同時提供了互斥模式(exclusive)和共享模式(shared)兩種不同的同步邏輯。一般情況下,子類只需要根據需求實現其中一種模式,當然也有同時實現兩種模式的同步類,如ReadWriteLock。接下來將詳細介紹AbstractQueuedSynchronizer的提供的一些具體實現方法

state狀態

AbstractQueuedSynchronizer維護了一個volatile int型別的變數,使用者表示當前同步狀態。volatile雖然不能保證操作的原子性,但是保證了當前變數state的可見性。至於volatile的具體語義,可以參考相關文章。state的訪問方式有三種:

  • getState()
  • setState()
  • compareAndSetState()

這三種叫做均是原子操作,其中compareAndSetState的實現依賴於Unsafe的compareAndSwapInt()方法。程式碼實現如下:

/**
     * The synchronization state.
     */
private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */
protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

自定義資源共享方式

AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentrantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。

不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:

  • isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

原始碼實現

接下來我們開始開始講解AQS的原始碼實現。依照acquire-release、acquireShared-releaseShared的次序來。

acquire(int)

acquire是一種以獨佔方式獲取資源,如果獲取到資源,執行緒直接返回,否則進入等待佇列,直到獲取到資源為止,且整個過程忽略中斷的影響。該方法是獨佔模式下執行緒獲取共享資源的頂層入口。獲取到資源後,執行緒就可以去執行其臨界區程式碼了。下面是acquire()的原始碼:

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

通過註釋我們知道,acquire方法是一種互斥模式,且忽略中斷。該方法至少執行一次tryAcquire(int)方法,如果tryAcquire(int)方法返回true,則acquire直接返回,否則當前執行緒需要進入佇列進行排隊。函式流程如下:

  • tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
  • addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
  • acquireQueued()使執行緒在等待佇列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
  • 如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

接下來一次介紹相關方法。

tryAcquire(int)

tryAcquire嘗試以獨佔的方式獲取資源,如果獲取成功,則直接返回true,否則直接返回false。該方法可以用於實現Lock中的tryLock()方法。該方法的預設實現是丟擲UnsupportedOperationException,具體實現由自定義的擴充套件了AQS的同步類來實現。AQS在這裡只負責定義了一個公共的方法框架。這裡之所以沒有定義成abstract,是因為獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實現另一模式下的介面。

/**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire.  If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.  The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

addWaiter(Node)

該方法用於將當前執行緒根據不同的模式(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)加入到等待佇列的隊尾,並返回當前執行緒所在的結點。如果佇列不為空,則以通過compareAndSetTail方法以CAS的方式將當前執行緒節點加入到等待佇列的末尾。否則,通過enq(node)方法初始化一個等待佇列,並返回當前節點。原始碼如下:

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

enq(node)

enq(node)用於將當前節點插入等待佇列,如果佇列為空,則初始化當前佇列。整個過程以CAS自旋的方式進行,直到成功加入隊尾為止。原始碼如下:

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued(Node, int)

acquireQueued()用於佇列中的執行緒自旋地以獨佔且不可中斷的方式獲取同步狀態(acquire),直到拿到鎖之後再返回。該方法的實現分成兩部分:如果當前節點已經成為頭結點,嘗試獲取鎖(tryAcquire)成功,然後返回;否則檢查當前節點是否應該被park,然後將該執行緒park並且檢查當前執行緒是否被可以被中斷。

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        //標記是否成功拿到資源,預設false
        boolean failed = true;
        try {
            boolean interrupted = false;//標記等待過程中是否被中斷過
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire(Node, Node)

shouldParkAfterFailedAcquire方法通過對當前節點的前一個節點的狀態進行判斷,對當前節點做出不同的操作,至於每個Node的狀態表示,可以參考介面文件

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt()

該方法讓執行緒去休息,真正進入等待狀態。park()會讓當前執行緒進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該執行緒:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會清除當前執行緒的中斷標記位。

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

我們再回到acquireQueued(),總結下該函式的具體流程:

  • 結點進入隊尾後,檢查狀態,找到安全休息點;
  • 呼叫park()進入waiting狀態,等待unpark()或interrupt()喚醒自己;
  • 被喚醒後,看自己是不是有資格能拿到號。如果拿到,head指向當前結點,並返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續流程1。

最後,總結一下acquire()的流程:

  • 呼叫自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
  • 沒成功,則addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
  • acquireQueued()使執行緒在等待佇列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
  • 如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

release(int)

release(int)方法是獨佔模式下執行緒釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待佇列裡的其他執行緒來獲取資源。這也正是unlock()的語義,當然不僅僅只限於unlock()。下面是release()的原始碼:

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

/**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

與acquire()方法中的tryAcquire()類似,tryRelease()方法也是需要獨佔模式的自定義同步器去實現的。正常來說,tryRelease()都會成功的,因為這是獨佔模式,該執行緒來釋放資源,那麼它肯定已經拿到獨佔資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮執行緒安全的問題。但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該執行緒是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。

unparkSuccessor(Node)方法用於喚醒等待佇列中下一個執行緒。這裡要注意的是,下一個執行緒並不一定是當前節點的next節點,而是下一個可以用來喚醒的執行緒,如果這個節點存在,呼叫unpark()方法喚醒。

總之,release()是獨佔模式下執行緒釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待佇列裡的其他執行緒來獲取資源。

acquireShared(int)

acquireShared(int)方法是共享模式下執行緒獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待佇列,直到獲取到資源為止,整個過程忽略中斷。下面是acquireShared()的原始碼:

/**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

doAcquireShared(int)

將當前執行緒加入等待佇列尾部休息,直到其他執行緒釋放資源喚醒自己,自己成功拿到相應量的資源後才返回。原始碼如下:

/**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

跟獨佔模式比,還有一點需要注意的是,這裡只有執行緒是head.next時(“老二”),才會去嘗試獲取資源,有剩餘的話還會喚醒之後的隊友。那麼問題就來了,假如老大用完後釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續park()等待其他執行緒釋放資源,也更不會去喚醒老三和老四了。獨佔模式,同一時刻只有一個執行緒去執行,這樣做未嘗不可;但共享模式下,多個執行緒是可以同時執行的,現在因為老二的資源需求量大,而把後面量小的老三和老四也都卡住了。當然,這並不是問題,只是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但降低了併發)。實現如下:

/**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

此方法在setHead()的基礎上多了一步,就是自己甦醒的同時,如果條件符合(比如還有剩餘資源),還會去喚醒後繼結點,畢竟是共享模式!至此,acquireShared()也要告一段落了。讓我們再梳理一下它的流程:

  • tryAcquireShared()嘗試獲取資源,成功則直接返回;
  • 失敗則通過doAcquireShared()進入等待佇列park(),直到被unpark()/interrupt()併成功獲取到資源才返回。整個等待過程也是忽略中斷的。

releaseShared(int)

releaseShared(int)方法是共享模式下執行緒釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待執行緒,它會喚醒等待佇列裡的其他執行緒來獲取資源。下面是releaseShared()的原始碼:

/**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

此方法的流程也比較簡單,一句話:釋放掉資源後,喚醒後繼。跟獨佔模式下的release()相似,但有一點稍微需要注意:獨佔模式下的tryRelease()在完全釋放掉資源(state=0)後,才會返回true去喚醒其他執行緒,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的執行緒併發執行,那麼擁有資源的執行緒在釋放掉部分資源時就可以喚醒後繼等待結點。


/**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

原文 Java技術之AQS詳解