1. 程式人生 > >併發程式設計學習筆記之構建自定義的同步工具(十一)

併發程式設計學習筆記之構建自定義的同步工具(十一)

概述:

併發程式設計學習筆記之併發工具類(四)中,為大家介紹了幾種同步工具(同步工具就是依靠自己的狀態,調節執行緒是阻塞還是執行用的.),閉鎖、FutureTask、訊號量、關卡.

使用以上的同步工具大部分時候可以滿足我們的需求,但是如果沒能滿足我們需要的功能,可以使用語言和類庫提供的底層機制,包括內部佇列、限制的Condition物件和abstractQueueSynchronizer框架,來構建屬於自己的Synchronizer.

1.管理狀態依賴性

狀態依賴: 若一個操作存在基於狀態的先驗條件,則把它稱為是狀態依賴的(state-dependent).

單執行緒

的程式而言,操縱一個集合,如果集合中的元素不為空,就取出一個元素,如果這個先驗條件(集合不為空)不滿足,就不需要等了,失敗就行了.

但是在多執行緒的條件下,一個執行緒走到了檢驗這個集合是否為空的時候,可以阻塞一段時間,等待結果為真,因為此時可能有另外一個執行緒往裡面新增元素.

對於併發物件,依賴於狀態的方法有時可以在不能滿足先驗條件的情況下選擇失敗,不過更好的選擇是等待先驗條件為真.

等待先驗條件為真的實現方式

一個可阻塞的狀態依賴活動:

void blockingAction(){
            /*獲得鎖*/
        while(/*先驗條件,例如判斷佇列不為空*/){
            /*如果不滿足先驗條件,釋放鎖*/
            /*等待先驗條件為真*/
            /*如果執行緒被中斷或者等待超時,選擇失敗*/
            /*重新獲得鎖*/
        }
        /*執行任務*/
        /*再次獲得鎖*/
    }

鎖是在操作過程中被釋放與重獲的,這也讓這種加鎖的模式略顯與眾不同.

組成先驗條件的狀態變數必須由物件的鎖保護起來,這樣它們能在測試先驗條件的過程中保持不變.

如果先驗條件尚未滿足,就必須釋放鎖,讓其他執行緒可以修改物件的狀態.否則,先驗條件就永遠無發成真了.

再一次測試先驗條件之前,必須要重新獲得鎖.

接下來以一個BaseBoundedBuffer為基類,用不同的子類去繼承它,看一下幾種處理先驗條件的方式.

基類BaseBoundedBuffer:

public class BaseBoundedBuffer<V> {
    //快取的陣列
    private  final V[] buf;
    private int tail;
    private int head;
    private int count;

    /*
    * 通過建構函式,設定快取資料的長度
    * */
    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }

    /*
    * 執行緒安全的put方法,同時因為是final的不能被子類重寫
    * */
    protected synchronized final void doPut(V v){
        //將v 放到快取的 tail位置,從0開始賦值
        buf[tail]  = v;
        // 自增tail,同時與快取的長度相比較,如果等於快取的長度
        // 把tail置為0,也就是說下次會從0開始覆蓋.
        if(++tail == buf.length){
            tail = 0;
        }
        // 統計的陣列容量+1
        ++count;
    }

    /*
    * 同上
    * */
    protected  synchronized  final V doTake(){
        /*從head拿取資料*/
        V v = buf[head];
        /*拿出去的資料置為0*/
        buf[head] = null;
        if (++head == buf.length){
            head = 0;
        }
        -- count;
        return v;
    }

    //判斷是否滿了,注意前驗條件必須被鎖保護,保證不會看到過期資料
    public synchronized  final boolean isFull(){
        //如果當前的容量count == 快取的長度,那就是滿了,返回true
        return count == buf.length;
    }

    //判斷是否為空
    public synchronized  final boolean isEmpty(){
        return count == 0 ;
    }

}

1.1 處理方式1: 將先驗條件失敗傳給呼叫者

子類的實現方式,如果不滿足先驗條件就拋異常:

public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer {

    protected GrumpyBoundedBuffer(int capacity) {
        super(capacity);
    }

    /*
    * 新增元素,如果快取滿了拋異常
    * */
    public synchronized  void put(V v) throws BufferFullException {
        if(isFull()){
            throw new BufferFullException();
        }
        doPut(v);
    }

    /*
    * 取出元素,如果集合是空的拋異常
    * */
    public synchronized  V take() throws BufferEmptyException {
        if(isEmpty()){
            throw new BufferEmptyException();
        }
        return (V) doTake();
    }
}

