1. 程式人生 > >Java 併發程式設計之 Condition 介面

Java 併發程式設計之 Condition 介面


> 本文部分摘自《Java 併發程式設計的藝術》
## 概述 任意一個 Java 物件,都擁有一個監視器方法,主要包括 wait()、wait(long timeout)、notify() 以及 notifyAll() 方法,這些方法與 synchronized 同步關鍵字配合,可以實現等待 - 通知模式。Condition 介面也提供了類似 Object 的監視器方法,與 Lock 配合可以實現等待 - 通知模式 Object 的監視器方法與 Condition 介面的對比: | 對比項 | Object 監視器方法 | Condition | | ---------------------------------------------------- | ------------------------- | ------------------------------------------------------------ | | 前置條件 | 獲取物件的監視器鎖 | 呼叫 Lock.lock() 獲取鎖呼叫 Lock.newCondition() 獲取 Condition 物件 | | 呼叫方法 | 直接呼叫如:object.wait() | 直接呼叫如:condition.await() | | 等待佇列個數 | 一個 | 多個 | | 當前執行緒釋放鎖並進入等待佇列 | 支援 | 支援 | | 當前執行緒釋放鎖並進入等待佇列,在等待狀態中不響應中斷 | 不支援 | 支援 | | 當前執行緒釋放鎖並進入超時等待狀態 | 支援 | 支援 | | 當前執行緒釋放鎖並進入等待狀態到將來的某個時間 | 不支援 | 支援 | | 喚醒等待佇列中的一個執行緒 | 支援 | 支援 | | 喚醒等待佇列中的全部執行緒 | 支援 | 支援 |
## 介面示例 Condition 定義了等待 - 通知兩種型別的方法,當前執行緒呼叫這些方法時,需要提前獲取到 Condition 物件關聯的鎖。Condition 物件是由 Lock 物件(呼叫 Lock 物件的 newCondition() 方法)建立,換句話說,Condition 是依賴 Lock 物件的 ```java public class ConditionUserCase { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException { lock.lock(); try { condition.await(); } finally { lock.unlock(); } } public void conditionSignal() { lock.lock(); try { condition.signal(); } finally { lock.unlock(); } } } ``` 當呼叫 await() 方法後,當前執行緒會釋放鎖並在此等待,而其他執行緒呼叫 Condition 物件的 signal() 方法,通知當前執行緒後,當前執行緒才從 await() 方法返回,並且在返回前已經獲取了鎖 Condition 的部分方法以及描述: | 方法名稱 | 描 述 | | ------------------------------------------------------------ | ------------------------------------------------------------ | | void await() throws InterruptedException | 當前執行緒進入等待狀態直到被通知(signal)或中斷。 | | void awaitUninterruptibly() | 當前執行緒進入等待狀態直到被通知,該方法不響應中斷。 | | long awaitNanos(long nanosTimeout) throws InterruptedException | 當前執行緒進入等待狀態直到被通知、中斷或者超時,返回值表示剩餘超時時間。 | | boolean awaitUntil(Date deadline) throws InterruptedException | 當前執行緒進入等待狀態直到被通知、中斷或者到某個時間。如果沒有到指定時間就被通知,方法返回 true,否則,表示到了指定時間,返回 false。 | | void signal() | 喚醒一個等待在 Condition 上的執行緒,該執行緒從等待方法返回前必須獲得與 Condition 相關聯的鎖。 | | void signalAll() | 喚醒所有等待在 Condition 上的執行緒,能夠從等待方法返回的執行緒必須獲得與 Condition 相關聯的鎖。 | 下面通過一個有界佇列的示例來深入理解 Condition 的使用方式 ```java public class BoundedQueue { private Object[] items; // 新增的下標,刪除的下標和資料當前數量 private int addIndex, removeIndex, count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundedQueue(int size) { items = new Object[size]; } /** * 新增一個元素,如果陣列滿,則新增執行緒進入等待狀態,直到有空位 */ public void add(T t) throws InterruptedException { lock.lock(); try { while (count == items.length) { notFull.await(); } items[addIndex] = t; if (++addIndex == items.length) { addIndex = 0; } ++count; notEmpty.signal(); } finally { lock.unlock(); } } /** * 由頭部刪除一個元素,如果陣列空,則刪除執行緒進入等待狀態,直到有新元素新增 */ @SuppressWarnings("unchecked") public T remove() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } Object x = items[removeIndex]; if (++removeIndex == items.length) { removeIndex = 0; } --count; notFull.signal(); return (T) x; } finally { lock.unlock(); } } } ```
## 實現分析 ConditionObject 是同步器 AbstractQueuedSynchronizer 的內部類,每個 Condition 物件都包含著一個佇列(等待佇列),該佇列是 Condition 物件實現等待 - 通知功能的關鍵 #### 1. 等待佇列 等待佇列是一個 FIFO 佇列,在佇列中的每個節點都包含了一個執行緒引用,該執行緒就是在 Condition 物件上等待的執行緒,如果一個執行緒呼叫了 Condition.await() 方法,那麼該執行緒就會釋放鎖,構造成節點並加入等待佇列並進入等待狀態 一個 Condition 包含一個等待佇列,Condition 擁有首尾節點的引用,新增節點只需要將原有的尾節點 nextWaiter 指向它,並更新尾節點即可。節點引用更新的過程並沒有使用 CAS 來保證,原因在於呼叫 await() 方法的執行緒必定是獲取了鎖的執行緒,也就是該過程是由鎖來保證執行緒安全的 ![](https://img2020.cnblogs.com/blog/1759254/202103/1759254-20210313133531820-1842888356.png) 在 Object 的監視器模型上,一個物件擁有一個同步佇列和等待佇列,而併發包中的 Lock 擁有一個同步佇列和多個等待佇列,其對應關係如圖所示: ![](https://img2020.cnblogs.com/blog/1759254/202103/1759254-20210313133546759-456750256.png) #### 2. 等待 呼叫 Condition 的 await() 方法,會使當前執行緒進入等待佇列並釋放鎖,同時執行緒狀態變為等待狀態。當從 await() 方法返回時,當前執行緒一定獲取了 Condition 相關聯的鎖 Condition 的 await() 方法如下所示: ```java public final void await() throws InterruptedException { // 檢測執行緒中斷狀態 if (Thread.interrupted()) throw new InterruptedException(); // 當前執行緒包裝為 Node 並加入等待佇列 Node node = addConditionWaiter(); // 釋放同步狀態,也就是釋放鎖 int savedState = fullyRelease(node); int interruptMode = 0; // 檢測該節點是否在同步佇列中,如果不在,則繼續等待 while (!isOnSyncQueue(node)) { // 掛起執行緒 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 競爭同步狀態 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清理條件佇列中的不是在等待條件的節點 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 對等待執行緒中斷,會丟擲異常 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } ``` #### 3. 通知 呼叫 Condition 的 signal() 方法,將會喚醒在等待佇列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步佇列中 Condition 的 signal() 方法程式碼如下所示: ```java public final void signal() { // 檢查當前執行緒是否獲取了鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 獲取等待佇列首節點,移動到同步佇列並喚醒 Node first = firstWaiter; if (first != null) doSignal(first); } ``` Condition 的 signAll() 方法,相當於對等待佇列中的每個結點均執行一個 signal() 方法,效果就是將等待佇列中所有節點全部移動到同步佇列中,並喚醒每個節點的線