1. 程式人生 > >聊聊併發:(十一)concurrent包之Condition原始碼分析

聊聊併發:(十一)concurrent包之Condition原始碼分析

前言

在前幾篇文章中,

我們介紹了concurrent包中幾種鎖的實現機制,對其原始碼進行了分析,在介紹鎖的文章中,並沒有提及到Condition這個類,其實Condition的使用是與Lock繫結在一起的,本章,我們詳細瞭解一下Conditon的使用方式以及實現機制。

Condition介紹

想必大家對Object中的三個方法都很熟悉,wait()、notify()以及notifyAll(),使用這幾個方法,我們可以實現執行緒之間的通訊,以及一些設計模式的實現,然而使用Object中的這幾個方法,是基於物件監視器配合完成執行緒間的等待/通知機制,在一些場景下往往不是特別的靈活,我們不能控制到更細粒度的級別。

因此,在concurrent包中,Java的為我們提供了Condition類,可以幫助我們更加靈活的實現同樣的功能,並且具有更高的可控制性和擴充套件性。

我們看一下Condition中提供的幾個方法:

方法名稱 方法描述
void await() 造成當前執行緒在接到訊號或被中斷之前一直處於等待狀態
boolean await(long time, TimeUnit unit) 造成當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態
long awaitNanos(long nanosTimeout) 造成當前執行緒在接到訊號、被中斷或到達指定等待時間之前一直處於等待狀態
void awaitUninterruptibly() 造成當前執行緒在接到訊號之前一直處於等待狀態
boolean awaitUntil(Date deadline) 造成當前執行緒在接到訊號、被中斷或到達指定最後期限之前一直處於等待狀態
void signal() 喚醒一個等待執行緒
void signalAll() 喚醒所有等待執行緒

Condition提供的方法並不多,其中比較常用的是await()、signal()、signalAll(),分別對應Object中的wait()、notify()、notifyAll()方法,我們下面來分析一下Condition的內部實現機制。

Condition實現機制

我們先來看一下如何建立一個Condition物件:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

Condition物件的建立,是依賴於Lock中的newCondition()方法,也就是說,Condition的使用是與Lock繫結在一起的,事實上,newCondition()建立的是來自於AQS的ConditionObject物件,該類是AQS的一個內部類,Condition是要和lock配合使用的也就是condition和Lock是繫結在一起的,而lock的實現原理又依賴於AQS,自然而然ConditionObject是AQS的一個內部類

ReentrantLock:

final ConditionObject newCondition() {
    return new ConditionObject()方法創建出來的,那麼,同一個Lock物件,我們呼叫多次();
}

Condition等待佇列實現機制

在前面的文章中,我們介紹了AQS的實現機制,其中介紹了AQS實現原理,是內部維護了一個同步佇列,condition內部也是使用同樣的方式,內部維護了一個等待佇列,所有呼叫condition.await方法的執行緒會加入到等待佇列中,並且執行緒狀態轉換為等待狀態。

ConditionObject中有兩個很重要的變數:

/** First node of condition queue. */
private transient Node firstWaiter;

/** Last node of condition queue. */
private transient Node lastWaiter;

如果看過AQS實現的朋友可以很熟悉,AQS也採用了同樣的實現方式,這裡Node類使用的與AQS的Node一致,因此,可以通過firstWaiter、lastWaiter這兩個Node物件,去完成一個佇列的構建。

ConditionObject內部維護了一個單向的佇列,通過Node類進行構建,它的示意圖如下:

image

與AQS不同的是,AQS內部維護的是一個雙向的佇列,而ConditionObject內部維護的是單向的佇列。

看到這裡,不知道您有沒有發現一個問題點,ConditionObject既然是通過呼叫Lock的newCondition()方法創建出來的,那麼,同一個Lock物件,我們呼叫多次newCondition()方法,會怎麼樣呢?

事實上,每次呼叫newCondition()方法,在這個Lock物件上,都會創建出一個等待佇列,也就是說,AQS是支援一個鎖,同時存在多個等待佇列,這個與Object有很大的不同,一個Object的物件監視器上只能擁有一個同步佇列和一個等待佇列,即僅支援一個執行緒持有Object的鎖後,才可以進入等待狀態,而AQS是支援多個執行緒同時等待一個鎖。

我們還是用一張示意圖來進行說明:

image

