1. 程式人生 > >Java concurrent AQS 源碼詳解

Java concurrent AQS 源碼詳解

ted node thread range skip nothing 共享 mea 需要

一、引言

  AQS(同步阻塞隊列)是concurrent包下鎖機制實現的基礎,相信大家在讀完本篇博客後會對AQS框架有一個較為清晰的認識

  這篇博客主要針對AbstractQueuedSynchronizer的源碼進行分析,大致分為三個部分:

    1.   靜態內部類Node的解析
    2.   重要常量以及字段的解析
    3.   重要方法的源碼詳解。

  所有的分析僅基於個人的理解,若有不正之處,請諒解和批評指正,不勝感激!!!

二、Node解析

  AQS在內部維護了一個同步阻塞隊列,下面簡稱sync queue,該隊列的元素即靜態內部類Node的實例

  首先來看Node中涉及的常量定義,源碼如下

        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static
final int CANCELLED = 1; /** waitStatus value to indicate successor‘s thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate
*/ static final int PROPAGATE = -3;

  以下兩個均為Node#nextWaiter字段的可取值

    SHARED:若Node#nextWaiter為SHARED,那麽表明該Node節點處於共享模式

    EXCLUSIVE:若Node#nextWaiter為EXCLUSIVE,那麽表明該Node節點處於獨占模式

  以下五個均為Node#waitStatus字段的可取值

    CANCELLED:用於標記一個已被取消的節點,一旦Node#waitStatus的值被設為CANCELLED,那麽waitStatus的值便不再被改變

    SIGNAL標記一個節點(記為node)處於這樣一種狀態:當node釋放資源(unlock/release)時,node節點必須喚醒其後繼節點

    CONDITION:用於標記一個節點位於條件變量的阻塞隊列中(我稱這個阻塞隊列為Condition list),本篇暫不介紹Condition相關源碼,因此讀者可以暫時忽略

    PROPAGATE:僅用於標記sync queue頭節點,且為一種暫時狀態,表明共享狀態正在傳遞中(如果有剩余資源且sync queue中仍有等待的節點,那麽這些節點會依次獲取資源,直至資源消耗殆盡或者隊列為空),僅在共享模式中出現

  其次,再看Node中重要字段,源碼如下

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn‘t need to
         * signal. So, most code doesn‘t need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev‘s from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

   waitStatus:節點的狀態,可取值有五種,分別是SIGNAL、CANCEL、CONDITION、PROPAGATE、0。其中獨占模式僅涉及到SIGNAL、CANCEL、0三種狀態,共享模式僅涉及到SIGNAL、CANCEL、PROPAGATE、0四種狀態。CONDITION狀態不會出現在sync queue中,而是位於條件變量的Condition list中,本篇博客暫不討論ConditoinObject

  pre:前繼節點,該字段通過CAS操作進行賦值,保證可靠(現在不理解沒關系,後面的方法解析會多次提到)

  next:後繼節點,該字段的賦值操作是非線程安全的,即next是不可靠的(Node#next為null並不代表節點不存在後繼)。但是,一旦next不為null,那麽next也是可靠的(現在不理解沒關系,後面的方法解析會多次提到)

  thread:該節點關聯的線程

  nextWaiter:獨占模式中就是null,共享模式中就是SHARED。在ConditionObject的Condition list中指向下一個節點

  註意:Condition list用nextWaiter來連接單向鏈表(pre與next是無用的),sync queue利用pre和next來連接雙向鏈表(nextWaiter僅用於標記獨占或者共享模式而已),不要搞混了!!!

三、AQS字段解析

  AQS字段僅有三個,源碼如下

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

   head:sync queue隊列的頭節點

  tail:sync queue隊列的尾節點

  state:資源狀態

四、重要方法解析

4.1 acquire

  該方法是獨占模式下的入口方法,可以相應interrupt,但是不會拋出InterruptedException異常

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        //首先執行tryAcquire(arg)嘗試獲取資源,如果成功則直接返回
        //如果tryAcquire(arg)獲取資源失敗,則講當前線程封裝成Node節點加入到sync queue隊列中,並通過acquireQueued進行獲取資源直至成功(如果尚未有資源可獲取,那麽acquireQueued會阻塞當前線程)
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }    

  

  其中tryAcquire方法如下,該方法的具體含義交給AQS的子類去完成,註意,該方法的實現不可有任何耗時操作,更不可阻塞線程,僅實現是否可獲取資源(換言之,是否可獲取鎖)的邏輯即可,源碼如下

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

  

  addWaiter的作用是:將當前線程封裝成一個Node節點,並且添加到sync queue中

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        //生成指定模式的Node節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //以下幾行進行入隊操作,如果失敗,交給enq進行入隊處理。其實,我認為可以直接調用enq,不知道作者設置如下幾行的意圖
        if (pred != null) {
            node.prev = pred;
            //通過CAS操作串行化並發入隊操作,僅有一個線程會成功,由於node節點的prev字段是在CAS操作之前進行的,一旦CAS操作成功,node節點的prev字段就是指向了其前繼節點,因此說prev字段是安全的
            if (compareAndSetTail(pred, node)) {
                //這裏直接通過賦值操作賦值next字段,註意,可能有別的線程會在next字段賦值之前訪問到next字段,因此next字段是非可靠的(一個節點的next字段為null並不代表該節點沒有後繼)
                pred.next = node;
                //一旦next字段賦值成功,那麽next字段又變為可靠的了
                return node;
            }
        }
        //通過enq入隊
        enq(node);
        return node;
    }

  enq入隊,源碼如下

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node‘s predecessor
     */
    private Node enq(final Node node) {
        //死循環進行入隊操作,CAS操作常規模式
        for (;;) {
            Node t = tail;
            //此時隊列為空,需要初始化
            if (t == null) { // Must initialize
                //此時可能多個線程都在執行該方法,因此只有一個線程才能初始化sync queue,此處添加的節點我稱之為Dummy Node,該節點沒有關聯線程
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //以下四行與addWaiter類似,不再贅述
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }                

  這裏拋出一個問題:在初始化sync queue中,將一個new Node()設置為了sync queue的頭結點,該節點沒有關聯任何線程,我稱之為"Dummy Node",這個頭結點"Dummy Node"待會可能會被設置為SIGNAL狀態,那麽它是如何喚醒後繼節點的呢?我會在在講到release時進行解釋

  到這裏,線程已被封裝成節點,並且成功添加到sync queue中去了,接下來,來看最重要的acquireQueued方法

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        //用於記錄是否獲取成功,我現在還不清楚何時會失敗= =
        boolean failed = true;
        try {
            //記錄是否被中斷過,如果被中斷過,則需要在acquire方法中恢復中斷現場
            boolean interrupted = false;
            //同樣的套路,CAS配合死循環
            for (;;) {
                //獲取node節點的前繼節點p
                final Node p = node.predecessor();
                //當p為sync queue頭結點時,才有資格嘗試獲取資源,換言之,當且僅當一個節點是sync queue中第二個節點時,它才有資格獲取資源
                if (p == head && tryAcquire(arg)) {
                    //一旦獲取成功,以下語句都是線程安全的,所有字段直接賦值即可,不需要CAS或者加鎖
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //否則,找到前繼節點,並將其設置為SIGNAL狀態後阻塞自己
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                //如果失敗了
                cancelAcquire(node);
        }
    }

  該方法的主要邏輯就是:不斷地通過死循環執行獲取資源(當且僅當節點是sync queue中第二個節點時才有資格獲取資源)或者阻塞自己的操作,只有成果獲取資源後才能夠返回

  接下來,來看shouldParkAfterFailedAcquire方法以及parkAndCheckInterrupt方法

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node‘s predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //一旦發現前繼節點是SIGNAL狀態,就返回true,在acquireQueued方法中會阻塞當前線程
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            //這裏給出兩個問題:
                 //1.如果在當前線程阻塞之前,前繼節點就喚醒了當前線程,那麽當前線程不就永遠阻塞下去了嗎?
                 //2.萬一有別的線程更改了前繼節點的狀態,導致前繼節點不喚醒當前線程,那麽當前線程不就永遠阻塞下去了嗎?
            return true;

        //如果前繼節點處於CANCELL狀態(僅有CANCELL狀態大於0)
        if (ws > 0) {
            //那麽跳過那些被CANCELL的節點,先前找到第一個有效節點
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        //前繼節點狀態要麽是0,要麽是PROPAGATE,將其通過CAS操作設為SIGNAL,不用管是否成功,退回到上層函數acquireQueued進行再次判斷
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don‘t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

  該方法的主要邏輯就是:將前繼節點設置為SIGNAL

  關於上面提到的兩個問題

    1. 如果在當前線程阻塞之前,前繼節點就喚醒了當前線程,那麽當前線程不就永遠阻塞下去了嗎?--->AQS采用的是Unsafe#park以及Unsafe#unpark,這對方法能夠很好的處理這類問題,可以先unpark獲取一枚許可,然後執行park不會阻塞當前線程,而是消耗這個提前獲取的許可,註意,多次unpark僅能獲取一枚許可

    2.萬一有別的線程更改了前繼節點的狀態,導致前繼節點不喚醒當前線程,那麽當前線程不就永遠阻塞下去了嗎?--->一旦一個節點被設為SIGNAL狀態,AQS框架保證,任何改變其SIGNAL狀態的操作都會喚醒其後繼節點,因此,只要節點看到其前繼節點為SIGNAL狀態,便可放心阻塞自己

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //返回是否被中斷過
        return Thread.interrupted();
    }

  至此,獨占模式的acquire調用鏈分析完畢,總結一下:首先嘗試獲取鎖(tryAcquire),若成功則直接返回。若失敗,將當前線程封裝成Node節點加入到sync queue隊列中,當該節點位於第二個節點時,會重新嘗試獲取鎖,成功則返回,失敗則阻塞自己,直至前繼節點喚醒自己

  AQS通過死循環以及CAS操作來串行化並發操作,並且通過這種適當的自旋加阻塞,來減少頻繁的加鎖解鎖操作

4.2 release

  release方法是獨占模式下釋放資源(即解鎖)的入口,源碼如下

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        //調用tryRelease嘗試釋放資源
        if (tryRelease(arg)) {
            Node h = head;
            //只要頭節點不為空且狀態不為0,就喚醒後繼節點,對於獨占模式也就只有SIGNAL狀態一種,頭結點在任何情況下都不可能為CANCELL狀態
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

  在此,解釋一下enq方法中提到的問題,即那個"Dummy Node"如何喚醒後繼:由於"Dummy Node"不關聯任何線程,因此真正的喚醒操作實際上是由外部的線程來完成的,這裏的外部線程是指從未進入sync queue的線程,因此,"Dummy Node"節點設置為SIGNAL狀態,也能夠正常喚醒後繼

  同理,tryRelease也是交給AQS子類實現的方法,只需要定義釋放資源的邏輯即可,該方法的實現不應該有耗時的操作,更不該阻塞

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

  通過unparkSuccessor方法喚醒指定節點的後繼節點

    /**
     * Wakes up node‘s successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        //若節點狀態小於0,將其通過CAS操作改為0,表明本次SIGNAL的任務已經完成,至於CAS是否成功,或者是否再次被其他線程修改,都與本次無關unparkSuccessor無關,只是該節點被賦予了新的任務而已。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //這裏通過非可靠的next字段直接獲取後繼,如果非空,那麽說明該字段可靠,如果為空,那麽利用可靠的prev字段從tail向前找到當前node節點的後繼節點
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒後繼節點
        if (s != null)
            LockSupport.unpark(s.thread);
    }        

4.3 acquireShared

待續

4.4 releaseShared

待續

  

  

Java concurrent AQS 源碼詳解