1. 程式人生 > >AbstractQueuedSynchronizer的實現分析(下)

AbstractQueuedSynchronizer的實現分析(下)

原文:http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer

前言

經過本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的實現分析(上)的解讀,相信很多讀者已經對AbstractQueuedSynchronizer(下文簡稱AQS)的獨佔功能瞭然於胸,那麼這次我們通過對另一個工具類:CountDownLatch的分析來解讀AQS的另外一個功能:共享功能。

AQS共享功能的實現

在開始解讀AQS的共享功能前,我們再重溫一下CountDownLatch,CountDownLatch為java.util.concurrent包下的計數器工具類,常被用在多執行緒環境下,它在初始時需要指定一個計數器的大小,然後可被多個執行緒併發的實現減1操作,並在計數器為0後呼叫await方法的執行緒被喚醒,從而實現多執行緒間的協作。它在多執行緒環境下的基本使用方式為:

//main thread
      // 新建一個CountDownLatch,並指制定一個初始大小
      CountDownLatch countDownLatch = new CountDownLatch(3);
      // 呼叫await方法後,main執行緒將阻塞在這裡,直到countDownLatch 中的計數為0 
      countDownLatch.await();
      System.out.println("over");

     //thread1
     // do something 
     //...........
     //呼叫countDown方法,將計數減1
      countDownLatch.countDown();


     //thread2
     // do something 
     //...........
     //呼叫countDown方法,將計數減1
      countDownLatch.countDown();

       //thread3
     // do something 
     //...........
     //呼叫countDown方法,將計數減1
      countDownLatch.countDown();

注意,執行緒thread 1,2,3各自呼叫 countDown後,countDownLatch 的計數為0,await方法返回,控制檯輸入“over”,在此之前main thread 會一直沉睡。

可以看到CountDownLatch的作用類似於一個“欄柵”,在CountDownLatch的計數為0前,呼叫await方法的執行緒將一直阻塞,直到CountDownLatch計數為0,await方法才會返回,

而CountDownLatch的countDown()方法則一般由各個執行緒呼叫,實現CountDownLatch計數的減1。

知道了CountDownLatch的基本使用方式,我們就從上述DEMO的第一行new CountDownLatch(3)開始,看看CountDownLatch是怎麼實現的。

首先,看下CountDownLatch的構造方法:

和ReentrantLock類似,CountDownLatch內部也有一個叫做Sync的內部類,同樣也是用它繼承了AQS。

再看下Sync:

如果你看過本系列的上半部分,你對setState方法一定不會陌生,它是AQS的一個“狀態位”,在不同的場景下,代表不同的含義,比如在ReentrantLock中,表示加鎖的次數,在CountDownLatch中,則表示CountDownLatch的計數器的初始大小。

設定完計數器大小後CountDownLatch的構造方法返回,下面我們再看下CountDownLatch的await()方法:

呼叫了Sync的acquireSharedInterruptibly方法,因為Sync是AQS子類的原因,這裡其實是直接呼叫了AQS的acquireSharedInterruptibly方法:

從方法名上看,這個方法的呼叫是響應執行緒的打斷的,所以在前兩行會檢查下執行緒是否被打斷。接著,嘗試著獲取共享鎖,小於0,表示獲取失敗,通過本系列的上半部分的解讀, 我們知道AQS在獲取鎖的思路是,先嚐試直接獲取鎖,如果失敗會將當前執行緒放在佇列中,按照FIFO的原則等待鎖。而對於共享鎖也是這個思路,如果和獨佔鎖一致,這裡的tryAcquireShared應該是個空方法,留給子類去判斷:

再看看CountDownLatch:

如果state變成0了,則返回1,表示獲取成功,否則返回-1則表示獲取失敗。

看到這裡,讀者可能會發現, await方法的獲取方式更像是在獲取一個獨佔鎖,那為什麼這裡還會用tryAcquireShared呢?

回想下CountDownLatch的await方法是不是隻能在主執行緒中呼叫?答案是否定的,CountDownLatch的await方法可以在多個執行緒中呼叫,當CountDownLatch的計數器為0後,呼叫await的方法都會依次返回。 也就是說可以多個執行緒同時在等待await方法返回,所以它被設計成了實現tryAcquireShared方法,獲取的是一個共享鎖,鎖在所有呼叫await方法的執行緒間共享,所以叫共享鎖。

回到acquireSharedInterruptibly方法:

