1. 程式人生 > >AQS原始碼分析之獨佔鎖和共享鎖

AQS原始碼分析之獨佔鎖和共享鎖

簡介

AQS實現鎖機制並不是通過synchronized——給物件加鎖實現的,事實上它僅僅是一個工具類!它沒有使用更高階的機器指令,也不靠關鍵字,更不依靠JDK編譯時的特殊處理,僅僅作為一個普普通通的類就完成了程式碼塊的訪問控制。

AQS使用標記位+佇列的方式,記錄獲取鎖、競爭鎖、釋放鎖等一些類鎖操作。但更準確的說,AQS並不關心什麼是鎖,對於AQS來說它只是實現了一系列的用於判斷資源是否可以訪問的API,並且封裝了在訪問資源受限時,將請求訪問的執行緒加入佇列、掛起、喚醒等操作。AQS關心的問題如下:

  1. 資源不可訪問時,怎麼處理?
  2. 資源時可以被同時訪問,還是在同一時間只能被一個執行緒訪問?
  3. 如果有執行緒等不及資源了,怎麼從AQS佇列中退出?

至於資源能否被訪問的問題,則交給子類去實現。

站在使用者的角度,AQS的功能主要分為兩類:獨佔鎖共享鎖。在它的所有子類中,要麼實現了它的獨佔功能的API,要麼實現了共享功能的API,但不會同時使用兩套API,即使是ReentrantReadWriteLock,也是通過兩個內部類:讀鎖和寫鎖,分別使用兩套API來實現的。

  • 當AQS的子類實現獨佔功能時,如ReentrantLock,資源是否可以被訪問被定義為:只要AQS的state變數不為0,並且持有鎖的執行緒不是當前執行緒,那麼代表資源不可訪問。
  • 當AQS的子類實現共享功能時,如CountDownLatch,資源是否可以被訪問
    被定義為:只要AQS的state變數不為0,那麼代表資源不可以為訪問。

AQS類繼承結構圖

image

獨佔鎖

ReentrantLock是AQS獨佔功能的一個實現,通常的使用方式如下:

reentrantLock.lock();
// do something
reentrantLock.unlock();

ReentrantLock會保證執行do something在同一時間有且只有一個執行緒獲取到鎖,其餘執行緒全部掛起,直到該擁有鎖的執行緒釋放鎖,被掛起的執行緒被喚醒重新開始競爭鎖。

ReentrantLock的加鎖全部委託給內部代理類完成,ReentrantLock只是封裝了統一的一套API而已,而ReentrantLock又分為公平鎖

非公平鎖

abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
  • 公平鎖:每個執行緒搶佔鎖的順序為先後呼叫lock方法的順序,並依此順序獲得鎖,類似於排隊吃飯;
  • 非公平鎖:每個執行緒搶佔鎖的順序不變,誰運氣好,誰就獲得鎖,和呼叫lock方法的先後順序無關,類似後插入。

換句話說,公平鎖和非公平鎖的唯一的區別是在獲取鎖的時候是直接去獲取鎖,還是進入佇列排隊的問題。

獲取獨佔鎖的流程

image