由於ConditionObject是AQS的內部類,可以訪問AQS的內部屬性,因此可以共享AQS的同步資源的狀態,需要注意的一點是,在進入等待佇列的先決條件是,當前需要進入等待佇列的執行緒,已經獲取到鎖,否則會丟擲異常。

OK,介紹完ConditionObject的內部結構,接下來我們看一下ConditionObject是如何實現等待機制的。

Condition await()方法實現機制

我們來看一下Condition如何進入等待,這裡,我們拿ReentrantLock為例。

Lock lock = new ReentrantLock();
lock.lock();
Condition condition = lock.newCondition();
condition.await();

首先我們需要建立一個鎖物件,這裡我們建立一個ReentrantLock鎖,呼叫其lock()方法,獲取鎖資源,然後建立Condition物件,呼叫其await()方法,此時,當前獲取到鎖資源的執行緒,會被掛起,釋放掉鎖資源,進入等待佇列中,等待Condition的signal()訊號的喚起。我們進去,看一下await()方法的實現。

AbstractQueuedSynchronizer:

/**
 * 可中斷式的condition實現.
 * 1、如果當前執行緒被打斷, 丟擲InterruptedException異常.
 * 2、通過getState方法,儲存當前同步狀態.
 * 3、呼叫release()方法,將當前同步狀態作為入參,如果失敗,丟擲IllegalMonitorStateException.
 * 4、直到被喚醒或者打斷前, 保持阻塞狀態.
 * 5、通過呼叫acquire()方法重新爭搶同步資源,使用之前儲存的同步狀態作為入參.
 * 6、如果當在第四步被打斷,丟擲InterruptedException異常.
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //1、新增節點至同步佇列
    Node node = addConditionWaiter();
    //2、釋放當前執行緒持有的同步資源,並保持當前同步資源的狀態
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //3、如果當前執行緒不在同步佇列中
    while (!isOnSyncQueue(node)) {
        //掛起當前執行緒
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //4、自旋等待獲取到同步狀態(即獲取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

上面是await()方法的原始碼實現,大體流程可以參考註釋,我們對其中的幾個核心點進行分析:

  • 1、新增節點至同步佇列
    我們在前面已經說明了ConditionObject內部同步佇列的實現機制,await()方法的第一步,就是將當前執行緒,新增至同步佇列的尾部,我們來看一下具體的實現:

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    

    上面的程式碼就是新增等待佇列節點的實現,實現並不複雜,我們簡單說一下,首先獲取尾部等待節點,如果尾部等待節點不為空並且狀態不正確,清除,並重新指定尾部節點,如果狀態沒問題,新增一個節點加入到佇列中,放置到隊尾;如果尾部節點為空,證明這是一個空佇列,新增一個節點,放置到隊尾。

  • 2、釋放當前執行緒持有的同步資源,並保持當前同步資源的狀態將當前執行緒放置到等待佇列中後,可以準備釋放當前執行緒持有的同步資源,呼叫fullyRelease()方法來完成,我們看一下其實現:

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    

    這段邏輯比較簡單,呼叫AQS的release()方法,釋放掉當前執行緒持有的同步資源,並喚醒同步佇列中的下一個等待節點(具體實現邏輯請參考AQS實現機制一文),如果成功,返回當前執行緒的同步狀態,如果失敗,將當前等待節點的狀態置為CANCELLED。

    這裡需要注意的是,如果當前執行緒沒有持有當前鎖的持有者,那麼會丟擲IllegalMonitorStateException異常。

  • 3、如果當前執行緒不在同步佇列中,掛起執行緒
    這裡,有一個迴圈,通過isOnSyncQueue()判斷,當前節點是否在同步佇列中,我們來看一下isOnSyncQueue()的實現:

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }
    

    從上面的實現我們可以知道,當isOnSyncQueue()方法返回false的時候,迴圈會持續下去,並將當前執行緒掛起,只有當該方法返回true,或者當前等待節點被打斷的時候,迴圈才會結束。

    我們先來看什麼時候isOnSyncQueue()才會返回true,有兩種情況:當前節點存在下一個等待節點,或findNodeFromTail()方法返回true,而findNodeFromTail()返回true的情況只有呼叫了signal()方法的時候,這裡我們後面會講。

    總結一下:
    只有兩種情況下,迴圈再回退出,即當前等待節點被打斷,或呼叫了signal()或signalAll()方法的時候,迴圈退出,否則,會一直將當前執行緒掛起。

  • 4、自旋等待獲取到同步狀態(即獲取到lock)
    這裡就不過多解釋了,會呼叫AQS的acquireQueued()方法,自旋去嘗試獲取同步資源。

    對AQS這裡的實現不熟悉的朋友,可以參見前面的文章關於AQS原理的介紹。

OK,我們總結一下await()方法,當呼叫await()後,會講當前執行緒放入等待佇列中,在呼叫signal()或signAll()方法前或被打斷前,當前執行緒會一直被掛起。

哈哈,前面囉囉嗦嗦說著這麼多,其實核心實現就這麼個流程。

Condition signal()與signalAll()方法實現機制

前面分析完了Condition如何進入等待,我們再看一下Condition如何喚醒。

Lock lock = new ReentrantLock();
lock.lock();
Condition condition = lock.newCondition();
condition.await();
condition.signal();

condition的喚醒,需要在await()後執行,同樣的,也必須要獲取lock後才可以執行,否則會丟擲UnsupportedOperationException。

我們來看一下其實現機制:

AbstractQueuedSynchronizer:

public final void signal()方法的邏輯比較簡單,可以參見注釋,核心方法在於() {
    //1、檢測當前執行緒是否已經持有鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2、獲取等待佇列的頭結點
    Node first = firstWaiter;
    if (first != null)
        //3、喚醒
        doSignal(first);
}

signal()方法的邏輯比較簡單,可以參見注釋,核心方法在於doSignal(),我們看一下它的實現:

private void doSignal(Node first) {
    do {
        //如果頭部等待節點沒有下一個節點了 ,將尾部等待節點置為null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        //釋放
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

這裡沒有什麼可說的,核心在於transferForSignal()方法:

final boolean transferForSignal(Node node) {
    //1、重置節點狀態
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //2、將節點加入到AQS的同步佇列中
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

上面的程式碼是真正的signal()方法的核心邏輯,主要做了兩件事情:

  • 1、重置節點狀態
  • 2、將節點加入到AQS的同步佇列中

該方法會使得等待佇列中的頭節點即等待時間最長的那個節點移入到同步佇列,而移入到同步佇列後才有機會使得等待執行緒被喚醒,還記得我們上面關於await()方法的介紹時,提到過findNodeFromTail()這個方法,其只有節點被移動到同步佇列的時候,該方法才會返回true,await()才會退出迴圈。

signalAll與signal方法的區別體現在doSignalAll方法上,前面我們已經知道doSignal方法只會對等待佇列的頭節點進行操作,而doSignalAll的原始碼為:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

可以看出實現機制基本差不多,區別在於,doSignalAll會迴圈喚醒等待佇列中的全部就節點。

Condition應用場景

Condition最常見的應用場景就是解決“生產者與消費者問題”,假設我們有兩個執行緒,A執行緒與B執行緒,當A執行緒獲取到lock鎖後,呼叫await()方法,而B執行緒,也獲取同一個鎖後,呼叫signal()方法,可以使得A執行緒繼續執行,我們用一個小的demo演示一下:

public class ConditionAwaitSignalDemo {

    private static Lock lock = new ReentrantLock();

    private static Condition condition = lock.newCondition();

    private static volatile boolean terminal = false;

    public static void main(String[] args) {
        Thread waiterThread = new Thread(new WaiterThread());
        Thread signalThread = new Thread(new SignalThread());
        waiterThread.start();
        signalThread.start();
    }

    private static class WaiterThread implements Runnable {
        @Override
        public void run() {
            lock.lock();
            try {
                while (!terminal) {
                    System.out.println("等待ing,當前執行緒:" + Thread.currentThread().getName());
                    condition.await();
                }
                System.out.println("結束等待,當前執行緒:" + Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    private static class SignalThread implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                terminal = true;
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    }
}

輸出:

等待ing,當前執行緒:Thread-0
結束等待,當前執行緒:Thread-0

結語

本篇我們瞭解了Condition的使用以及實現機制,理解Condition對於我們使用concurrent包中鎖會有很大的幫助,本篇介紹的比較粗糙,很多Condition的其他方法沒有介紹到,但是其他方法都是大同小異,希望朋友們讀完本篇後,也仔細去讀一下Condition的原始碼實現,理解一下其中實現的精髓所在。

謝謝閱讀!

下篇預告:concurrent包併發輔助類之CyclicBarrier分析