如果獲取共享鎖失敗(返回了-1,說明state不為0,也就是CountDownLatch的計數器還不為0),進入呼叫doAcquireSharedInterruptibly方法中,按照我們上述的猜想,應該是要將當前執行緒放入到佇列中去。

在這之前,我們再回顧一下AQS佇列的資料結構:AQS是一個雙向連結串列,通過節點中的next,pre變數分別指向當前節點後一個節點和前一個節點。其中,每個節點中都包含了一個執行緒和一個型別變數:表示當前節點是獨佔節點還是共享節點,頭節點中的執行緒為正在佔有鎖的執行緒,而後的所有節點的執行緒表示為正在等待獲取鎖的執行緒。如下圖所示:

黃色節點為頭節點,表示正在獲取鎖的節點,剩下的藍色節點(Node1、Node2、Node3)為正在等待鎖的節點,他們通過各自的next、pre變數分別指向前後節點,形成了AQS中的雙向連結串列。每個執行緒被加上型別(共享還是獨佔)後便是一個Node, 也就是本文中說的節點。

再看看doAcquireSharedInterruptibly方法:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); 
//將當前執行緒包裝為型別為Node.SHARED的節點,標示這是一個共享節點。
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
//如果新建節點的前一個節點,就是Head,說明當前節點是AQS佇列中等待獲取鎖的第一個節點,
//按照FIFO的原則,可以直接嘗試獲取鎖。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); 
//獲取成功,需要將當前節點設定為AQS佇列中的第一個節點,這是AQS的規則//佇列的頭節點表示正在獲取鎖的節點
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && //檢查下是否需要將當前節點掛起
                    parkAndCheckInterrupt()) 
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這裡有幾點需要說明的:

1. setHeadAndPropagate方法:

首先,使用了CAS更換了頭節點,然後,將當前節點的下一個節點取出來,如果同樣是“shared”型別的,再做一個"releaseShared"操作。

看下doReleaseShared方法:

for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { 
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
//如果當前節點是SIGNAL意味著,它正在等待一個訊號,  
//或者說,它在等待被喚醒,因此做兩件事,1是重置waitStatus標誌位,2是重置成功後,喚醒下一個節點。
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
//如果本身頭節點的waitStatus是出於重置狀態(waitStatus==0)的,將其設定為“傳播”狀態。
//意味著需要將狀態向後一個節點傳播。
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }

為什麼要這麼做呢?這就是共享功能和獨佔功能最不一樣的地方,對於獨佔功能來說,有且只有一個執行緒(通常只對應一個節點,拿ReentantLock舉例,如果當前持有鎖的執行緒重複呼叫lock()方法,那根據本系列上半部分我們的介紹,我們知道,會被包裝成多個節點在AQS的佇列中,所以用一個執行緒來描述更準確),能夠獲取鎖,但是對於共享功能來說。

共享的狀態是可以被共享的,也就是意味著其他AQS佇列中的其他節點也應能第一時間知道狀態的變化。因此,一個節點獲取到共享狀態流程圖是這樣的:

比如現在有如下佇列:

當Node1呼叫tryAcquireShared成功後,更換了頭節點:

     Node1變成了頭節點然後呼叫unparkSuccessor()方法喚醒了Node2、Node2中持有的執行緒A出於上面流程圖的park node的位置,

執行緒A被喚醒後,重複黃色線條的流程,重新檢查呼叫tryAcquireShared方法,看能否成功,如果成功,則又更改頭節點,重複以上步驟,以實現節點自身獲取共享鎖成功後,喚醒下一個共享型別節點的操作,實現共享狀態的向後傳遞。

2.其實對於doAcquireShared方法,AQS還提供了集中類似的實現:

分別對應了:

  1. 帶引數請求共享鎖。 (忽略中斷)
  2. 帶引數請求共享鎖,且響應中斷。(每次迴圈時,會檢查當前執行緒的中斷狀態,以實現對執行緒中斷的響應)
  3. 帶引數請求共享鎖但是限制等待時間。(第二個引數設定超時時間,超出時間後,方法返回。)

比較特別的為最後一個doAcquireSharedNanos方法,我們一起看下它怎麼實現超時時間的控制的。

因為該方法和其餘獲取共享鎖的方法邏輯是類似的,我用紅色框圈出了它所不一樣的地方,也就是實現超時時間控制的地方。

可以看到,其實就是在進入方法時,計算出了一個“deadline”,每次迴圈的時候用當前時間和“deadline”比較,大於“dealine”說明超時時間已到,直接返回方法。

注意,最後一個紅框中的這行程式碼:

nanosTimeout > spinForTimeoutThreshold

