1. 程式人生 > >併發程式設計之顯式條件

併發程式設計之顯式條件

我們之前介紹 synchronized 關鍵字語義的時候說過,synchronized 雖然不需要我們手動的加鎖和釋放鎖了,但不代表他沒有用到鎖。同時,我們說每個物件本身結構中也內建了阻塞佇列,執行緒持有器,鎖重入計數器等欄位。

所以,與其說是 synchronized 實現了「自動化的鎖機制」,不如說是 synchronized 藉助了我們 Java 中的物件結構實現了了「自動化的鎖機制」。

雖然,我們通過 synchronized 對執行緒實現了自動化的阻塞與喚醒,但是對於已經獲得鎖的執行緒來說,如果在他們的執行期間缺少了某些條件以繼續執行,比如呼叫了資料庫服務等待資料回顯,那麼我們從 CPU 的使用效率來看是不應該讓當前執行緒繼續持有 CPU 空等待的。

wait 方法使用在 synchronized 內部,專門用於將那些已經獲得鎖但由於缺乏某些條件不能繼續執行的執行緒阻塞到另一個佇列上,並釋放鎖及 CPU。同理,notify 方法就是從等待的佇列上釋放一個執行緒以標識它的條件可能滿足了,讓它嘗試重新競爭鎖。

而在我們的顯式鎖中,對應 wait/notify 語義的就是我們本篇要討論的『顯式條件』,我們一起來看看。

實現原理
在探究『顯式條件』的實現原理之前,我們先通過一個小的程式碼 demo,看看顯式條件是如何使用的。

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

Thread thread1 = new Thread(){
@Override
public void run(){
lock.lock();
System.out.println(“preparing waiting…”);
try {
condition.await();
} catch (InterruptedException e) {}
System.out.println(“waiting end…”);

    lock.unlock();
}

};
thread1.start();
為了縮短篇幅,沒給全方法體,只抽出來核心的部分程式碼,具體完整的程式碼,大家可以去 github 自行檢視下載。

得到一個顯式條件,還是很簡單的,我們只要通過 ReentrantLock 的 newCondition 方法即可獲得一個條件物件。接著,在獲取到鎖之後如果遇到某些條件不滿足,不能繼續執行了,直接呼叫 Condition 例項的 await 方法即可,釋放一個條件佇列上的執行緒呼叫 signal 即可,不再贅述。

這裡需要注意一點的是,正如同 synchronized 中使用 wait/notify 方法一樣,condition 的兩個方法 await 和 signal 也一樣是需要在 lock 方法之後呼叫的,也即是必須先獲得鎖才有機會被條件等待。

下面我們看實現原理,先從 newCondition 方法開始:

public Condition newCondition() {
return sync.newCondition();
}
直接透傳呼叫的 AQS 中的 newCondition 方法:

final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject 是 AQS 中定義的一個內部類,並實現了 Condition 介面,是一個真正的顯式條件實現者。我們直接看看它的 await 方法:

await方法實現

我用不同顏色的實線將 await 方法切分成了四個模組,每一個模組我們進行一個整體上的概括,具體的程式碼實現大家自行研究,也歡迎加我微信討論。

分析之前首先需要明確等待佇列和阻塞佇列雖然共用的同一個節點類 Node,但是確實兩個完全不同的佇列。阻塞佇列使用 next 指標連結下一個節點,等待佇列使用的是 nextWaiter 指標連結下一個節點。

這一點需要有一個前提認識,不然等你分析具體程式碼實現的時候,你會不知所措的。 下面我們總結這個四個部分的大致邏輯:

將當前執行緒包裝成節點追加到等待佇列的尾部,因為這個時候還沒有釋放鎖,所以追加過程是無併發風險的,接著釋放自己持有的顯式鎖。
如果條件滿足了或是被其他執行緒移除出等待隊列了,那麼 isOnSyncQueue 就會返回 true,結束迴圈並嘗試第三步,否則將會被阻塞當前執行緒並等待喚醒。
從等待佇列中移除之後依然需要先嚐試獲取顯式鎖,接著才能返回到當初被阻塞的呼叫處。
處理中斷,丟擲異常或是設定中斷標誌位。
這就是 await 方法的實現邏輯,再簡潔一點的概括就是:

釋放持有的鎖–>阻塞自己等待被喚醒–>被喚醒後先嚐試獲取鎖–>處理下中斷–>方法返回

總的來說,並不難理解,接著我們看 signal 方法的實現邏輯:

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
如果自己並沒有持有鎖而試圖去釋放等待佇列上的執行緒節點,直接丟擲異常,拒絕訪問。反之,拿到等待佇列頭節點,並呼叫 doSignal 釋放一個阻塞的節點執行緒。

doSignal 方法的實現如下:

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
方法邏輯主要分為兩個部分,一個是迴圈體,一個是迴圈條件。迴圈體做的事情就是置換出來第等待佇列上的第一個節點,讓它與佇列脫鉤。迴圈條件裡面的邏輯就是,嘗試將剛才脫鉤的節點轉移到阻塞佇列上。

如果轉移失敗了,只有一種可能,就是當前需要被轉移的節點的等待狀態不再是 CONDITION,也即是他的等待狀態已經被取消,所以我們就不需要關心它了,順著等待佇列找到下一個有效的節點,嘗試去轉移並喚醒其對應的執行緒。

整個迴圈只有成功釋放了一個節點執行緒,才能結束。

這樣,我們對於 condition 中的兩個核心的方法原理就分析完了,相信你一定有所瞭解了,至於其他的一些 signalAll,awaitNanos 以及 await 的幾個過載方法來說,他們大多離不開以上分析的兩個方法,這裡就不再贅述了。

生產者消費者模型實現
下面我們應用一下上面介紹的『顯式條件』,通過實現一個經典的併發模型場景,之前我們是通過 wait/notify 實現的,生產者和消費者公用了同一個條件等待佇列,相對來說是不太合適的,效率是不如我們的顯式條件的。

為什麼這麼說呢?

因為我們的顯式條件依附於顯式鎖,是可以建立多個的,所以對於生產者與消費者來說,我們可以建立兩個不同的條件等待佇列分別來阻塞條件不滿足的執行緒,喚醒的時候也可以「對症下藥」,不需要同時喚醒所有的生產者與消費者。這一點,我希望你能有所體會。

Repertory倉庫類

我們定義一個倉庫類,提供新增和消費產品的能力,當然這兩種方式是併發安全的。對於生產方法來說,當倉庫滿了則不能繼續生產產品,而需要在等待佇列上進行等待。

對於消費方法來說,當倉庫為空是則不能繼續消費產品,而需要在另一個等待佇列上進行等待。

當然,如果成功生產了一個產品,將嘗試喚醒所有消費者,告訴他們倉庫有產品了,反之,如果成功消費了一個產品,將嘗試喚醒所有的生產者,告訴他們倉庫中有空餘位置了,你們可以繼續生產了。

這樣,當我們同時建立大量的生產者與消費者,並讓他們併發呼叫這兩個方法,消費者與生產者的足跡會交替的出現在控制檯上。這部分程式碼我已經寫好了,並且測試過了,因為限於篇幅且不是核心程式碼,這裡就不貼出來了,大家可以自行嘗試編寫或從我的 github 上下載我已經完成的實現,歡迎你給我提出建議!