這種方式雖然簡單,但是使用起來很麻煩.需要時刻捕獲異常.而且還需要呼叫者重新呼叫這個方法,重新嘗試put/take.

在客戶端呼叫take方法:

public static void main(String [] args) throws InterruptedException {
        GrumpyBoundedBuffer grumpyBoundedBuffer = new GrumpyBoundedBuffer();
        //迴圈呼叫
        while(true){
            try{
                //獲得物件v,如果成功break,跳出迴圈
                //如果失敗,捕獲異常休息1秒
                Object v = grumpyBoundedBuffer.take();
                break;
            } catch (BufferEmptyException e){
            //拋異常以後,休眠一段時間.再嘗試
                Thread.sleep(1000);
            }
        }

    }

上面的程式碼,在呼叫失敗的去情況下會選擇休眠一段時間,然後重新嘗試.也可以選擇不休眠的方式---被稱為忙等待自旋等待.

兩種方式各有利弊:

休眠: 可以避免消耗過多的CPU時間,但是容易睡過頭,引發響應慢的問題.

自旋等待: 短時間比較適合用這種方式,但是長時間會浪費系統的資源.

所以,客戶端程式碼身處於自旋產生的低CPU使用率和休眠產生的弱響應性之間的兩難境地.

有一種折中的方式是使用Thread.yield方法.讓當前執行緒讓出一定的時間給其他執行緒執行.

1.2 處理方式2:利用"輪詢加休眠"實現拙劣的阻塞

更好點的方式,"輪詢加休眠":

public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    protected SleepyBoundedBuffer(int capacity) {
        super(capacity);
    }


    public void put(V v) throws InterruptedException {
        //無限嘗試將v新增入集合
            while(true){
                //獲得鎖
                synchronized (this){
                    //如果不空,就新增進集合,退出迴圈
                        if(!isFull()){
                            doPut(v);
                            return;
                        }
                }
                //否則釋放鎖,休眠一段時間,給其他執行緒一些修改的機會.
                Thread.sleep(1000);
            }
    }

    public V take() throws InterruptedException {
        while(true){
            synchronized (this){
                if(!isEmpty()){
                    return doTake();
                }
            }
            Thread.sleep(1000);
        }
    }
}

使用這種方式,呼叫者不必像之前那樣處理失敗和重試.

選擇休眠的時間間隔,是在響應性與CPU使用率之間作出的權衡;

休眠的間隔越小,響應性越好,但是CPU的消耗也越高.

休眠間隔是如何影響響應性的:
image
快取空間變為可用的時刻與執行緒被喚醒並在此檢查的時刻之間可能有延遲.

這就是使用這種方式的弊端,現在有一種更好的方式,條件佇列(condition queue).

條件佇列可以讓執行緒掛起,並且能夠保證當某個條件成為真時,執行緒可以及時地甦醒過來.

1.3 讓條件佇列來解決一切

條件佇列可以讓一組執行緒--稱作等待集--以某種方式等待相關條件變成真.它也由此得名.

不同於傳統的佇列,條件佇列裡面放的是等待相關條件的執行緒. 存放的是執行緒-畫重點

就像每個Java物件都能當做鎖一樣,每個物件也能當做條件佇列.Object的wait、notify、notifyAll方法構成了內部條件佇列的API.

一個物件的內部鎖與它的內部條件佇列是相關的: 為了能夠呼叫物件X中的任一個條件佇列方法,必須持有物件X的鎖(也就是說在synchronized塊中呼叫wait啊,notify啊,notifyAll啊這些方法,不在鎖裡呼叫會報錯).

這是因為"等待基於狀態的條件"機制必須和"維護狀態一致性"機制緊密地繫結在一起:除非你能檢查狀態,否則你不能等待條件(這裡後面,看程式碼就明白了,說的就是有if/while的條件判斷後面才能跟上wait方法),同時,除非你能改變狀態,否則你不能從等待(佇列)中釋放其他的執行緒(這裡說的就是,改變了先驗條件的狀態,才能呼叫notify或notifyAll方法);

先簡單的介紹一下wait和notify、notifyAll方法的使用:

  1. wait、notify和notifyAll都是Object類的方法.也就是說每個類都有這三個方法.
  2. 使用這三個方法的時候,一定要在被鎖保護的程式碼塊中,否則會報java.lang.IllegalMonitorStateException.
  3. wait和notify、notifyAll是配合使用的. 呼叫wait方法會掛起執行緒,釋放鎖,給其它執行緒一些機會,讓前驗條件變為真;notify/notifyAll會喚醒掛起的執行緒,獲取鎖,繼續執行.