從變數的字面意思可知,這是拿超時時間和超時自旋的最小作比較,在這裡Doug Lea把超時自旋的閾值設定成了1000ns,即只有超時時間大於1000ns才會去掛起執行緒,否則,再次迴圈,以實現“自旋”操作。這是“自旋”在AQS中的應用之處。

看完await方法,我們再來看下countDown()方法:

呼叫了AQS的releaseShared方法,並傳入了引數1:

同樣先嚐試去釋放鎖,tryReleaseShared同樣為空方法,留給子類自己去實現,以下是CountDownLatch的內部類Sync的實現:

死迴圈更新state的值,實現state的減1操作,之所以用死迴圈是為了確保state值的更新成功。

從上文的分析中可知,如果state的值為0,在CountDownLatch中意味:所有的子執行緒已經執行完畢,這個時候可以喚醒呼叫await()方法的執行緒了,而這些執行緒正在AQS的佇列中,並被掛起的,

所以下一步應該去喚醒AQS佇列中的頭節點了(AQS的佇列為FIFO佇列),然後由頭節點去依次喚醒AQS佇列中的其他共享節點。

如果tryReleaseShared返回true,進入doReleaseShared()方法:

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { 
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
//如果當前節點是SIGNAL意味著,它正在等待一個訊號,
 //或者說,它在等待被喚醒,因此做兩件事,1是重置waitStatus標誌位,2是重置成功後,喚醒下一個節點。
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
//如果本身頭節點的waitStatus是出於重置狀態(waitStatus==0)的,將其設定為“傳播”狀態。
//意味著需要將狀態向後一個節點傳播。
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
  }

當執行緒被喚醒後,會重新嘗試獲取共享鎖,而對於CountDownLatch執行緒獲取共享鎖判斷依據是state是否為0,而這個時候顯然state已經變成了0,因此可以順利獲取共享鎖並且依次喚醒AQS隊裡中後面的節點及對應的執行緒。

總結

本文從CountDownLatch入手,深入分析了AQS關於共享鎖方面的實現方式:

如果獲取共享鎖失敗後,將請求共享鎖的執行緒封裝成Node物件放入AQS的佇列中,並掛起Node物件對應的執行緒,實現請求鎖執行緒的等待操作。待共享鎖可以被獲取後,從頭節點開始,依次喚醒頭節點及其以後的所有共享型別的節點。實現共享狀態的傳播。

這裡有幾點值得注意:

  1. 與AQS的獨佔功能一樣,共享鎖是否可以被獲取的判斷為空方法,交由子類去實現。
  2. 與AQS的獨佔功能不同,當鎖被頭節點獲取後,獨佔功能是隻有頭節點獲取鎖,其餘節點的執行緒繼續沉睡,等待鎖被釋放後,才會喚醒下一個節點的執行緒,而共享功能是隻要頭節點獲取鎖成功,就在喚醒自身節點對應的執行緒的同時,繼續喚醒AQS佇列中的下一個節點的執行緒,每個節點在喚醒自身的同時還會喚醒下一個節點對應的執行緒,以實現共享狀態的“向後傳播”,從而實現共享功能。

以上的分析都是從AQS子類的角度去看待AQS的部分功能的,而如果直接看待AQS,或許可以這麼去解讀:

首先,AQS並不關心“是什麼鎖”,對於AQS來說它只是實現了一系列的用於判斷“資源”是否可以訪問的API,並且封裝了在“訪問資源”受限時將請求訪問的執行緒的加入佇列、掛起、喚醒等操作, AQS只關心“資源不可以訪問時,怎麼處理?”、“資源是可以被同時訪問,還是在同一時間只能被一個執行緒訪問?”、“如果有執行緒等不及資源了,怎麼從AQS的佇列中退出?”等一系列圍繞資源訪問的問題,而至於“資源是否可以被訪問?”這個問題則交給AQS的子類去實現。

當AQS的子類是實現獨佔功能時,例如ReentrantLock,“資源是否可以被訪問”被定義為只要AQS的state變數不為0,並且持有鎖的執行緒不是當前執行緒,則代表資源不能訪問。

當AQS的子類是實現共享功能時,例如:CountDownLatch,“資源是否可以被訪問”被定義為只要AQS的state變數不為0,說明資源不能訪問。

這是典型的將規則和操作分開的設計思路:規則子類定義,操作邏輯因為具有公用性,放在父類中去封裝。

當然,正式因為AQS只是關心“資源在什麼條件下可被訪問”,所以子類還可以同時使用AQS的共享功能和獨佔功能的API以實現更為複雜的功能。