FairSync的tryAcquire()方法分析

   protected final boolean tryAcquire(int acquires) {
       final Thread current = Thread.currentThread();/*獲取當前執行緒*/
       int c = getState(); /*獲取父類AQS中的標誌位*/
       if (c == 0) {
           if (!hasQueuedPredecessors()
                   /*說明佇列中沒有其他執行緒:沒有執行緒正在佔有鎖,那麼修改一下狀態位*/
                   //注意:這裡的acquires是在lock的時候傳遞來的,從上面的圖中可以知道,這個值是寫死的1
                   && compareAndSetState(0, acquires)) {
               // 如果通過CAS操作將狀態為更新成功則代表當前執行緒獲取鎖,
               // 因此,將當前執行緒設定到AQS的一個變數中,說明這個執行緒拿走了鎖。
               setExclusiveOwnerThread(current);
               return true;/*返回true,說明已拿到鎖*/
           }
       } else if (current == getExclusiveOwnerThread()) {
           /*如果當前state不為0,說明鎖已經被拿走,那麼判斷是否是當前執行緒拿走了鎖*/
           /*因為鎖是可重入的,可以重複lock,unlock*/
           int nextc = c + acquires;
           if (nextc < 0)
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       return false;
   }

如果當前執行緒獲取到鎖,tryAcquire()返回true,否則返回false,這時會返回到AQS的acquire()方法。

如果沒有獲取到鎖,那麼應該將當前執行緒放到佇列中去,只不過,在放之前,需要做些包裝。

AQS的addWaiter()方法分析

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        /*將節點加入到佇列尾部*/
        node.prev = pred;
        /*嘗試使用CAS方式修改尾節點,但是在【併發情況】下,可能修改失敗*/
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    /*修改失敗,說明有併發,那麼進入enq,以自旋方式修改*/
    enq(node);
    return node;
}

用當前執行緒去構造一個Node物件,mode是一個表示Node型別的欄位(獨佔的還是共享的)。構造好節點後,在佇列不為空的時候,使用CAS的方式將新的節點加入佇列尾部,如果修改失敗,則進入enq()方法,使用自旋的方式修改。

/**
 * 將節點通過自旋的方式插入到佇列尾部
 *
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (; ; ) {
        Node pred = tail;
        if (pred == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return pred;
            }
        }
    }
}

將執行緒的節點接入到隊裡中後,當然還需要做一件事:將當前執行緒掛起!這個事,由acquireQueued()來做。

AQS的acquireQueued()方法分析

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (; ; ) {
            /*獲得當前節點pred節點*/
            final Node p = node.predecessor();
            /*如果當前節點正好是第二個節點,那麼則再次嘗試獲取鎖*/
            if (p == head && tryAcquire(arg)) {
                /*獲取鎖成功,*/
                setHead(node); /*將當前節點設定為頭結點*/
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /*當前節點不是第二個節點 或者 再次獲取鎖失敗*/
            /*判斷是否需要掛起,在掛起後,判斷執行緒是否中斷*/
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

判斷此時是否能夠安全的掛起

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 當前節點已經被設定為等待喚醒的狀態,可以安全的掛起了
         */
        return true;
    if (ws > 0) {
        /*
         * 當前節點node的前任節點被取消,那麼【跳過】這些取消的節點,
         * 當跳過之後,重新嘗試獲取鎖
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 通過前面的判斷,waitStatus一定不是 SIGNAL 或 CANCELLED。
         * 推斷出一定是 0 or PROPAGATE
         * 呼叫者需要再次嘗試,在掛起之前能不能獲取到鎖,
         * 因此,將當前pred的狀態設為SIGNAL,再次嘗試獲取鎖之後,如果還沒有得到鎖那麼掛起
         *
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

掛起執行緒,並判斷此時執行緒是否被中斷。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    /*如果被中斷,則返回true,interrupted()方法返回後,中斷狀態被取消,變為false*/
    return Thread.interrupted();
}

到此為止,一個執行緒對於鎖的一次競爭才告一段落,結果有兩種:

  • 要麼成功獲取到鎖(不用進入到AQS佇列中);
  • 要麼獲取失敗,被掛起,等待下次喚醒後繼續迴圈嘗試獲取鎖。

值得注意的是,AQS的佇列為FIFO佇列,所以,每次被CPU假喚醒,且當前執行緒不是處在頭節點的位置,也是會被掛起的。AQS通過這樣的方式,實現了競爭的排隊策略。

釋放獨佔鎖流程分析

image

AQS的release()方法分析

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            /*將持有鎖的頭結點釋放成功後
            * 喚醒其後繼節點
            * */
            unparkSuccessor(h);
        return true;
    }
    return false;
}

嘗試釋放鎖,如果釋放成功,找到AQS的頭節點,呼叫unparkSuccessor()喚醒FIFO佇列中第一個等待鎖的節點。