使用條件佇列實現的方式:

public class BoundedBuffer<V> extends BaseBoundedBuffer <V>{
    protected BoundedBuffer(int capacity) {
        super(capacity);
    }

    /*
    * 注意這裡的synchronized,
    * 是必須的否則執行會報java.lang.IllegalMonitorStateException
    * 作用是檢查前驗條件時保護狀態的一致性.不會讀到過期資料
    * */
    public synchronized void put(V v) throws InterruptedException {
        /*注意這裡這裡是while迴圈
        * 不是單單一個簡單的if,這麼做有兩個理由
        *1. 因為從notify/notifyAll通知的這段時間
        * 很有可能前驗條件條件又由真變為假.所以迴圈判斷一次是有必要的
        * 2. notify/notifyAll的區別,notify是選取一個條件佇列中的執行緒通知,
        * 而notifyAll則是通知所有的條件佇列,當有多個前驗條件時,可能有一些沒有通過前驗條件的也會被通知
        * 所以需要再次判斷
        * */
        while(isFull()){
            /*掛起當前執行緒,釋放鎖,給其他執行緒一些機會
            * 使前驗條件為真
            * */
            wait();
        }
        /*存入資料*/
        doPut(v);
        /*通知,告訴下面的take方法裡面已經有資料了*/
        notifyAll();
    }

    public synchronized V take() throws InterruptedException {
        while (isEmpty()){
            wait();
        }
        V v = doTake();
        notifyAll();
        return v;
    }

}

註解說的很詳細了,注意兩個方法都是被鎖保護的,還有使用while迴圈而不是用if的理由.

這與之前的"輪詢加休眠"方式相比更高效,響應性更佳(不會"睡過頭").

wait方法也有限時的版本,為了避免死鎖的問題,可以使用限時版本的wait.

2.使用條件佇列

使用Java提供的類,比你自己去建立一個類要好,因為它經歷了重重考驗,證明了自己,而且考慮到方便性、簡單性你也應該這麼做.

但是有時候類庫沒有提供我們需要的同步工具,所以我們必須使用條件佇列自己構建一個同步工具,這時一定要小心謹慎,因為它很容易被用錯.

接下來看看使用條件佇列的一些注意事項.

2.1 條件謂詞

正確使用條件佇列的關鍵在於識別出物件可以等待的條件謂詞.

條件謂詞是先驗條件的第一站,它在一個操作與狀態之間建立起依賴關係.

舉個例子,什麼是條件謂詞:
在有限快取中,只有快取不為空時take才能執行,否則它必須等待.就take而言,它的條件謂詞是"快取不空",
類似的,put的條件謂詞是"快取不滿".

條件謂詞是由類的狀態變數構成的表示式.

看看之前的程式碼:

    //判斷是否滿了
    public synchronized  final boolean isFull(){
        //如果當前的容量count == 快取的長度,那就是滿了,返回true
        return count == buf.length;
    }

    //判斷是否為空
    public synchronized  final boolean isEmpty(){
        return count == 0 ;
    }

count == buf.length; 和 count == 0 ; 就是兩個條件謂詞

將條件謂詞和與之關聯的條件佇列,以及在條件佇列中等待的操作,都寫入文件.

在涉及了加鎖、wait方法和條件謂詞的條件等待中,存在著一種非常重要的三元關係.條件謂詞涉及狀態變數,而狀態變數是由鎖保護的,所以在測試條件謂詞之前,我們必須先持有鎖.鎖物件與條件佇列物件(wait和notify方法呼叫的物件)必須也是同一個物件.

每次呼叫wait都會隱式地與特定的條件謂詞相關聯.

當呼叫特定條件謂詞的wait時,呼叫者必須已經持有了與條件佇列相關的鎖,這個鎖必須同時還保護著組成條件謂詞的狀態變數.

2.2 過早地喚醒

注意,wait的返回並不一定意味著執行緒正在等待的條件謂詞已經變成真了.

一個單獨的內部條件佇列可以與多個條件謂詞共同使用.當有人呼叫notifyAll,從而喚醒了你的執行緒時,並不意味著你正在等待的條件謂詞現在變成真了.wait甚至可以"假裝"返回--不作為對任何執行緒呼叫notify的響應.(這就好比烤麵包機的線路連線有問題,導致麵包尚未烤好,鈴聲就自己響起來了).