比如:ReentrantReadWriteLock,我們知道ReentrantReadWriteLock的中也有一個叫Sync的內部類繼承了AQS,而AQS的佇列可以同時存放共享鎖和獨佔鎖,對於ReentrantReadWriteLock來說分別代表讀鎖和寫鎖,當佇列中的頭節點為讀鎖時,代表讀操作可以執行,而寫操作不能執行,因此請求寫操作的執行緒會被掛起,當讀操作依次推出後,寫鎖成為頭節點,請求寫操作的執行緒被喚醒,可以執行寫操作,而此時的讀請求將被封裝成Node放入AQS的佇列中。如此往復,實現讀寫鎖的讀寫交替進行。

而本系列文章上半部分提到的FutureTask,其實思路也是:封裝一個存放執行緒執行結果的變數A,使用AQS的獨佔API實現執行緒對變數A的獨佔訪問,判斷規則是,執行緒沒有執行完畢:call()方法沒有返回前,不能訪問變數A,或者是超時時間沒到前不能訪問變數A(這就是FutureTask的get方法可以實現獲取執行緒執行結果時,設定超時時間的原因)。

綜上所述,本系列文章從AQS獨佔鎖和共享鎖兩個方面深入分析了AQS的實現方式和獨特的設計思路,希望對讀者有啟發,下一篇文章,我們將繼續JDK 1.8下 J.U.C (java.util.concurrent)包中的其他工具類,敬請期待。

感謝郭蕾對本文的策劃和審校。

給InfoQ中文站投稿或者參與內容翻譯工作,請郵件至[email protected]。也歡迎大家通過新浪微博(@InfoQ)或者騰訊微博(@InfoQ)關注我們,並與我們的編輯和其他讀者朋友交流。

相關推薦

深度解析Java8 – AbstractQueuedSynchronizer實現分析

本文首發在infoQ    作者:劉錕洋 前言 經過本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的實現分析(上)的解讀,相信很多讀者已經對AbstractQueuedSynchronizer(下文簡稱AQS)的獨佔功能瞭然於胸,那麼,這次我們再借助另一個工

深度解析Java 8:AbstractQueuedSynchronizer實現分析

前言 經過本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的實現分析(上)的解讀,相信很多讀者已經對AbstractQueuedSynchronizer(下文簡稱AQS)的獨佔功能瞭然於胸,那麼這次我們通過對另一個工具類:CountDo

AbstractQueuedSynchronizer實現分析

原文:http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer 前言 經過本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的實現分析(上)的解讀,相信很多讀者已經對A

【資料結構與演算法-java實現】二 複雜度分析:最好、最壞、平均、均攤時間複雜度的概念

上一篇文章學習了:如何分析、統計演算法的執行效率和資源消耗? 點選連結檢視上一篇文章:複雜度分析上 今天的文章學習以下內容: 最好情況時間複雜度 最壞情況時間複雜度 平均情況時間複雜度 均攤時間複雜度 1、最好與最壞情況時間複雜度 我們首先

深度解析Java8 – AbstractQueuedSynchronizer實現分析

前言: Java中的FutureTask作為可非同步執行任務並可獲取執行結果而被大家所熟知,通常可以使用future.get()來獲取執行緒的執行結果,線上程執行結束之前,get方法會一直阻塞狀態,直到call()返回,其優點是使用執行緒非同步執行任務的情況下還可以獲取到執行緒的執行結果,但

深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer實現分析

前言 Java中的FutureTask作為可非同步執行任務並可獲取執行結果而被大家所熟知。通常可以使用future.get()來獲取執行緒的執行結果,線上程執行結束之前,get方法會一直阻塞狀態,直到call()返回,其優點是使用執行緒非同步執行任務的情況下還可以獲取到

Linux核心--網路棧實現分析--網路層之IP協議

本文分析基於Linux Kernel 1.2.13作者:閆明注:標題中的”(上)“,”(下)“表示分析過程基於資料包的傳遞方向:”(上)“表示分析是從底層向上分析、”(下)“表示分析是從上向下分析。上篇博文分析傳輸層最終會呼叫函式ip_queue_xmit()函式,將傳送資料

ThreadPoolExecutor的應用和實現分析—— 任務處理相關源碼分析

stateless 自身 tran als row exce 繼承 break attribute 轉自:http://www.tuicool.com/articles/rmqYjq 前面一篇文章從Executors中的工廠方法入手,已經對ThreadPoolExecuto

