1. 程式人生 > >java 佇列同步器AbstractQueuedSynchronizer(AQS)實現分析

java 佇列同步器AbstractQueuedSynchronizer(AQS)實現分析

AQS 

  1. 內部使用一個int變數state表示同步狀態。
  2. 內部使用一個隱式的FIFO佇列(並沒有宣告這樣一個佇列,只是通過每個節點記錄它的上下節點來從邏輯上產生一個佇列)來完成阻塞執行緒的排隊。

這個FIFO佇列在 AQS 中被定義為一個內部類Node:

  1. EXCLUSIVE、SHARED:

            節點的兩種模式:獨佔模式和共享模式,分別對應獨佔鎖和共享鎖。

  1. Thread:就是節點對應的執行緒。
  2. waitStatus:
  1. 取消狀態CANCELLED。
  2. 通知狀態SIGNAL。
  3. 條件阻塞狀態CONDITION。
  4. 傳播狀態PROPAGATE。
  1. prev:前驅節點
  2. next:後繼節點
  3. nextWaiter:
    1. 等待隊列表示 後繼節點
    2. 在作為同步佇列節點時,nextWaiter可能有兩個值:EXCLUSIVE、SHARED 標識當前節點是獨佔模式還是共享模式。

 

 


static final class Node {
    /**
     * 共享鎖模式
     */
    static final Node SHARED = new Node();
    /**
     * 獨佔鎖模式
     */
    static final Node EXCLUSIVE = null;

    /**
     * 取消狀態,在同步佇列中等待的執行緒等待超時或被中斷,需要在同步佇列中取消等待,節點進入該狀態不在變化
     */
    static final int CANCELLED = 1;
    /**
     * 通知狀態,當前節點的後繼節點處於等待執行狀態
     * 當前節點的執行緒如果釋放了同步狀態或者被取消,將通知後繼結點,使得後繼結點執行緒執行。
     */
    static final int SIGNAL = -1;

    /**
     * 條件阻塞狀態
     * 節點執行緒等待在Condition上,
     * 當其他執行緒對Condition呼叫了signal()方法後,
     * 該節點將會從等待佇列中轉移到同步佇列中,
     * 加入到對同步狀態的獲取中。
     */
    static final int CONDITION = -2;
    /**
     * 傳播狀態,表示當前場景下後續的acquireShared能夠得以執行。
     */
    static final int PROPAGATE = -3;
    /**
     * 節點的的狀態
     * 初始狀態為0  表示當前節點在sync佇列中,等待著獲取狀態。
     */
    volatile int waitStatus;
    /**
     * 前驅節點
     */
    volatile Node prev;
    /**
     * 後繼節點
     */
    volatile Node next;
    /**
     * 獲取同步狀態的執行緒(當前節點執行緒)
     */
    volatile Thread thread;
    /**
     * 等待隊列表示 後繼節點
     * 在作為同步佇列節點時,nextWaiter可能有兩個值:EXCLUSIVE、SHARED 標識當前節點是獨佔模式還是共享模式;
     */
    Node nextWaiter;

    /**
     * 是否共享鎖模式
     */


    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * 返回前驅節點
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

 

同步佇列的基本結構:

尾節點設定:

       如果執行緒獲取同步狀態失敗 執行緒會被構造進Node 中 然後加入到同步佇列,加入到佇列的過程必須是安全的因為可能有多個執行緒同時獲取同步狀態失敗, AQS中使用基於CAS的方法加入佇列:

/**
 *
 * @param expect  之前的隊尾
 * @param update  需要加入佇列的節點
 */
compareAndSetTail(Node expect , Node update)

在一個Node被CAS設定為佇列之前,這個Node的prev已經被設定為之前的尾節點,而在這個Node被設定為隊尾之後,之前尾節點的next才會被指向這個Node。

首節點設定:

 

      在佇列中,首節點是當前獲取同步狀態成功的節點,首節點在釋放同步狀態時,會喚醒後繼節點,而後繼節點會在自己獲取同步狀態成功時,將自己設定為首節點。因為設定首節點是由持有同步狀態的執行緒來完成的,因此不需要使用CAS來保證執行緒安全,只需要持有同步狀態的執行緒將首節點設定為原首節點的後繼節點並斷開原首節點的next引用即可。

 

獨佔鎖的獲取和釋放

執行緒A 進入程式碼塊執行 當前沒有其他執行緒執行緒A 獲取到同步狀態 呼叫了 acquire方法 獲取到AQS的鎖

新城 B 執行緒C 進入程式碼塊  由於執行緒A 獲取的同步狀態 此時 執行緒B和C 無法獲取 B和C 加入到 同步佇列

執行緒A 執行完畢 完畢呼叫了 release方release方法喚醒同步佇列中head節點執行緒

     獲取獨佔同步狀態使用方法acquire(int arg):

 

  public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

       其中tryAcquire 是我們重寫的方法(上一節有描述) 如果 tryAcquire 返回true 執行緒獲得同步狀態否則 執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法執行緒阻塞並且加入同步佇列.

