1. 程式人生 > >【併發程式設計】 圖文深入解析Java顯式鎖底層原始碼 —— condition 實現執行緒排程

【併發程式設計】 圖文深入解析Java顯式鎖底層原始碼 —— condition 實現執行緒排程

一、回顧 AQS 資源的鎖定與釋放

上篇文章(文章中有詳細的原始碼解讀) 說到,AQStryRelease 失敗後,資源的鎖定與釋放,正向流程大體可以分為以下6個階段。

1-2:當其他執行緒佔據了鎖定的資源,另一個執行緒進行獲取時,會進入 FIFO 佇列,如果佇列未初始化,則進入初始化。

3:進入了 FIFO 佇列之後,開始自旋,並不斷嘗試將前一個節點的 waitStatus 替換成 -1 == SIGNAL

4:最後一次自旋,前一個節點的 waitStatus 已經是 -1 == SIGNAL,則進入阻塞模式`

5:當持有資源鎖定的執行緒呼叫了 releasestate

從 1 改為 0之後,本執行緒被喚醒,繼續自旋

6:如果本執行緒節點的 prev 節點為 HEAD,則有機會進行一次資源獲取,如果獲取成功(將 state 由 0 改為 1 ),則將自己置為頭結點,自旋結束。


注意:

為了防止混淆,本文將 AQS 的佇列(上篇文章,也是上圖說到的佇列)稱為 On Sync Queue(藍底+淺藍色Node),本文中伴隨 Condition 物件出現的佇列稱為 Condition Queue(藍底+黑色Node


二、AQS 之 Condition 簡單 Demo

上篇文章,我們只說到了 waitStatus 的初始狀態 0 以及 SIGNAL = -1

,表明鎖定資源持有者在釋放資源時需要通知 next 節點進行 unPark,即喚醒 next 節點。

而接下來這一小節將著重分析 CONDITION = -2 的情況,-2 的意思是代表當前這個節點在 Condition Queue 中排隊,等待通知 (signal)。

Condition 的建立十分簡單,在原來的顯式鎖上呼叫 newCondition() 即可,使用方法和我們熟知的 waitnotify 類似,condition 為我們提供了 awaitsignal 方法,但是它可以做更加細粒度的控制,我們看看下面這個簡單的 Demo。

/**
 * Created by Anur IjuoKaruKas on 2019/6/4
 */
public class Condition {

    private ReentrantLock reentrantLock = new ReentrantLock();

    private java.util.concurrent.locks.Condition meetWaiter = reentrantLock.newCondition();

    private java.util.concurrent.locks.Condition fruitWaiter = reentrantLock.newCondition();

    private void buyMeet() throws InterruptedException {
        try {
            reentrantLock.lock();
            print("前去買肉發現沒貨");
            meetWaiter.await();
            print("被通知:肉進貨了~");
        } finally {
            reentrantLock.unlock();
        }
    }

    private void buyFruit() throws InterruptedException {
        try {
            reentrantLock.lock();
            print("前去水果發現沒貨");
            fruitWaiter.await();
            print("被通知:水果進貨了~");
        } finally {
            reentrantLock.unlock();
        }
    }

    private void meetIn() {
        try {
            reentrantLock.lock();
            print("通知:肉進貨了~");
            meetWaiter.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    private void fruitIn() {
        try {
            reentrantLock.lock();
            print("通知:水果進貨了~");
            fruitWaiter.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Condition condition = new Condition();

        new Thread(() -> {
            try {
                condition.buyFruit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                condition.buyFruit();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                condition.buyMeet();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(1000);
        condition.fruitIn();

        Thread.sleep(1000);
        condition.meetIn();
    }

    public static void print(String print) {
        System.out.println(String.format("時間 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print));
    }
}

====================================================
時間 - Wed Jun 05 10:22:14 CST 2019		Thread[Thread-0,5,main]		前去水果發現沒貨
時間 - Wed Jun 05 10:22:14 CST 2019		Thread[Thread-2,5,main]		前去買肉發現沒貨
時間 - Wed Jun 05 10:22:14 CST 2019		Thread[Thread-1,5,main]		前去水果發現沒貨
時間 - Wed Jun 05 10:22:15 CST 2019		Thread[main,5,main]		通知:水果進貨了~
時間 - Wed Jun 05 10:22:15 CST 2019		Thread[Thread-0,5,main]		被通知:水果進貨了~
時間 - Wed Jun 05 10:22:16 CST 2019		Thread[main,5,main]		通知:肉進貨了~
時間 - Wed Jun 05 10:22:16 CST 2019		Thread[Thread-2,5,main]		被通知:肉進貨了~

例子雖然舉的比較粗俗...... 但是核心就是執行緒的排程,我們可以在某些條件下使得顯示鎖阻塞,且通過某些條件被喚醒。

可以看到,我們可以分別為 fruitWaiter 或者 meetWaiter 進行細粒度的喚醒 signal(其實還有個 signalAll)。至於 condition 的使用我們這裡不做過多贅述。

三、AQS 之 Condition 正向流程原始碼解析

我們先縱覽一下 await 方法:(不考慮執行緒被 interrupt 的情況)

  1. 呼叫 addConditionWaiter();,這一步實際上和我們前面說的 FIFO 佇列很像,操作的是 Condition Queue
  2. 呼叫 int savedState = fullyRelease(node);,如果對前面說的 release 有印象的話,那麼這個就很好理解了,一般我們一次 release,正常實現都是使得 state --,對應 acquire 使得 state ++。而這個 fullyRelease 則是一次性釋放掉所有 state,直接讓 state 歸零,並儲存 state 狀態。
  3. isOnSyncQueue 則是進行一系列判斷、阻塞與自旋,它是控制 condition 阻塞的核心程式碼(實際上很簡單)。

  1. 被其他持鎖執行緒 signal 進行通知彈出 Condition Queue ,且進入 On Sync Queue

  1. 回到我們上篇文章說的 tryAcquire 自旋(本文第一章階段3-6),實際上到這一步,condition 阻塞已經完畢了,接下來回歸我們的正常流程,可以理解為,此時被某個執行緒通知喚醒,但是一喚醒我們不能並不能立刻獲得資源,正常的流程還是要走的。
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();// 1、進入 condition 佇列
            int savedState = fullyRelease(node);// 2、釋放資源,並記錄 state
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {// 3、condition 佇列阻塞,直到被其他持鎖執行緒 signal(或者被 interrupt)才會停止自旋。
                LockSupport.park(this); // 阻塞
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;// 被 interrupt
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// /5、重新自旋,開始申請鎖定資源
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // 6、如果有必要的話,修改 condition 佇列
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

下面我們逐一對其進行原始碼分析:


1、addConditionWaiter 另一個 FIFO 佇列!

程式碼十分簡單:

  1. unlinkCancelledWaiters 迭代清理所有 waitState 不為 Node.CONDITION 的節點,並重新設定尾節點。
  2. 新建一個 Node,並將其塞到尾部。Node 物件上篇文章已經講過,其實它就是一個搭載了一些狀態,以及當前執行緒的一個例項。
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

2、靈活的 state 應用:fullyRelease

程式碼更加簡單,如果你還記得上篇文章所說的,正常實現都是使得 state --,對應 acquire 使得 state ++的話。這裡實際上就是一夜回到解放前,release 所有 state

release的實現就不多說了,上篇文章裡已經說得很清楚了。

   final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

3、阻塞、且等待被喚醒:isOnSyncQueue 相關

先說說這個 while 迴圈,先不看 isOnSyncQueue 的實現,外面邏輯很清晰,只要 isOnSyncQueue 為假,執行緒就會阻塞(park)。後續則是進行是否被 interrupt 的判斷,如果被 interrupt,則跳出迴圈,否則在 isOnSyncQueue 為真之前,執行緒會不停的被阻塞、喚醒、阻塞、喚醒。

            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

接著再看看 isOnSyncQueue 是如何實現的。通過前面我們可以知道,結果為真,就可以跳出 while 迴圈。使得結果為真的條件只有兩個:

  1. 如果 node.next != null ,則結果為真。
  2. 本執行緒的 Node 已經位於 On Sync Queue了:findNodeFromTail 方法是一個簡單的查詢方法,但它是從 On Sync Queuetail 節點,不斷往前尋找,如果找到了本 Node,則結果為真。
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }

Condition Queue 的解析中,到目前為止只出現了 FirstWaiterLastWaiter、以及 Node 本身的成員變數 nextWaiter。為什麼會出現在上篇文章 On Sync Queue 中涉及的 nextprev 等 "指標" 作為判斷條件呢?(參考本文第一章那幾張圖)

4、通知可以離開 Condition Queue 了,但實際上並不喚醒:signal 實現分析

帶著上面的疑問,我們來到了 signal() 方法

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

signal 方法的門面比較簡單,isHeldExclusively 需要自己實現,ReentrantLock 內部的 Sync 實現是判斷當前執行緒是否持有鎖定資源,也就是判斷 getExclusiveOwnerThread() == Thread.currentThread(); 鎖的執行緒持有者和當前是否相等。

isHeldExclusively 的設計十分靈活,如果必要的話,我們可以實現一個不需要持有鎖執行緒便可進行 signalAQS 實現,即:不做任何判斷直接返回 true 即可。注意,要使用 condition 必須實現此方法!!


signal 方法的核心是 doSignal(first);,我們重點看看在這裡做了什麼:

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
  • do 階段主要是將 Condition Queue 隊頭節點的 nextWaiter 變成新的隊頭,並同時將 nextWaiter 引用擦除(情況1),如果沒有 nextWaiter,則將佇列清空(情況2)。

  • while 階段則是兩個常規判斷,(first = firstWaiter) != null 很好理解,類似遞迴呼叫,不做贅述。關鍵看看 !transferForSignal(first)。總結一下就是當 transferForSignal 為真或者佇列已經空了,則跳出 while 迴圈。


transferForSignal 主要做了如下操作:

  1. 將當前 Node 的狀態由 CONDITION == -2 改為 0,失敗則返回 flase
  2. enq(node);,這個其實就是上篇文章說道的 addWaiter 的核心操作,就是將當前 Node 塞進 On Sync Queue
  3. 優化操作,如果上個節點剛好 cancel (ws > 0) 了,或者 CAS 失敗,則將當前節點直接喚醒( 其實就是給了 condition 一個優先去競爭原子 state 的機會 )。
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

整理一下 signal 的邏輯,可以用下圖表示,用一句話簡單總結就是將 Condition Queue 的頭部取出,拿到 On Sync Queue 的尾部。

5、老生常談:tryAcquire

這一步不多說,略過,上篇文章已經解釋的很清楚了。

四、AQS 之 Condition 總覽

AQSOn Sync QueueCondition Queue 的關係可以如下表示:

  1. 每個 Condition 都有自己的 Condition Queue,且多個 Condition Queue 之間的 await()signal() 方法相互不影響。
  2. 當某個持有鎖的執行緒呼叫了某個 Conditionawait() 方法以後,會釋放掉鎖,且進入該 Condition 所對應的 Condition Queue 的隊尾。
  3. 當有某個執行緒呼叫了某個 Conditionsignal() 方法後,該 Condition 所對應的 Condition Queue 隊頭出列,緊接著進入到 On Sync Queue 隊尾。注意,該節點並不會被直接喚醒,只是進了 On Sync Queue 隊尾。


文章皆是基於原始碼一步步分析,沒有參考過多資料,如有錯誤,請指出!!


另外歡迎來 Q 群討論技術相關(目前基本沒人)[左二維碼]~

如果覺得寫得好還可以關注一波訂閱號喲 ~ 部落格和訂閱號同步更新 [右二維碼]~


參考資料:

JDK12 原始碼

另外小夥伴可以思考一下:

  1. 在節點從 Condition Queue 出隊時,如果上個節點剛好 cancel (ws > 0) 了,或者 CAS 失敗,則將當前節點直接喚醒,這個優化是為什麼?
  2. 本文沒有提到 ConditionsignalAll() 方法,呼叫這個方法後,會