當控制流重新進入呼叫wait的程式碼時,它會重新請求與條件佇列相關聯的鎖.但是這時條件謂詞不一定為真,有兩種可能:

  1. 在notify/notifyAll通知的這段時間很有可能條件謂詞又由真變為假.
  2. notify是選取一個條件佇列中的執行緒通知,而notifyAll則是通知所有的條件佇列,所以被notifyAll通知的wait有可能前驗條件不為真.

所以呼叫wait的地方,要是用while(前驗條件)進行迴圈判斷.

當使用條件等待時(Object.wait或者Condition.await):

  • 永遠設定一個條件謂詞---一些對狀態的測試,執行緒執行前必須滿足它;
  • 永遠在呼叫wait前測試條件謂詞,並且從wait中返回後再次測試;
  • 永遠在迴圈中呼叫wait;
  • 確保構成條件謂詞的狀態變數被鎖保護,而這個鎖正是與條件佇列相關聯的;
  • 當呼叫wait、notify或者notifyAll時,要持有與條件佇列相關聯的鎖;並且,
  • 在檢查條件謂詞之後、開始執行被保護的邏輯之前,不要釋放鎖.

2.3 丟失的訊號

死鎖和活鎖是活躍度失敗的一種形式,另一種活躍度失敗的形式是丟失的訊號(missed signal).

當一個執行緒等待的特定條件為真,但是進入等待前檢查條件謂詞卻返回了假,我們稱這樣就出現了一個丟失的訊號.

執行緒在等待一個已經通知過的訊息,它有可能永遠等不到這個訊息.

例如: 未能在呼叫wait之前先檢測條件謂詞,就會導致訊號的丟失.

但是使用while()迴圈的方式,可以避免這種情況的發生.

2.4 通知(notify)

無論何時,當你在等待一個條件,一定要確保有人會在條件謂詞變為真時通知你.

在條件佇列API中有兩個方法--notify和notifyAll.無論呼叫哪一個,你都必須持有與條件佇列物件相關聯的鎖.

呼叫notify的結果是:JVM會從在這個條件佇列中等待的眾多執行緒中挑選一個,並把它喚醒;

呼叫notifyAll會喚醒所有正在這個條件佇列中等待的執行緒.

notify/notifyAll應該儘快釋放鎖,以確保在wait處阻塞的執行緒儘可能快的解除阻塞.

由於會有多個執行緒因為不同的原因在同一個條件佇列中等待,因此不用notifyAll而使用notify是危險的.這主要是因為單一的通知容易導致同類的執行緒丟失全部訊號.

notifyAll在大多數情況下都是由於notify的選擇.

舉個例子:
假設執行緒A因為謂詞PA而在條件佇列中等待,同時執行緒B因為謂詞PB也在同一個條件佇列中等待.
現在假設PB變成真,執行緒C執行一個單一的notify:JVM將從它所擁有的眾多執行緒中選擇一個並喚醒,如果A被選中,它隨後被喚醒,看到PA尚未變成真,轉而繼續等待.期間本應該可以執行的B卻沒有被喚醒.這不是嚴格意義上的"丟失訊號"--它更像一個"被劫持的(hijacked)"訊號---不過問題是一樣的:執行緒正在等待一個已經(或者本應該)發生過的訊號.

使用notify取代notifyAll的情況:

相同的等待者,只有一個條件謂詞與條件佇列相關,每個執行緒從wait返回後執行相同的邏輯,並且,一進一出,一個隊條件變數的通知,至多隻啟用一個執行緒執行.

大多數類都不滿足這些條件,因此普遍認可的做法是優先使用notifyAll,而不是單一的notify.儘管使用notifyAll而非notify可能有些低效,但是這樣做更容易確保你的類的行為是正確的.

我們可以將之前的put和take操作進行優化,之前是每次put/take時通知,現在可以先檢查是否已經為空/滿然後在進行通知:

  public synchronized V take() throws InterruptedException {
        while (isEmpty()){
            wait();
        }
        V v = doTake();
        boolean wasFull = isFull();
        //如果滿了,才通知
        if(wasFull){
            notifyAll();
        }
        return v;
    }

儘管"依據條件通知"可以提升效能,但它畢竟只是一種小技巧(而且還讓子類的實現變得複雜),應謹慎使用.

單一的通知(notify)和"依據條件通知"都是優化行為.通常進行優化時應該遵循"先讓它跑起來,再讓它快起來--如果它還沒有足夠快"的原則:錯誤地進行優化很容易給程式帶來無法預料的活躍度失敗.

2.5 入口協議和出口協議

對於每個依賴於狀態的操作,以及每個修改了其他狀態的操作(對於每一個修改狀態的操作,並且其他操作對該狀態有狀態依賴),都應該為其定義並文件化一個入口協議和出口協議.