  •       addWaiter 方法 加入到尾部佇列
private Node addWaiter(Node mode) {
    // 將當前執行緒構造為Node,mode傳入值為Node.EXCLUSIVE (標識執行緒為獨佔模式 儲存在 Node的nextWaiter欄位)
    Node node = new Node(Thread.currentThread(), mode); 
    // 如果當前尾節點不為空,嘗試設定當前節點為尾節點,
    // 如果當前不存在尾節點或者cas設定失敗,呼叫完整設定尾節點enq方法。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // cas 把當前節點設定為尾節點成功後 把之前尾節點的後繼節點設定為當前尾節點
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 如果當前佇列為null 沒有尾節點 新建一個空節點作為首節點和尾節點 然後進入下一個for迴圈
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
         // 不為null 則把當前節點設為尾節點,並將原尾節點next指向當前節點
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • acquireQueued方法 :判斷是否輪到自己獲取同步狀態
 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
               //如果當前節點的前驅是頭節點,說明即將輪到自己獲得同步狀態,呼叫tryAcquire檢查是否能獲取到同步狀態

                if (p == head && tryAcquire(arg)) {
                    setHead(node);  //獲取到同步狀態則將自己設為頭節點
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前驅節點已經是SIGNAL狀態則前驅節點執行完成後會喚醒當前節點
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //前驅節點狀態為CANCELLED,則繼續查詢前驅節點的前驅節點,因為當首節點喚醒時,會跳過CANCELLED節點
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        //如果是0或PROPAGATE狀態,則用CAS設定為SIGNAL
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    //該方法在 shouldParkAfterFailedAcquire 方法返回true後執行,shouldParkAfterFailedAcquire 方法返回true代表前驅節點已經被設定為SIGNAL狀態,
    因此當前節點可以阻塞等待喚醒了,使用LockSupport.park(this)方法來阻塞。這個方法會一直阻塞直到首節點喚醒當前節點或當前節點被中斷,如果是被中斷,中斷標識將會被一直往上層方法傳,最終acquire方法會執行selfInterrupt。
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    釋放獨佔鎖使用方法release(int arg):

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

      tryRelease方法是我們自己在AQS子類中重寫的方法,可以看到release方法的邏輯比較簡單,如果tryRelease方法返回false,那麼release方法直接返回false;如果tryRelease方法返回true則通過unparkSuccessor方法喚醒後繼節點:

    private void unparkSuccessor(Node node) {
        //如果頭節點的狀態是負值,將其歸0.如果失敗了也ok。
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //通常要喚醒的節點就是頭節點的直接後繼節點,但是如果直接後繼節點是null或狀態為CANCELLED,則從tail向前遍歷取離head最近的一個非CANCELLED狀態的節點。這裡之所以要從tail向前遍歷,前面說過原因:最後的tail節點在構造的時候在某時刻可能只有其向前一個節點的prev引用,而沒有前一個節點向它的next引用。
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 喚醒下一個節點
    }

共享式同步狀態的獲取和釋放

          共享式同步狀態呼叫的方法是acquireShared:

  public final void acquireShared(int arg) {

        //獲取同步狀態的返回值大於等於0時表示可以獲取同步狀態

        //小於0時表示可以獲取不到同步狀態 需要進入佇列等待

        if (tryAcquireShared(arg) < 0)

            doAcquireShared(arg);

    }

    private void doAcquireShared(int arg) {

      //和獨佔式一樣的入隊操作

        final Node node = addWaiter(Node.SHARED);

        boolean failed = true;

        try {

            boolean interrupted = false;

               //自旋

            for (; ; ) {

                final Node p = node.predecessor();

                if (p == head) {

                    int r = tryAcquireShared(arg);

                    if (r >= 0) {

                     //前驅結點為頭節點且成功獲取同步狀態 可退出自旋

                        setHeadAndPropagate(node, r);

                        p.next = null; // help GC

                        if (interrupted)

                            selfInterrupt();

                        failed = false;

                        return;

                    }

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                        parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        //退出自旋的節點變成首節點
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||

                (h = head) == null || h.waitStatus < 0) {

            Node s = node.next;

            if (s == null || s.isShared())

                doReleaseShared();

        }

    }

    和獨佔式獲取相比,主要區別在於setHeadAndPropagate方法:

一個節點在獲取了同步狀態後,不僅把自己設定為頭節點,而且如果當前同步狀態>0||原head為null||原head的狀態<0||當前head為null||當前狀態<0,且下一個節點的型別為null||下一個節點型別為shared,則繼續喚醒下一個節點。

釋放通過呼叫releaseShared方法

 public final boolean releaseShared(int arg) {

//釋放同步狀態

        if (tryReleaseShared(arg)) {

//喚醒後續等待的節點

            doReleaseShared();

            return true;

        }

        return false;

    }

    private void doReleaseShared() {


//自旋

    for (;;) {

            Node h = head;

            if (h != null && h != tail) {

                int ws = h.waitStatus;

                if (ws == Node.SIGNAL) {

                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

                        continue; // loop to recheck cases

                  //喚醒後續節點

            unparkSuccessor(h);

                }

                else if (ws == 0 &&

                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

                    continue; // loop on failed CAS

            }

            if (h == head) // loop if head changed

                break;

        }

    }