Sync的tryRelease()方法分析

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //如果【釋放的執行緒】和【獲取鎖的執行緒】不是同一個,丟擲非法監視器狀態異常。
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();

    boolean free = false;
    // 因為是重入的關係,不是每次釋放鎖c都等於0,
    // 直到最後一次釋放鎖時,才通知AQS不需要再記錄哪個執行緒正在獲取鎖。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

AQS的unparkSuccessor()方法分析

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) {
        /*如果狀態為負(SIGNAL、PROPAGATE),那麼清除其狀態
        * 如果失敗,或者狀態被其他等待執行緒改變,也沒有關係
        * */
        compareAndSetWaitStatus(node, ws, 0);
    }
    /*
     * 一般情況下喚醒的執行緒是【頭結點】的【下一個節點】
     * 但是如果該節點被取消或者為null,
     * 那麼需要【從後往前遍歷】尋找一個【最早的】並且【沒有被取消】的節點
     */
    Node s = node.next;

    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) LockSupport.unpark(s.thread);
}

當FIFO佇列中等待鎖的第一個節點被喚醒之後,會返回到到節點所線上程的acquireQueued()方法中,繼續下一輪迴圈,這時當前節點正好時頭節點的第一個後繼節點,並且使用CAS修改狀態持有鎖成功,那麼當前節點則晉升為頭結點,並返回。

····

for (; ; ) {
    /*獲得當前節點pred節點*/
    final Node p = node.predecessor();
    /*如果當前節點正好是第二個節點,那麼則再次嘗試獲取鎖*/
    if (p == head && tryAcquire(arg)) {
        /*獲取鎖成功,*/
        setHead(node); /*將當前節點設定為頭結點*/
        p.next = null; // help GC
        failed = false;
        return interrupted;
    }
    /*當前節點不是第二個節點 或者 再次獲取鎖失敗*/
    /*判斷是否需要掛起,在掛起後,判斷執行緒是否中斷*/
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
        interrupted = true;
}

····

以上的分析是基於ReentrantLock內部的公平鎖來分析的,並且其lcok和unlock已經基本分析完畢,唯獨剩下一個非公平鎖nonfairSync。其實,它和公平鎖的唯一區別就是獲取鎖的方式不同,一個是按前後順序一次獲取鎖,一個是搶佔式的獲取鎖。

  • 非公平鎖的lock方法的處理方式:在lock的時候先直接CAS修改一次state變數(嘗試獲取鎖),成功就返回,不成功再排隊,從而達到不排隊直接搶佔的目的。
  • 而對於公平鎖:則是老老實實的開始就走AQS的流程排隊獲取鎖。如果前面有人呼叫過其lock方法,則排在佇列中前面,也就更有機會更早的獲取鎖,從而達到“公平”的目的。

共享鎖

共享功能的主要實現為CountDownLatch,CountDownLatch是一種靈活的閉鎖實現,它可以使一個或多個執行緒等待一組事件發生。閉鎖狀態包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown遞減計數器,表示有一個事件已經發生了,而await方法等待計數器達到零,這表示所有需要等待的時間都已經發生。如果計數器值非零,那麼await會一直阻塞直到計數器為零,或者等待執行緒中斷,或者等待超時。

等待獲取共享鎖的流程分析

image

CountDownLatch的await()方法分析

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

CountDownLatch的await()方法不會只在一個執行緒中呼叫,多個執行緒可以同時等待await()方法返回,所以CountDownLatch被設計成實現tryAcquireShared()方法,獲取的是一個共享鎖,鎖在所有呼叫await()方法的執行緒間共享,所以叫做共享鎖。

AQS的acquireSharedInterruptibly()方法分析

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    /*用於響應執行緒中斷,
    * 在前兩行會檢查執行緒是否被打斷
    * */
    if (Thread.interrupted())
        throw new InterruptedException();
    /*返回了-1,說明state不為0,也就是CountDownLatch的計數器還不為0*/
    if (tryAcquireShared(arg) < 0)/*嘗試獲取共享鎖,如果小於0,表示獲取失敗*/
        doAcquireSharedInterruptibly(arg);
}