Java的LockSupport.park()實現分析轉載

兩個 這也 his access 需要 tracking orm return 指令 LockSupport類是Java6(JSR166-JUC)引入的一個類,提供了基本的線程同步原語。LockSupport實際上是調用了Unsafe類裏的函數,歸結到Unsafe裏,只有

阿裏雲PolarDB及其共享存儲PolarFS技術實現分析

並發 存儲層 操作 先來 相關操作 關於 vld lan 負載均衡 PolarDB是阿裏雲基於MySQL推出的雲原生數據庫(Cloud Native Database)產品,通過將數據庫中計算和存儲分離,多個計算節點訪問同一份存儲數據的方式來解決目前MySQL數據庫存在的運

C++筆記 第九課 函式過載分析---狄泰學院

如果在閱讀過程中發現有錯誤,望評論指正,希望大家一起學習,一起進步。 學習C++編譯環境:Linux 第九課 函式過載分析(下) 1.過載與指標 下面的函式指標將儲存哪個函式的地址?第一個 函式過載遇上函式指標 將過載函式名賦值給函式指標時 1.根據過載規則挑選與函式指標引

【資料運營】在運營中,為什麼文字分析遠比數值型分析重要?一個實際案例,五點分析

https://www.pmcaff.com/article/index/408451832537216?from=profile 本文是《資料分析中,文字分析遠比數值型分析重要!》的下篇,以一個實際案例來聊聊文字分析在實際運營中如何落地。行為脈絡如下:先簡要講述文字分析的分支---情緒分析的基本原

資料結構和算法系列3--複雜度分析

複雜度分析的4個概念 1.最壞情況時間複雜度:程式碼在最理想情況下執行的時間複雜度。 2.最好情況時間複雜度:程式碼在最壞情況下執行的時間複雜度。 3.平均時間複雜度:用程式碼在所有情況下執行的次數的加權平均值表示。 4.均攤時間複雜度:在程式碼執行的所有複雜度情況中絕大部分是低級別的複

阿里雲PolarDB及其共享儲存PolarFS技術實現分析

PolarDB是阿里雲基於MySQL推出的雲原生資料庫(Cloud Native Database)產品,通過將資料庫中計算和儲存分離,多個計算節點訪問同一份儲存資料的方式來解決目前MySQL資料庫存在的運維和擴充套件性問題;通過引入RDMA和SPDK等新硬體來改造傳統的網路和IO協議棧來極大提升資料庫效能。

第四章 語法分析——LR文法

文章目錄 概述 基本概念 移動-歸約語法分析技術 SLR 增廣文法 狀態內部擴充套件 狀態之間的擴充套件 構建分析表 ACTION 構造SLR語法分析表

flowable EngineConfiguration的實現分析2

EngineConfiguration的實現類是一個抽象類:AbstractEngineConfiguration 一、引擎配置的分類 繼承 AbsractEngineConfiguration的子類實現不同方面的功能,包括: 1、身份管理引擎配置 2、表單引擎配置

matlab第八課:影象分析

目標: 影象閾值 背景評估 聯通區域標記 一、影象閾值 graythresh():找出一個影象的最佳閾值是什麼 im2bw():轉變影象為二值影象 I = imread('rice.png'); level=graythresh(I); % 獲

編譯原理 第三章 詞法分析

3.6 有窮自動機(非常重要) 3.6.1 不確定的有窮自動機(重要) 例:  狀態0是開始狀態, 在狀態0上輸入符號b會進入狀態0,輸入a可能進去狀態0也有可能進入狀態1。所以對於狀態0來說一個確定的輸入符號a他有兩種離開狀態,這就是一種不確定的狀態。   &nbs

C++筆記 第五十一課 C++物件模型分析---狄泰學院

如果在閱讀過程中發現有錯誤,望評論指正,希望大家一起學習,一起進步。 學習C++編譯環境:Linux 第五十一課 C++物件模型分析(下) 1.繼承物件模型 在C++編譯器的內部類可以理解為結構體 子類是由父類成員疊加子類新成員得到的 51-1 繼承物件模型初探 #

Redisson 分散式鎖實現分析

  設計分散式鎖要注意的問題 互斥 分散式系統中執行著多個節點,必須確保在同一時刻只能有一個節點的一個執行緒獲得鎖,這是最基本的一點。 死鎖 分散式系統中,可能產生死鎖的情況要相對複雜一些。分散式系統是處在複雜網路環境中的,當一個節點獲取到鎖,如果它在釋放鎖之前掛掉了,