入口協議就是操作的條件謂詞;

出口協議涉及到要檢查任何被操作改變的狀態變數,確認它們是否引起其他一些條件謂詞變為真,如果是,通知相關的條件佇列.

AbstractQueuedSynchronizer採用了出口協議的概念,位於java.util.concurrent包下的大部分狀態依賴類都構建於它之上.

它沒有讓Synchronizer類自己去執行通知,而是要求同步方法返回一個值,讓這個值說明它的動作是否可能已經阻塞了一個或多個執行緒.這種顯示API的要求,可以避免發生在某些狀態轉換的過程中"忘記"執行通知.

3. 顯示的Condition物件

Condition是具體的內部條件佇列,和顯示鎖在某種角度上看差不多.

內部條件佇列有一些缺陷.每個內部鎖只能有一個與之相關聯的條件佇列,這意味著多個執行緒可能為了不同的條件謂詞在同一個條件佇列中等待,而且大多數常見的鎖模式都會暴露條件佇列物件.

如果你想編寫一個含有多個條件謂詞的併發物件,或者你想獲得比條件佇列的可見性之外更多的控制權,那麼顯示的Lock和Condition的實現類提供了一個比內部鎖和條件佇列更加靈活的選擇.

一個Condition和一個單獨的Lock相關聯,就像條件佇列和單獨的內部鎖相關聯一樣;

呼叫與Condition相關聯的Lock的Lock.newCondition方法,可以建立一個Condition.

如同Lock提供了比內部加鎖要豐富得多的特徵集一樣,Condition也提供了比內部條件佇列要豐富得多的特徵集:每個鎖可以有多個等待集(因await掛起的執行緒的集合)、可中斷/不可中斷的條件等待、基於時限的等待以及公平/非公平佇列之間的選擇.

不同於內部條件佇列,你可以讓每個Lock都有任意數量的Condition物件.Condition物件繼承了與之相關的鎖的公平性特性;如果是公平的鎖,執行緒會依照FIFO的順序從Condition.await中被釋放.

注意事項!!!:

wait、notify和notifyAll在Condition物件中的對等體是await、signal和signalAll.

但是,Condition繼承與Object,這意味著它也有wait和notify方法.

一定要確保使用了正確的版本--await和signal!

使用condition的例項:

public class ConditionBoundedBuffer<T> {
    protected  final Lock lock = new ReentrantLock();

    private final Condition notFull = lock.newCondition();

    private final Condition notEmpty = lock.newCondition();

    private final T[] items = (T[]) new Object[100];

    private int tail,head,count;


    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while(count == items.length){
                notFull.await();
            }
            items[tail] = x;
            if(++tail == items.length){
                tail = 0;
            }
            ++count;
            notEmpty.signal();
        }finally {
            lock.unlock();
        }

    }


    public T take() throws InterruptedException {
        lock.lock();
        try {
        while(count == 0){
            notEmpty.await();
        }
        T x = items[head];
        items[head] = null;
        if( ++head == items.length){
            head = 0;
        }
        -- count;
            notFull.signal();
            return  x;
        }finally {
            lock.unlock();
        }

    }
}

使用兩個Condition,notFull和notEmpty,明確地表示"非滿"與"非空"兩個條件謂詞.

使用Condition的方式具有更好的可讀性.Condition簡化了使用單一通知的條件.使用更有效的signal,而不是signalAll,這就會減少相當數量的上下文切換,而且每次快取操作都會出發對鎖的請求.

就像內建的鎖和條件佇列一樣,當使用顯示的Lock和Condition時,也必須要滿足鎖、條件謂詞和條件變數之間的三元關係:

涉及條件謂詞的變數必須由Lock保護,檢查條件謂詞時以及呼叫await和signal時,必須持有Lock物件.

顯示的Condition和內部條件佇列之間的選擇:

與在ReentrantLcok和Synchronized之間進行選擇是一樣的:如果你需要使用一些高階特性,比如公平佇列或者讓每個鎖對應多個等待集,這時使用Condition要好於使用內部條件佇列.(如果你需要使用ReentrantLock的高階特性,並已在使用它,那麼你已經做出來選擇.)

4. 剖析Synchronizer

ReentrantLock和Semaphore 有很多共同點,扮演了"閥門"的角色,每次只允許有限條目的執行緒通過它;

執行緒到達閥門後,可以允許通過(lock或acquire成功返回),可以等待(lock或acquire阻塞),也可以被取消(tryLock或tryAcquire返回false,指明在允許的時間內,鎖或者"許可"不可用).