Sync的tryAcquireShared()方法分析

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

AQS的doAcquireSharedInterruptibly()方法分析

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    /*將當前執行緒包裝為一個共享節點*/
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (; ; ) {
            final Node p = node.predecessor();
            if (p == head) {
                //如果新建節點的前一個節點,就是Head,
                //說明當前節點是AQS佇列中等待獲取鎖的第一個節點,
                //按照FIFO的原則,可以直接嘗試獲取鎖。
                int r = tryAcquireShared(arg);
                if (r >= 0) {

                    /*如果獲取成功,則需要將當前節點設定為AQS佇列中的第一個節點*/
                    /*佇列的頭節點表示正在獲取鎖的節點*/
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //檢查下是否需要將當前節點掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

獲得共享鎖的流程分析

image

AQS的releaseShared()方法分析

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (; ; ) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

死迴圈更新state的值,實現state的減1操作,之所以用死迴圈是為了確保state值的更新成功。

從上文的分析中可知,如果state的值為0,在CountDownLatch中意味:所有的子執行緒已經執行完畢,這個時候可以喚醒呼叫await()方法的執行緒了,而這些執行緒正在AQS的佇列中,並被掛起的,所以下一步應該去喚醒AQS佇列中的頭節點了(AQS的佇列為FIFO佇列),然後由頭節點去依次喚醒AQS佇列中的其他共享節點。

AQS的doReleaseShared()方法分析

private void doReleaseShared() {
    for (; ; ) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                //如果當前節點是SIGNAL意味著,它正在等待一個訊號。或者說,它在等待被喚醒,因此做兩件事
                /*重置waitStatus標誌位,如果失敗則重試*/
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases

                /*重置成功後,喚醒等待獲取共享鎖的第一個節點*/
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                //如果本身頭節點的waitStatus是處於重置狀態(waitStatus==0)的,將其設定為“傳播”狀態。
                //意味著需要將狀態向後一個節點傳播。
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

當FIFO佇列中等待共享鎖的第一個節點被頭結點喚醒之後,會返回到到節點對應執行緒的doAcquireSharedInterruptibly()方法中,並繼續迴圈,這時候當前節點的前驅節點正好時頭結點,並且能夠獲得共享鎖,這時會執行setHeadAndPropagate()方法,將當前節點設定為頭結點,並繼續喚醒下一個節點。

AQS的setHeadAndPropagate()方法分析

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

首先,使用CAS更換了頭結點,然後,將當前節點的下一個節點取出來。如果下一個節點同樣是shared型別,再做一個releaseShared()操作。這時可以回到第二步,依次喚醒AQS佇列中其他共享節點。

總而言之,AQS關於共享鎖方面的實現方式:如果獲取共享鎖失敗後,將請求共享鎖的執行緒封裝成Node物件放入AQS的佇列中,並掛起Node物件對應的執行緒,實現請求鎖執行緒的等待操作。待共享鎖可以被獲取後,從頭節點開始,依次喚醒頭節點及其以後的所有共享型別的節點。實現共享狀態的傳播。

共享鎖與獨佔鎖的對比

  1. 與AQS的獨佔功能一樣,共享鎖是否可以被獲取的判斷為空方法,交由子類去實現。
  2. 與AQS的獨佔功能不同,當鎖被頭節點獲取後,獨佔功能是隻有頭節點獲取鎖,其餘節點的執行緒繼續沉睡,等待鎖被釋放後,才會喚醒下一個節點的執行緒,而共享功能是隻要頭節點獲取鎖成功,就在喚醒自身節點對應的執行緒的同時,繼續喚醒AQS佇列中的下一個節點的執行緒,每個節點在喚醒自身的同時還會喚醒下一個節點對應的執行緒,以實現共享狀態的“向後傳播”,從而實現共享功能。

參考資料