1. 程式人生 > >Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放

Java併發包原始碼學習系列:CLH同步佇列及同步資源獲取與釋放

[toc] ## 本篇學習目標 - 回顧CLH同步佇列的結構。 - 學習獨佔式資源獲取和釋放的流程。 ## CLH佇列的結構 我在[Java併發包原始碼學習系列:AbstractQueuedSynchronizer#同步佇列與Node節點](https://www.cnblogs.com/summerday152/p/14238284.html#同步佇列與node節點)已經粗略地介紹了一下CLH的結構,本篇主要解析該同步佇列的相關操作,因此在這邊再回顧一下: AQS通過內建的FIFO同步雙向佇列來完成資源獲取執行緒的排隊工作,內部通過節點head【實際上是虛擬節點,真正的第一個執行緒在head.next的位置】和tail記錄隊首和隊尾元素,佇列元素型別為Node。 ![](https://img2020.cnblogs.com/blog/1771072/202101/1771072-20210107013340062-698786209.png) - 如果當前執行緒獲取同步狀態失敗(鎖)時,AQS 則會將當前執行緒以及等待狀態等資訊構造成一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒 - 當同步狀態釋放時,則會把節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。 > 接下來將要通過**AQS以獨佔式的獲取和釋放資源**的具體案例來詳解內建CLH阻塞佇列的工作流程,接著往下看吧。 ## 資源獲取 ```java public final void acquire(int arg) { if (!tryAcquire(arg) && // tryAcquire由子類實現,表示獲取鎖,如果成功,這個方法直接返回了 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果獲取失敗,執行 selfInterrupt(); } ``` - tryAcquire(int)是AQS提供給子類實現的鉤子方法,子類可以自定義實現獨佔式獲取資源的方式,獲取成功則返回true,失敗則返回false。 - 如果tryAcquire方法獲取資源成功就直接返回了,失敗的化就會執行`acquireQueued(addWaiter(Node.EXCLUSIVE), arg))`的邏輯,我們可以將其進行拆分,分為兩步: - addWaiter(Node.EXCLUSIVE):將該執行緒包裝成為獨佔式的節點,加入佇列中。 - acquireQueued(node,arg):如果當前節點是等待節點的第一個,即head.next,就嘗試獲取資源。如果該方法返回true,則會進入`selfInterrupt()`的邏輯,進行阻塞。 接下來我們分別來看看`addWaiter`和`acquireQueued`兩個方法。 ### 入隊Node addWaiter(Node mode) 根據傳入的mode引數決定獨佔或共享模式,為當前執行緒建立節點,併入隊。 ```java // 其實就是把當前執行緒包裝一下,設定模式,形成節點,加入佇列 private Node addWaiter(Node mode) { // 根據mode和thread建立節點 Node node = new Node(Thread.currentThread(), mode); // 記錄一下原尾節點 Node pred = tail; // 尾節點不為null,佇列不為空,快速嘗試加入隊尾。 if (pred != null) { // 讓node的prev指向尾節點 node.prev = pred; // CAS操作設定node為新的尾節點,tail = node if (compareAndSetTail(pred, node)) { // 設定成功,讓原尾節點的next指向新的node,實現雙向連結 pred.next = node; // 入隊成功,返回 return node; } } // 快速入隊失敗,進行不斷嘗試 enq(node); return node; } ``` 幾個注意點: - 入隊的操作其實就是將執行緒通過指定模式包裝為Node節點,如果佇列尾節點不為null,利用CAS嘗試快速加入隊尾。 - 快速入隊失敗的原因有兩個: - 佇列為空,即還沒有進行初始化。 - CAS設定尾節點的時候失敗。 - 在第一次快速入隊失敗後,將會走到enq(node)邏輯,不斷進行嘗試,直到設定成功。 ### 不斷嘗試Node enq(final Node node) ```java private Node enq(final Node node) { // 自旋,俗稱死迴圈,直到設定成功為止 for (;;) { // 記錄原尾節點 Node t = tail; // 第一種情況:佇列為空,原先head和tail都為null, // 通過CAS設定head為哨兵節點,如果設定成功,tail也指向哨兵節點 if (t == null) { // Must initialize // 初始化head節點 if (compareAndSetHead(new Node())) // tail指向head,下個執行緒來的時候,tail就不為null了,就走到了else分支 tail = head; // 第二種情況:CAS設定尾節點失敗的情況,和addWaiter一樣,只不過它在for(;;)中 } else { // 入隊,將新節點的prev指向tail node.prev = t; // CAS設定node為尾部節點 if (compareAndSetTail(t, node)) { //原來的tail的next指向node t.next = node; return t; } } } } ``` enq的過程是自選設定隊尾的過程,如果設定成功,就返回。如果設定失敗,則一直嘗試設定,理念就是,我總能等待設定成功那一天。 我們還可以發現,head是延遲初始化的,在第一個節點嘗試入隊的時候,head為null,這時使用了`new Node()`建立了一個不代表任何執行緒的節點,作為虛擬頭節點,且我們需要注意它的waitStatus初始化為0,這一點對我們之後分析有指導意義。 如果是CAS失敗導致重複嘗試,那就還是讓他繼續CAS好了。 ### boolean acquireQueued(Node, int) ```java // 這個方法如果返回true,程式碼將進入selfInterrupt() final boolean acquireQueued(final Node node, int arg) { // 注意預設為true boolean failed = true; try { // 是否中斷 boolean interrupted = false; // 自旋,即死迴圈 for (;;) { // 得到node的前驅節點 final Node p = node.predecessor(); // 我們知道head是虛擬的頭節點,p==head表示如果node為阻塞佇列的第一個真實節點 // 就執行tryAcquire邏輯,這裡tryAcquire也需要由子類實現 if (p == head && tryAcquire(arg)) { // tryAcquire獲取成功走到這,執行setHead出隊操作 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 走到這有兩種情況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了 // 這裡就判斷 如果當前執行緒爭鎖失敗,是否需要掛起當前這個執行緒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 死迴圈退出,只有tryAcquire獲取鎖失敗的時候failed才為true if (failed) cancelAcquire(node); } } ``` ### 出隊void setHead(Node) CLU同步佇列遵循FIFO,首節點的執行緒釋放同步狀態後,喚醒下一個節點。將隊首節點出隊的操作實際上就是,將head指標指向將要出隊的節點就可以了。 ```java private void setHead(Node node) { // head指標指向node head = node; // 釋放資源 node.thread = null; node.prev = null; } ``` ### boolean shouldParkAfterFailedAcquire(Node,Node) ```java /** * 走到這有兩種情況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了 * 這裡就判斷 如果當前執行緒爭鎖失敗,是否需要掛起當前這個執行緒 * * 這裡pred是前驅節點, node就是當前節點 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驅節點的waitStatus int ws = pred.waitStatus; // 前驅節點為SIGNAL【-1】直接返回true,表示當前節點可以被直接掛起 if (ws == Node.SIGNAL) return true; // ws>0 CANCEL 說明前驅節點取消了排隊 if (ws > 0) { // 下面這段迴圈其實就是跳過所有取消的節點,找到第一個正常的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 將該節點的後繼指向node,建立雙向連線 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. * 官方說明:走到這waitStatus只能是0或propagate,預設情況下,當有新節點入隊時,waitStatus總是為0 * 下面用CAS操作將前驅節點的waitStatus值設定為signal */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回false,接著會再進入迴圈,此時前驅節點為signal,返回true return false; } ``` 針對前驅節點的waitStatus有三種情況: > 等待狀態不會為 `Node.CONDITION` ,因為它用在 ConditonObject 中 1. ws==-1,即為Node.SIGNAL,表示當前節點node可以被直接掛起,在pred執行緒釋放同步狀態時,會對node執行緒進行喚醒。 2. ws > 0,即為Node.CANCELLED,說明前驅節點已經取消了排隊【可能是超時,可能是被中斷】,則需要找到前面沒有取消的前驅節點,一直找,直到找到為止。 3. ws == 0 or ws == Node.PROPAGATE: - 預設情況下,當有新節點入隊時,waitStatus總是為0,用CAS操作將前驅節點的waitStatus值設定為signal,下一次進來的時候,就走到了第一個分支。 - 當釋放鎖的時候,會將佔用鎖的節點的ws狀態更新為0。 > PROPAGATE表示共享模式下,前驅節點不僅會喚醒後繼節點,同時也可能會喚醒後繼的後繼。 我們可以發現,這個方法在第一次走進來的時候是不會返回true的。原因在於,返回true的條件時前驅節點的狀態為SIGNAL,而第一次的時候還沒有給前驅節點設定SIGNAL呢,只有在CAS設定了狀態之後,第二次進來才會返回true。 那SIGNAL的意義到底是什麼呢? >這裡引用:[併發程式設計——詳解 AQS CLH 鎖 # 為什麼 AQS 需要一個虛擬 head 節點](https://www.jianshu.com/p/4682a6b0802d) > >waitStatus這裡用ws簡稱,每個節點都有ws變數,用於表示該節點的狀態。初始化的時候為0,如果被取消為1,signal為-1。 > >如果某個節點的狀態是signal的,那麼在該節點釋放鎖的時候,它需要喚醒下一個節點。 > >因此,每個節點在休眠之前,如果沒有將前驅節點的ws設定為signal,那麼它將永遠無法被喚醒。 > >因此我們會發現上面當前驅節點的ws為0或propagate的時候,採用cas操作將ws設定為signal,目的就是讓上一個節點釋放鎖的時候能夠通知自己。 ### boolean parkAndCheckInterrupt() ```java private final boolean parkAndCheckInterrupt() { // 掛起當前執行緒 LockSupport.park(this); return Thread.interrupted(); } ``` shouldParkAfterFailedAcquire方法返回true之後,就會呼叫該方法,掛起當前執行緒。 `LockSupport.park(this)`方法掛起的執行緒有兩種途徑被喚醒:1.被unpark() 2.被interrupt()。 需要注意這裡的Thread.interrupted()會清除中斷標記位。 ### void cancelAcquire(node) 上面tryAcquire獲取鎖失敗的時候,會走到這個方法。 ```java private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; // 將節點的執行緒置空 node.thread = null; // 跳過所有的取消的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. // 這裡在沒有併發的情況下,preNext和node是一致的 Node predNext = pred.next; // Can use unconditional write instead of CAS here. 可以直接寫而不是用CAS // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. // 設定node節點為取消狀態 node.waitStatus = Node.CANCELLED; // 如果node為尾節點就CAS將pred設定為新尾節點 if (node == tail && compareAndSetTail(node, pred)) { // 設定成功之後,CAS將pred的下一個節點置為空 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && // pred不是首節點 ((ws = pred.waitStatus) == Node.SIGNAL || // pred的ws為SIGNAL 或 可以被CAS設定為SIGNAL (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // pred執行緒非空 // 儲存node 的下一個節點 Node next = node.next; // node的下一個節點不是cancelled,就cas設定pred的下一個節點為next if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 上面的情況除外,則走到這個分支,喚醒node的下一個可喚醒節點執行緒 unparkSuccessor(node); } node.next = node; // help GC } } ``` ## 釋放資源 ### boolean release(int arg) ```java public final boolean release(int arg) { if (tryRelease(arg)) { // 子類實現tryRelease方法 // 獲得當前head Node h = head; // head不為null並且head的等待狀態不為0 if (h != null && h.waitStatus != 0) // 喚醒下一個可以被喚醒的執行緒,不一定是next哦 unparkSuccessor(h); return true; } return false; } ``` - tryRelease(int)是AQS提供給子類實現的鉤子方法,子類可以自定義實現獨佔式釋放資源的方式,釋放成功並返回true,否則返回false。 - unparkSuccessor(node)方法用於喚醒等待佇列中下一個可以被喚醒的執行緒,不一定是下一個節點next,比如它可能是取消狀態。 - head 的ws必須不等於0,為什麼呢?當一個節點嘗試掛起自己之前,都會將前置節點設定成SIGNAL -1,就算是第一個加入佇列的節點,在獲取鎖失敗後,也會將虛擬節點設定的 ws 設定成 SIGNAL,而這個判斷也是防止多執行緒重複釋放,接下來我們也能看到釋放的時候,將ws設定為0的操作。 ### void unparkSuccessor(Node node) ```java private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 如果node的waitStatus<0為signal,CAS修改為0 // 將 head 節點的 ws 改成 0,清除訊號。表示,他已經釋放過了。不能重複釋放。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 喚醒後繼節點,但是有可能後繼節點取消了等待 即 waitStatus == 1 Node s = node.next; // 如果後繼節點為空或者它已經放棄鎖了 if (s == null || s.waitStatus >
0) { s = null; // 從隊尾往前找,找到沒有沒取消的所有節點排在最前面的【直到t為null或t==node才退出迴圈嘛】 for (Node t = tail; t != null && t != node; t = t.prev) // 如果>0表示節點被取消了,就一直向前找唄,找到之後不會return,還會一直向前 if (t.waitStatus <= 0) s = t; } // 如果後繼節點存在且沒有被取消,會走到這,直接喚醒後繼節點即可 if (s != null) LockSupport.unpark(s.thread); } ``` ## 參考閱讀 - [http://concurrent.redspider.group/article/02/11.html](http://concurrent.redspider.group/article/02/11.html) - [一行一行原始碼分析清楚AbstractQueuedSynchronizer](https://javadoop.com/post/AbstractQueuedSynchronizer) - [【死磕 Java 併發】—– J.U.C 之 AQS:同步狀態的獲取與釋放](http://cmsblogs.com/