更進一步,它們都允許可中斷的、不可中斷的、可限時的請求嘗試,它們也都允許選擇公平、非公平的等待執行緒佇列.

之所以有這麼多的共同點,是因為它們的實現都用到了一個共同的基類,AbstractQueuedSynchronizer(AQS)

AQS是一個用來構建鎖和Synchronizer的框架.使用AQS能夠簡單且高效的構造出應用廣泛的大量的Synchronizer.

不僅ReentrantLock和Semaphore是構建於AQS的,其他的還有CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask.

一個使用內部鎖,實現semaphore功能的例子:

public class SemaphoreOnLock {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private  int permit;

    public SemaphoreOnLock(int permit) {
        lock.lock();
        try {
            //條件謂詞加鎖保護
            this.permit = permit;
        }finally {
            lock.unlock();
        }

    }

    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            //沒有許可集就阻塞
            if(permit<=0){
                condition.await();
            }
            -- permit;

        }finally {
            lock.unlock();
        }
    }


    public void release(){
        lock.lock();
        try {
            if (permit<=0){
                condition.signal();
            }
            ++permit;
        }finally {
            lock.unlock();
        }

    }
}

使用Semaphore也同樣可以實現內部鎖的功能,把許可集設定為1.

在SemaphoreOnLock中,請求許可的操作在兩個地方可能會阻塞:

  1. 訊號量的狀態正在被鎖保護著
  2. 許可不可用時

使用AQS構建的Synchronizer只可能在一個點上發生阻塞,這樣降低了上下文的開銷,並提高了吞吐量.

5. AbstractQueuedSynchronizer

一個基於AQS的Synchronizer所執行的基本操作,是一些不同形式的獲取(acquire)和釋放(release).

獲取操作是狀態依賴的操作,總能夠阻塞:以synchronized和semaphore舉例,獲取就是獲取鎖或者許可,並且呼叫者可能不得不去等待,直到Synchronizer處於可發生的狀態.

  • CountDownLatch的獲取意味著"等待,直到閉鎖到達它的終止態"
  • FutureTask則意味著"等待,直到任務已經完成".

"釋放"不是一個可阻塞的操作:"釋放"可以允許執行緒在請求執行前阻塞.(被獲取阻塞)

AQS管理同步類中的狀態:它管理一個關於狀態資訊的單一整數(比如返回負數代表錯誤,0代表獨佔鎖,正數代表非獨佔鎖),狀態資訊可以通過protected型別的getState,setState和compareAndSetState等方法操作.(compareAndSetState下篇部落格會詳細介紹)

不同的同步裝置用狀態表達不同的意思:

  • ReentrantLock用它來表現擁有它的執行緒已經請求了多少次鎖
  • Semaphore用它來表現剩餘的許可數
  • FutureTask用它來表現任務的狀態(尚未開始、執行、完成和取消).

Synchronizer也可以自己管理一些額外的狀態變數:

  • ReentrantLock儲存了當前鎖的所有者的追蹤資訊,這樣它就能區分出是重進入的(reentrant)還是競爭的(contended)條件鎖.

AQS的獲取操作可能是獨佔的,例如ReentrantLock,同一時刻只能有一個執行緒獲取;也可能是非獨佔的,就像Semaphore和CountDownLatch一樣.這取決於不同的Synchronizer.

一個獲取分為兩步:

  1. Synchronizer判斷當前狀態是否被獲得;如果是,就讓執行緒執行,如果不是,獲取操作阻塞或失敗.例如:想獲取鎖,鎖必須是未被佔有的;而如果想成功地獲取閉鎖,閉鎖必須未處於終止狀態.

  2. 獲取同步裝置以後,可能需要更新同步裝置的狀態;一個想獲取Synchronizer的執行緒會影響到其他執行緒是否能夠獲取它.例如:
  • 獲取鎖的操作將鎖的狀態從"未被佔有"改變為"已被佔有";
  • 從Semaphore中獲取許可的操作會減少剩餘許可的數量.
  • 另一方面,一個執行緒對閉鎖的請求操作卻不會影響到其它執行緒是否能夠獲取他,所以獲取閉鎖的操作不會改變閉鎖的狀態(前面兩個鎖和訊號量,都會在消耗完之後阻止其他執行緒繼續獲取,但是閉鎖可以無限獲取,不影響其他執行緒)

支援獨佔獲取的Synchronizer應該實現tryAcquire、tryRelease和isHeldExclusively這幾個受保護的方法.

支援共享獲取的Synchronizer應該實現tryAcquireShared和tryReleaseShared.

