1. 程式人生 > >[Java併發] AQS抽象佇列同步器原始碼解析--獨佔鎖釋放過程

[Java併發] AQS抽象佇列同步器原始碼解析--獨佔鎖釋放過程

[Java併發] AQS抽象佇列同步器原始碼解析--獨佔鎖獲取過程

上一篇已經講解了AQS獨佔鎖的獲取過程,接下來就是對AQS獨佔鎖的釋放過程進行詳細的分析說明,廢話不多說,直接進入正文...

鎖釋放入口release(int arg)

首先進行說明下,能夠正常執行到release方法這裡來的執行緒都是獲取到鎖的,從下面程式碼可以看出釋放鎖步驟只有兩個重要的方法:tryRelease 與unparkSuccessor ,tryRelease嘗試釋放鎖,unparkSuccessor喚醒後繼節點所封裝的執行緒。

public final boolean release(int arg) {
    // 嘗試釋放鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 如果頭節點不為空,並且waitStatus不為0則喚醒後繼節點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        // 無論是否執行喚醒後繼節點,總會返回true
        return true;
    }
    // 釋放失敗
    return false;
}

接下來就開始分析tryRelease 與unparkSuccessor這兩個主要的方法。

嘗試釋放鎖tryRelease(int arg)

tryRelease方法在AQS是預設不實現具體邏輯的,如下:

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

因此,我們就拿ReentrantLock的tryRelease的具體實現加以說明,

protected final boolean tryRelease(int releases) {
    // 釋放後的鎖的計數(可重入鎖)
    int c = getState() - releases;
    // 當前釋放鎖的必須為鎖持有的執行緒
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 當鎖計數為0時說明已鎖已完全釋放,將AQS中佔有執行緒設為空
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

喚醒後繼節點unparkSuccessor

 private void unparkSuccessor(Node node) {// 喚醒後繼節點
     int ws = node.waitStatus;
     // waitStatus,直接將waitStatus設為0
     if (ws < 0)
         compareAndSetWaitStatus(node, ws, 0);
     Node s = node.next;
     // waitStatus > 0 ,說明該節點已被取消,從後往前遍歷找到未被取消距離該節點最近的節點並喚醒
     if (s == null || s.waitStatus > 0) {
         s = null;
         for (Node t = tail; t != null && t != node; t = t.prev)
             if (t.waitStatus <= 0)
                 s = t;
     }
     if (s != null)
         // 喚醒執行緒
         LockSupport.unpark(s.thread);
 }

unparkSuccessor的方法執行邏輯:

1.如果頭節點waitStatus < 0,就直接將waitStatus 設為0

2.從後往前遍歷,找出waitStatus <=0 的節點,並且是離頭節點最近的節點,也就是頭節點的後繼節點

3.找到待喚醒的後繼節點後喚醒該節點對應的執行緒。

以上就是本節要講的主要內容了,下次再會.....

等等,等等...

以為AQS獨佔鎖的釋放過程就此結束了嗎?沒那麼簡單。

重新往前看下鎖釋放的程式碼,不知道有沒有發現問題?

問題

1.tryRelease方法為什麼不用CAS進行減少鎖計數

2.unparkSuccessor方法中為什麼只判斷頭節點waitStatus<0時,將waitStatus設為0,那麼waitStatus>0的情況怎麼不判斷

3.unparkSuccessor中if (s == null || s.waitStatus > 0) {... },為什麼需要判斷waitStatus >0

4.為什麼需要從後往前遍歷找到離頭節點最近的並且waitStatus<=0的後繼節點進行執行緒喚醒,不可以從前往後遍歷嗎?

接下來我們逐一的對以上上個問題進行解釋。

問題1 tryRelease方法為什麼不用CAS進行減少鎖計數

這個問題其實是最簡單的一個問題,就是前面也提到的,能夠執行到release方法這裡來的執行緒都是已經獲取到鎖的執行緒,並且獨佔鎖也只能是一個執行緒,因此不需要進行CAS進行比較後才賦值。

問題2 unparkSuccessor方法中為什麼只判斷頭節點waitStatus<0時,將waitStatus設為0,那麼waitStatus>0的情況怎麼不判斷

不知道大家還記不記得上一篇分析的內容,重新回顧一下shouldParkAfterFailedAcquire方法,當前驅節點的waitStatus>0時,我們會遍歷剔除掉waitStatus>0的節點,因此,當前頭節點waitStatus一定不會大於0

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果ws == Node.SIGNAL,則說明當前執行緒已經準備好被喚醒,因此現在可以被阻塞,之後等待被喚醒
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 如果ws > 0,說明當前節點已經被取消,因此迴圈剔除ws>0的前驅節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //如果ws<=0,則將標誌位設定為Node.SIGNAL,當還不可被阻塞,需要的等待下次執行shouldParkAfterFailedAcquire判斷是否需要阻塞
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

問題3 unparkSuccessor中if (s == null || s.waitStatus > 0) {... },為什麼需要判斷waitStatus >0

在執行過程中,頭節點的第一個後繼節點的waitStatus >0 時就是節點是被取消的,有可能時因為獲取鎖超時被取消,因此我們需要跳過該節點的,所以需要重新找下個需要被喚醒的節點,而如果頭節點的第一個後繼節點的waitStatus<=0直接喚醒。

問題4 為什麼需要從後往前遍歷找到離頭節點最近的並且waitStatus<=0的後繼節點進行執行緒喚醒,不可以從前往後遍歷嗎?

這個問題我們需要重新回顧上一篇的一個方法addWaiter 跟 enq方法

addWaiter 方法
private Node addWaiter(Node mode) {// 首先嚐試快速新增到隊尾,失敗再正常執行新增到隊尾
    Node node = new Node(Thread.currentThread(), mode);
    // 快速方式嘗試直接新增到隊尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果快速新增到隊尾失敗則執行enq(node)新增到隊尾
    enq(node);
    return node;
}
enq方法
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 新增到佇列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

從這兩個方法可以取出新增節點到同步節點隊尾的關鍵部分進行分析

node.prev = pred; // 步驟1
if (compareAndSetTail(pred, node)) {
    pred.next = node; // 步驟2
    return node;
}

從上面程式碼可以看出在執行插入節點的過程中,總是先執行node.prev = pred,然後再執行pred.next = node,因此關於問題4的答案就可以解釋了:

如果我們從頭往後遍歷的話,再併發的環境下如果新增新節點的話可能node.prev = pre已經執行了,但pred.next=node 還未執行,但此時也已經開始執行了unparkSuccessor方法,所以會導致新新增的節點可能沒被遍歷到,但如果是從後往前遍歷的話就不會有該問題。

以上就是AQS獨佔鎖的釋放過程,如果有什麼問題,歡迎各位不吝指正