深度解析Java8 – AbstractQueuedSynchronizer的實現分析(下)
前言
經過本系列的上半部分JDK1.8 AbstractQueuedSynchronizer的實現分析(上)的解讀,相信很多讀者已經對AbstractQueuedSynchronizer(下文簡稱AQS)的獨佔功能瞭然於胸,那麼,這次我們再借助另一個工具類:CoutDownLatch,換個角度看看AQS的另外一個重要功能——共享功能的實現。
AQS共享功能的實現
在開始解讀AQS的共享功能前,我們再重溫一下CountDownLatch,CountDownLatch為java.util.concurrent包下的計數器工具類,常被用在多執行緒環境下,它在初始時需要指定一個計數器的大小,然後可被多個執行緒併發的實現減1操作,並在計數器為0後呼叫await方法的執行緒被喚醒,從而實現多執行緒間的協作。它在多執行緒環境下的基本使用方式為:
//main thread // 新建一個CountDownLatch,並制定一個初始大小 CountDownLatch countDownLatch = new CountDownLatch(3); // 呼叫await方法後,main執行緒將阻塞在這裡,直到countDownLatch 中的計數為0 countDownLatch.await(); System.out.println("over"); //thread1 // do something //........... //呼叫countDown方法,將計數減1 countDownLatch.countDown(); //thread2 // do something //........... //呼叫countDown方法,將計數減1 countDownLatch.countDown(); //thread3 // do something //........... //呼叫countDown方法,將計數減1 countDownLatch.countDown();
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //將當前執行緒包裝為型別為Node.SHARED的節點,標示這是一個共享節點。 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) {//如果新建節點的前一個節點,就是Head,說明當前節點是AQS佇列中等待獲取鎖的第一個節點,按照FIFO的原則,可以直接嘗試獲取鎖。 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); //獲取成功,需要將當前節點設定為AQS佇列中的第一個節點,這是AQS的規則,佇列的頭節點表示正在獲取鎖的節點 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && //檢查下是否需要將當前節點掛起 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }這裡有幾點需要說明的: 1. setHeadAndPropagate方法:
首先,使用了CAS更換了頭節點,然後,將當前節點的下一個節點取出來,如果同樣是“shared”型別的,再做一個”releaseShared”操作。看下doReleaseShared方法:
for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //如果當前節點是SIGNAL意味著,它正在等待一個訊號, //或者說,它在等待被喚醒,因此做兩件事, //1是重置waitStatus標誌位,2是重置成功後,喚醒下一個節點。 continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果本身頭結點的waitStatus是出於重置狀態(waitStatus==0)的,將其設定為“傳播”狀態。意味著需要將狀態向後一個節點傳播。 continue; // loop on failed CAS } if (h == head) // loop if head changed break; }為什麼要這麼做呢?這就是共享功能和獨佔功能最不一樣的地方,對於獨佔功能來說,有且只有一個執行緒(通常只對應一個節點,拿ReentantLock舉例,如果當前持有鎖的執行緒重複呼叫lock()方法, 那根據本系列上半部分我們的介紹,我們知道,會被包裝成多個節點在AQS的佇列中,所以用一個執行緒來描述更準確),能夠獲取鎖,但是對於共享功能來說。 共享的狀態是可以被共享的,也就是意味著其他AQS佇列中的其他節點也應能第一時間知道狀態的變化。因此,一個節點獲取到共享狀態流程圖是這樣的: 比如現在有如下佇列: 當Node1呼叫tryAcquireShared成功後,更換了頭節點:
Node1變成了頭節點然後呼叫unparkSuccessor()方法喚醒了Node2,Node2中持有的執行緒A出於上面流程圖的park node的位置,
執行緒A被喚醒後,重複黃色線條的流程,重新檢查呼叫tryAcquireShared方法,看能否成功,如果成功,則又更改頭結點,重複以上步驟,以實現節點自身獲取共享鎖成功後,喚醒下一個共享型別結點的操作,實現共享狀態的向後傳遞。
2.其實對於doAcquireShared方法,AQS還提供了集中類似的實現:
分別對應了:
1. 帶引數請求共享鎖。 (忽略中斷)
2. 帶引數請求共享鎖,且響應中斷。(每次迴圈時,會檢查當前執行緒的中斷狀態,以實現對執行緒中斷的響應)
3. 帶引數請求共享鎖但是限制等待時間。(第二個引數設定超時時間,超出時間後,方法返回。)
比較特別的為最後一個doAcquireSharedNanos方法,我們一起看下它怎麼實現超時時間的控制的。
因為該方法和其餘獲取共享鎖的方法邏輯是類似的,我用紅色框圈出了它所不一樣的地方,也就是實現超時時間控制的地方。
可以看到,其實就是在進入方法時,計算出了一個“deadline”,每次迴圈的時候用當前時間和“deadline”比較,大於“dealine”說明超時時間已到,直接返回方法。
注意,最後一個紅框中的這行程式碼:
nanosTimeout > spinForTimeoutThreshold
從變數的字面意思可知,這是拿超時時間和超時自旋的最小閥值作比較,在這裡Doug Lea把超時自旋的閥值設定成了1000ns,即只有超時時間大於1000ns才會去掛起執行緒,否則,再次迴圈,以實現“自旋”操作。這是“自旋”在AQS中的應用之處。
看完await方法,我們再來看下countDown()方法:
呼叫了AQS的releaseShared方法,並傳入了引數1: 同樣先嚐試去釋放鎖,tryReleaseShared同樣為空方法,留給子類自己去實現,以下是CountDownLatch的內部類Sync的實現:死迴圈更新state的值,實現state的減1操作,之所以用死迴圈是為了確保state值的更新成功。
從上文的分析中可知,如果state的值為0,在CountDownLatch中意味:所有的子執行緒已經執行完畢,這個時候可以喚醒呼叫await()方法的執行緒了,而這些執行緒正在AQS的佇列中,並被掛起的,
所以下一步應該去喚醒AQS佇列中的頭結點了(AQS的佇列為FIFO佇列),然後由頭節點去依次喚醒AQS佇列中的其他共享節點。如果tryReleaseShared返回true,進入doReleaseShared()方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //如果當前節點是SIGNAL意味著,它正在等待一個訊號, //或者說,它在等待被喚醒,因此做兩件事, //1是重置waitStatus標誌位,2是重置成功後,喚醒下一個節點。 continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果本身頭結點的waitStatus是出於重置狀態(waitStatus==0)的,將其設定為“傳播”狀態。意味著需要將狀態向後一個節點傳播。 continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }當執行緒被喚醒後,會重新嘗試獲取共享鎖,而對於CountDownLatch執行緒獲取共享鎖判斷依據是state是否為0,而這個時候顯然state已經變成了0,因此可以順利獲取共享鎖並且依次喚醒AQS隊裡中後面的節點及對應的執行緒。
總結
本文從CountDownLatch入手,深入分析了AQS關於共享鎖方面的實現方式: 如果獲取共享鎖失敗後,將請求共享鎖的執行緒封裝成Node物件放入AQS的佇列中,並掛起Node物件對應的執行緒,實現請求鎖執行緒的等待操作。待共享鎖可以被獲取後,從頭節點開始,依次喚醒頭節點及其以後的所有共享型別的節點。實現共享狀態的傳播。這裡有幾點值得注意:
1. 與AQS的獨佔功能一樣,共享鎖是否可以被獲取的判斷為空方法,交由子類去實現。
2. 與AQS的獨佔功能不同,當鎖被頭節點獲取後,獨佔功能是隻有頭節點獲取鎖,其餘節點的執行緒繼續沉睡,等待鎖被釋放後,才會喚醒下一個節點的執行緒,而共享功能是隻要頭節點獲取鎖成功,就在喚醒自身節點對應的執行緒的同時,繼續喚醒AQS佇列中的下一個節點的執行緒,每個節點在喚醒自身的同時還會喚醒下一個節點對應的執行緒,以實現共享狀態的“向後傳播”,從而實現共享功能。
以上的分析都是從AQS子類的角度去看待AQS的部分功能的,而如果直接看待AQS,或許可以這麼去解讀:
首先,AQS並不關心“是什麼鎖”,對於AQS來說它只是實現了一系列的用於判斷“資源”是否可以訪問的API,並且封裝了在“訪問資源”受限時將請求訪問的執行緒的加入佇列、掛起、喚醒等操作, AQS只關心“資源不可以訪問時,怎麼處理?”、“資源是可以被同時訪問,還是在同一時間只能被一個執行緒訪問?”、“如果有執行緒等不及資源了,怎麼從AQS的佇列中退出?”等一系列圍繞資源訪問的問題,而至於“資源是否可以被訪問?”這個問題則交給AQS的子類去實現。
當AQS的子類是實現獨佔功能時,例如ReentrantLock,“資源是否可以被訪問”被定義為只要AQS的state變數不為0,並且持有鎖的執行緒不是當前執行緒,則代表資源不能訪問。
當AQS的子類是實現共享功能時,例如:CountDownLatch,“資源是否可以被訪問”被定義為只要AQS的state變數不為0,說明資源不能訪問。這是典型的將規則和操作分開的設計思路:規則子類定義,操作邏輯因為具有公用性,放在父類中去封裝。當然,正式因為AQS只是關心“資源在什麼條件下可被訪問”,所以子類還可以同時使用AQS的共享功能和獨佔功能的API以實現更為複雜的功能。
比如:ReentrantReadWriteLock,我們知道ReentrantReadWriteLock的中也有一個叫Sync的內部類繼承了AQS,而AQS的佇列可以同時存放共享鎖和獨佔鎖,對於ReentrantReadWriteLock來說分別代表讀鎖和寫鎖,當佇列中的頭節點為讀鎖時,代表讀操作可以執行,而寫操作不能執行,因此請求寫操作的執行緒會被掛起,當讀操作依次推出後,寫鎖成為頭節點,請求寫操作的執行緒被喚醒,可以執行寫操作,而此時的讀請求將被封裝成Node放入AQS的佇列中。如此往復,實現讀寫鎖的讀寫交替進行。
而本系列文章上半部分提到的FutureTask,其實思路也是:封裝一個存放執行緒執行結果的變數A,使用AQS的獨佔API實現執行緒對變數A的獨佔訪問,判斷規則是,執行緒沒有執行完畢:call()方法沒有返回前,不能訪問變數A,或者是超時時間沒到前不能訪問變數A(這就是FutureTask的get方法可以實現獲取執行緒執行結果時,設定超時時間的原因)。
綜上所述,本系列文章從AQS獨佔鎖和共享鎖兩個方面深入分析了AQS的實現方式和獨特的設計思路,希望對讀者有啟發,下一篇文章,我們將繼續JDK 1.8下 J.U.C (java.util.concurrent)包中的其他工具類,敬請期待。