Synchronizer的子類會根據其acquire和release的語意,使用getState、setState以及compareAndSetState來檢查並更新狀態,然後通過返回的狀態值告訴基類這次"獲取"或"釋放"的嘗試是否成功.

ReentrantLock的內部類Sync,注意它的nonfairTryAcquire和tryRelease,
都是根據getState來進行判斷:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

下面的程式碼中的tryAcquireShared返回不同的值,代表不同的結果

  • 從tryAcquireShared返回一個負數,說明獲取操作失敗;
  • 返回零說明Synchronizer是被獨佔獲取的;
  • 返回正值說明Synchronizer是被非獨佔獲取的.
static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

對於tryRelease和tryReleaseShared方法來說,如果能夠釋放一些正在嘗試獲取Synchronizer的執行緒,解除這些執行緒的阻塞,那麼這兩個方法會返回true.

5.1 一個簡單的閉鎖

public class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal(){
        sync.releaseShared(0);
    }


    public void await() throws InterruptedException {
        //此方法會請求tryAcquireShared()
        sync.acquireSharedInterruptibly(0);
    }


    private class Sync extends AbstractQueuedSynchronizer{

        @Override
        protected int tryAcquireShared(int ignored){
            // 如果閉鎖開啟則成功(state == 1),否則失敗
            return (getState() == 1) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored){
            //閉鎖現在已開啟
            setState(1);
            //現在其他執行緒可以獲得比索
            return true;
        }
    }
}

最初閉鎖是關閉的;任何呼叫await的執行緒都會阻塞,直到開啟閉鎖. 一旦閉鎖被一個signal呼叫開啟,等待中的執行緒就會被釋放,而且隨後到達閉鎖的執行緒也會被允許執行.

signal方法詳解:

呼叫signal方法,呼叫releaseShare把閉鎖的狀態開啟, 通過返回值表明Synchronizer處於完全被釋放的狀態.讓AQS要求所有等待中的執行緒嘗試去重新請求Synchronizer,並且由於tryAcquireShared會返回成功,所以這次請求會成功.

通過AQS提供的限時版本的獲取方法,可以給OneShotLatch提供顯示的請求操作以及檢查閉鎖狀態的能力.

以上的方法是通過委託實現的,直接擴充套件AQS也是可以的,但是存在其弊端:

  • 破壞OneShotLatch介面的簡潔性(只有兩個方法)
  • 雖然AQS的公共方法不允許呼叫者破壞閉鎖的狀態,呼叫者仍然很容易誤用它.

java.util.concurrent中沒有一個Synchronizer是直接擴充套件AQS的,它們都委託了AQS的私有內部子類.

6. java.util.concurrent的Synchronizer類中的AQS

簡單的看看這些類是如何使用AQS的

6.1 ReentrantLock

ReentrantLock使用同步狀態持有鎖獲取操作的計數,還維護一個owner變數來持有當前擁有的執行緒識別符號.

只有當前執行緒剛剛獲取到鎖,或者剛剛釋放了鎖的時候,才會修改owner.

非公平的ReentrantLock中tryAcquire的實現:

protected boolean tryAcquire(int ignored){
        final Thread current = Thread.currentThread();
        int c = getState();
        if(c == 0){
            if(compareAndSetState(0,1)){
                owner = current;
                return true;
            }
        }else if(current == owner){
            setState(c+1);
            return true;
        }
        return false ;
    }

在tryRelease中,它檢查owner域以確保當前執行緒在執行一個unlock操作之前,已經擁有了鎖;

在tryAcquire中,它使用這個域來區分重進入的獲取操作嘗試與競爭的獲取操作嘗試.

當一個執行緒嘗試獲取鎖時,tryAcquire會首先請求鎖的狀態:

  • 如果鎖未被佔有,它會嘗試更新鎖的狀態,表明鎖被佔有.因為狀態可能在被觀察後的幾條指令中被修改,所以tryAcquire使用compareAndSetState來嘗試原子地更新狀態,表明這個鎖已經被佔有,並確保狀態自最後一次觀察後沒有被修改過.
  • 如果鎖狀態表明它已經被佔有,如果當前執行緒是鎖的持有者,那麼獲取操作計數會遞增;如果當前執行緒不是鎖的持有者,那麼獲取操作的嘗試會失敗.

6.2 Semaphore和CountDownLatch

Semaphore使用AQS型別的同步狀態持有當前可用許可的數量,tryAcquireShared方法首先計算剩餘許可的數量

  • 如果沒有足夠的許可,會返回一個值,表明獲取操作失敗.
  • 如果還有充足的許可剩餘,tryAcquireShared會使用compareAndSetState,嘗試原子地遞減許可的計數.

