1. 程式人生 > >JUC原始碼分析(三)--Condition原始碼分析

JUC原始碼分析(三)--Condition原始碼分析

背景介紹

JUC中關於鎖的大部分同步控制類都基於AQS(JUC-AbstractQueuedSynchronizer(AQS)原始碼分析),ReentrantLock也不例外,lock()和unlock()都是基於ReentrantLock中繼承AQS的子類Sync實現(JUC-ReentrantLock原始碼分析),但是在上篇部落格中只是簡單介紹了lock()和unlock(),這篇部落格就通過一段示例程式碼介紹下ReentrantLock的Condition功能.此外,還將分析下Node.waitStatue的值是如何變化的.

Condition實現思路

Condition是由AQS的內部類ConditionObject

完成的,ConditionObject是AQS中的非靜態內部類,所以AQS與ConditionObject是1:n的關係,1個AQS可以有多個ConditionObject,但ConditionObject只屬於一個AQS.
由於Condition同樣有同步器的語義,在ConditionObject中同樣維護著一個雙端雙向佇列,不過佇列中的每個節點封裝了一個等待signal訊號的執行緒(即呼叫await()之後的執行緒)
實現思路:

  • condition.await()–>將當前執行緒加入Condition佇列,釋放當前執行緒擁有的鎖資源,掛起當前執行緒
  • condition.signal()–>將Condition佇列的一個執行緒轉移至AQS佇列,喚醒該執行緒

程式碼

/**
 * @author pfjia
 * @since 2018/3/5 10:42
 */
public class ConditionDemo {
    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {
        Thread thread1 = new Thread(() -> {
            //1.waitThread執行lock.lock()
lock.lock(); System.out.println(Thread.currentThread().getName() + "正在執行"); try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "停止執行,等待一個signal"); //3.waitThread執行condition.await().釋放鎖,喚醒signalThread condition.await(); //7.執行condition.await()被阻塞之後的程式碼 Thread.sleep(100000000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "獲得一個signal,繼續執行"); lock.unlock(); }, "waitThread"); thread1.start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } Thread thread2 = new Thread(() -> { //2.signalThread執行lock.lock()(因為main執行緒中睡眠1s,而waitThread中睡眠2s,所以此語句先執行),signalThread被阻塞 lock.lock(); //4.lock.lock()中被阻塞的後半部分程式碼 System.out.println(Thread.currentThread().getName() + "正在執行"); //5.signalThread執行condition.signal(),將Condition佇列中的waitThread轉移至AQS佇列中 condition.signal(); System.out.println(Thread.currentThread().getName() + "傳送一個signal"); System.out.println(Thread.currentThread().getName() + "傳送一個signal後,結束"); //6.釋放鎖 lock.unlock(); }, "signalThread"); thread2.start(); } }

程式碼分析

程式碼中共有三個執行緒

  • main執行緒:負責生成waitThread和signalThread
  • waitThread:首先獲取鎖,而後呼叫condition.await()
  • signalThread:獲取鎖時被佔用,等待waitThread呼叫condition.await()釋放鎖後獲取鎖,而後呼叫condition.signal()

程式碼分析

我們根據程式碼的執行順序來分析下程式碼

waitThread:lock.lock()

使用CAS將AQS.state由0修改為1,成功獲取鎖

signalThread:lock.lock()

由於waitThread已經獲取鎖資源,因此signalThread加入AQS佇列後掛起

  • 呼叫addWaiter()將signalThread加入AQS佇列,此時AQS佇列如下(注意:head.waitStatus為0)
    AQS佇列
    • 呼叫acquireQueued(node,1)掛起當前執行緒後AQS佇列如下(注意:head.waitStatus為Node.SIGNAL)
      AQS佇列,頭結點狀態修改為-1
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //由於pred(即頭結點)的waitStatus=0,會進入此段程式碼,將head.waitStatus修改為Node.SIGNAL(即-1)
            //在acquireQueued()的死迴圈第二次時才會將signalThread掛起
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

waitThread:condition.await()

await()共做了如下幾件事情:

  • 將waitThread新增至Condition佇列–>addConditionWaiter()
  • 將waitThread獲取的鎖資源全部釋放,喚醒AQS佇列的第一個執行緒–>fullyRelease()
  • 迴圈判斷waitThread是否在AQS佇列,若不在,則將waitThread掛起

前兩點容易理解,第三點是什麼含義呢?waitThread不是在Condition佇列嗎?為什麼要判斷waitThread是否在AQS對列?
答案在signalThread:condition.signal()中,讓我們帶著疑問繼續看下去.

  1. 呼叫addConditionWaiter()後,Condition佇列如下:(注意:waitStatue為Node.CONDITION,即-2)
    Condition等待佇列

  2. AQS.fullRelease(node)–>AQS.release(1)–>Sync.tryRelease(1),AQS.unparkSuccessor(node)
    釋放資源後呼叫AQS.unparkSuccessor(node)喚醒AQS佇列中的執行緒(即signalThread)

  3. AQS.isOnSyncQueue(node)–>LockSupport.park(this)
    判斷waitThread的node是否在SyncQueue中,由於waitThread不在SyncQueue中,掛起waitThread

signalThread:lock.lock()執行緒被阻塞的後半部分

其實就是執行acquireQueued()掛起執行緒後的程式碼,即將signalThread移出AQS佇列,操作完成後佇列如下:
這裡寫圖片描述

signalThread:condition.signal()

上面說到的第三件事情是迴圈判斷waitThread是否在AQS佇列,若不在,則將waitThread掛起,waitThread就是在condition.signal()中從Condition佇列移至AQS佇列的.

  1. ConditionObject.doSignal(firstWaiter)–>AQS.transferForSignal(firstWaiter)
    將Condition佇列中的waitThread從轉移至AQS佇列
    AQS佇列如下:
    這裡寫圖片描述
    Condition佇列如下:
    這裡寫圖片描述

signalThread:lock.unlock()

  • AQS.release(1)–>Sync.tryRelease(1),AQS.unparkSuccessor(node)
    釋放鎖,並喚醒在AQS佇列中的waitThread

waitThread:condition.await()被阻塞的後半部分

  • AQS.isOnSyncQueue(node)
    此時node已經在AQS佇列中,跳出while迴圈
  • AQS.acquireQueued(node,1)–>Sync.tryAcquire(1)
    嘗試獲取鎖成功,將waitThread移出AQS佇列

總結

  • Condition維護一個等待signal訊號的執行緒佇列
  • await()將當前執行緒加入Condition佇列,釋放當前執行緒擁有的鎖資源,掛起當前執行緒
  • signal()將Condition佇列的一個執行緒轉移至AQS佇列,喚醒該執行緒

不足

  • 未考慮多執行緒的情況
  • 未考慮Node.waitStatue的變化

參考