如果成功會返回一個值,表明獲取操作成功.

返回值同樣加入了是否允許其他共享獲取嘗試能否成功的資訊,如果可以的話,其他等待的執行緒同樣會解除阻塞.

無論是沒有足夠的許可,還是tryAcquireShared可以原子地更新許可數,以響應獲取操作,while迴圈都會終止.儘管任何給定的compareAndSetState呼叫,都可能由於與另一個執行緒的競爭而失敗,這使它會重試,在重試過合理的次數後,兩個終止條件的一個會變成真.

類似地,tryReleaseShared會遞增許可計數,這會潛在地解除等待中的執行緒的阻塞,不斷地重試直到成功地更新.tryReleaseShared的返回值表明,釋放操作是否可以解除其它執行緒的阻塞.

Semaphore的tryAcquireShared和tryAcquireShared方法:

protected int tryAcquireShared(int acquires){
        while(true){
            int available = getState();
            int remaining = available - acquires;
            if(remaining < 0 || compareAndSetState(available,remaining)){
                return remaining;
            }
        }
    }

    protected boolean tryReleaseShared(int releases){
        while(true){
            int p = getState();
            if(compareAndSetState(p,p+releases)){
                return true;
            }
        }
    }

CountDownLatch使用AQS的方式與Semaphore相似:同步狀態持有當前的計數.

countDown方法呼叫release,後者會導致計數器遞減,並且在計數器已經到達零的時候,解除所有等待執行緒的阻塞,release無法阻塞執行緒,也就是無論呼叫多少次countDown方法都不會阻塞執行緒,只有呼叫await的時候,並且未消耗點許可集的時候,才會造成阻塞;await呼叫acquire,如果計數器已經到達零,acquire會立即返回,否則它會被阻塞.

6.3 FutureTask

Future.get的語意非常類似於閉鎖--如果發生了某些事件(FutureTask表現的任務的完成或取消),執行緒就可以執行,否則執行緒會留在佇列中,直到有事件發生.

FutureTask使用AQS型別的同步狀態來持有任務的狀態--執行、完成或取消.

FutureTask也維護了一些額外的狀態變數,來持有計算的結果或者丟擲的異常.它還維護了一個引用,指向正在執行計算任務的執行緒(如果它當前正處於執行狀態),這樣如果任務被取消,就可以中斷該執行緒.

6.4 ReentrantReadWriteLock

ReadWriteLock的介面要求了兩個鎖---一個讀者鎖和一個寫者鎖.

但是在基於AQS的ReentrantReadWriteLock實現中,一個單獨的AQS子類管理了讀和寫的加鎖.

ReentrantReadWriteLock使用一個16位的狀態為寫鎖(write-lock)計數,使用另一個16位的狀態為讀鎖(read-lock)計數.

對讀鎖的操作使用共享的獲取與釋放的方法;對寫鎖的操作使用獨佔的獲取與釋放的方法.

AQS在內部維護一個等待執行緒的佇列,持續追蹤一個執行緒是否被獨佔請求,或者被共享訪問.

在ReentrantReadWriteLock中,當鎖可用時,如果位於佇列頭部的執行緒同時也正在準備訪問,執行緒會得到鎖;

如果位於佇列頭部的執行緒正在準備訪問,那麼佇列中所有首個執行緒之前的執行緒都會得到鎖.

總結

如果你需要實現一個依賴於狀態的類---如果不能滿足依賴於狀態的前提條件,類的方法必須阻塞.

最佳的策略通常是將它構建於現有的庫類之上,比如Semaphore、BlockingQueue或者CountDownLatch.

但是,有時現有的庫類不能提供足夠的功能;在這種情況之下,你可以使用內部條件佇列、顯式Condition物件或者AbstractQueuedSynchronizer,來構建屬於自己的Synchronizer.

由於"管理狀態的獨立性"機制必須緊密依賴於"確保狀態一致性"機制,所以內部條件佇列與內部鎖緊密地繫結到了一起.

類似地,顯式的Condition與顯示的Lock也是緊密地繫結到一起的,相比於內部條件佇列,它還提供了一個可擴充套件的特徵集,包括"多等待集每鎖",可中斷或不可中斷的條件等待,公平或非公平的佇列,以及基於最終時限的等待.

 

鄭州不孕不育正規醫院

鄭州不育不孕醫院

鄭州不孕不育哪個好

鄭州不孕不育哪家好