1. 程式人生 > >JUC併發程式設計基石AQS之主流程原始碼解析

JUC併發程式設計基石AQS之主流程原始碼解析

## 前言 由於AQS的原始碼太過凝練,而且有很多分支比如取消排隊、等待條件等,如果把所有的分支在一篇文章的寫完可能會看懵,所以這篇文章主要是從正常流程先走一遍,重點不在取消排隊等分支,之後會專門寫一篇取消排隊和等待條件的分支邏輯。讀原始碼千萬別在每個程式碼分支中來回遊走,先按一個正常的分支把流程看明白,之後再去重點關注其他分支,各個擊破。我相信看完正常流程,你再去分析其他分支會更加得心應手。本篇將主要方法名都做了目錄索引,檢視時可通過目錄快速跳到指定方法的邏輯。 ## 執行流程 AQS的執行流程大體為當執行緒獲取鎖失敗時,會加入到等待佇列中,在**等待佇列**中的執行緒會按照從頭至尾的順序依次再去嘗試獲取鎖執行。 當執行緒獲取鎖後如果還需要等待特定的條件才能執行,那麼執行緒就加入到**條件佇列**排隊,當等待的條件到來時**再從條件佇列中按照從頭至尾的順序加入到等待佇列**中,然後再按照等待佇列的執行流程去獲取鎖。所以AQS最核心的資料結構其實就兩個佇列,等待佇列和條件佇列,然後再加上一個獲取鎖的同步狀態。 ## AQS資料結構 AQS最核心的資料結構就三個 - **等待佇列** 原始碼中head和tail為等待佇列的頭尾節點,在通過前後指向則構成了等待佇列,為雙向連結串列,學名為CLH佇列。 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210456579-856565466.png) - **條件佇列** ConditionObject中的firstWaiter和lastWaiter為等待佇列的頭尾節點,然後通過next指向構成了條件佇列,是個單向連結串列。 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210520472-754565803.png) - **同步狀態** state為同步狀態,通過CAS操作來實現獲取鎖的操作。 ```java public abstract class AbstractQueuedSynchronizer{ /** * 等待佇列的頭節點 */ private transient volatile Node head; /** * 等待佇列的尾節點 */ private transient volatile Node tail; /** * 同步狀態 */ private volatile int state; public class ConditionObject implements Condition, java.io.Serializable { /** 條件佇列的頭節點 */ private transient Node firstWaiter; /** 條件佇列的尾節點 */ private transient Node lastWaiter; } } ``` ### Node節點 兩個佇列中的節點都是通過AQS中內部類Node來實現的。主要欄位: - waitStatus 當前節點的狀態,具體看原始碼列出的註釋。很重要,之後會在原始碼中講解。 - Node prev 等待佇列節點指向的前置節點 - Node next 待佇列節點指向的後置節點 - Node nextWaiter 條件佇列中節點指向的後置節點 - Thread thread 當前節點持有的執行緒 ```java static final class Node { /** */ static final Node SHARED = new Node(); /** */ static final Node EXCLUSIVE = null; /** 標明當前節點執行緒取消排隊 */ static final int CANCELLED = 1; /** 標明該節點的後置節點需要自己去喚醒 */ static final int SIGNAL = -1; /** 標明當前節點在等待某個條件,此時節點在條件佇列中 */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * 等待狀態,值對於上面的四個常量 */ volatile int waitStatus; /** * 等待佇列節點指向的前置節點 */ volatile Node prev; /** * 等待佇列節點指向的後置節點 */ volatile Node next; /** * 當前節點持有的執行緒 */ volatile Thread thread; /** * 條件佇列中節點指向的後置節點 */ Node nextWaiter; ``` ## 加鎖 上面說明的資料結構我們先大致有個印象,現在通過加鎖來一步步說明下具體的流程,上篇文章**JUC併發程式設計基石AQS之結構篇**,我們知道了AQS加鎖程式碼執行的是acquire方法,那麼我們從這個方法說起,從原始碼中看出執行流程為:tryAcquire——>addWaiter——>acquireQueued tryAcquire為自己實現的具體加鎖邏輯,當加鎖失敗時返回false,則會執行addWaiter,將執行緒加入到等待佇列中,Node.EXCLUSIVE為獨佔鎖的模式,即同時只能有一個執行緒獲取鎖去執行。 **例子說明** ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210553150-182392148.png) 首先假設有四個執行緒t0-t4呼叫tryAcquire獲取鎖,t0執行緒為天選之子獲取到了鎖,則t1-t4執行緒接著去執行addWaiter。 ### acquire ```java public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` ### addWaiter分支1 addWaiter方法,首先會初始化一個node節點,將當前執行緒設定到node節點中。然後判斷head和tail節點是否為空,head和tail節點是懶載入的,當AQS初始化時為null,則第一次進來時if (pred != null) 條件不成立,執行enq方法。 **例子說明** 假如t1和t2執行緒同時執行到該方法,head節點未初始化則執行enq。 ```java private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } ``` ### enq 此時可能多個執行緒會同時呼叫enq方法,所以該方法中也使用CAS操作。for (;;)是個死迴圈,首先會CAS操作初始化head節點,且**head節點是個空節點,沒有設定執行緒。**然後第二次迴圈時通過CAS操作將該節點設定我尾部節點,並將前置節點指向head,之後會跳出迴圈,返回生成的Node節點到addWaiter,從原始碼可以看到addWaiter方法後面沒有邏輯,之後會呼叫acquireQueued。 **例子說明** t1和t2執行緒同時執行,t1執行緒上天眷顧CAS成功,則流程為 - 初始化head ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210609555-882274264.png) - t1執行緒的node節點加入等待佇列 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210651530-1199757265.png) - t2執行緒執行,node節點加入等待佇列 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210702551-1406037531.png) ```java private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` ### addWaiter分支2 現在在來說t3和t4,t3和t4執行緒這時終於獲取到了cpu的執行權,此時head節點已經初始化,則進入條件中的程式碼,其實也是通過CAS操作將節點加入到等待佇列尾部,之後會呼叫acquireQueued。 **例子說明** 假如t3執行緒先CAS成功,之後t4成功,此時的資料結構為 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210723518-381228880.png) ```java private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } ``` ### acquireQueued 這個方法有兩個邏輯,首先如果該節點的前置節點是head會走第一個if,再次去嘗試獲取鎖??? 獲取鎖成功,則將頭節點設定為自己,並返回到acquire方法,此時acquire方法執行完,代表獲取鎖成功,執行緒可以執行自己的邏輯了。這裡有下面幾個注意點 - p.next = null; // help GC 設定舊的head節點的後置節點為null - setHead方法 將t1節點設定為頭節點,因為頭節點是個空節點,所以設定t1執行緒節點執行緒為null,設定t1前置節點為null,此時舊的head節點已經沒有任何指向和關聯,可以被gc回收,所以上面那一步會寫個help GC 的註釋。 **例子說明** 現在t1執行緒的前置節點為頭結點,如果t1執行tryAcquire成功則結果為 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210742531-1679283002.png) 當獲取鎖失敗或者前置節點不是頭節點都會走第二個if邏輯,首先會判斷當前執行緒是否需要掛起,如果需要則執行執行緒掛起。 ```java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ``` ### shouldParkAfterFailedAcquire 判斷執行緒是否需要掛起,首先需要注意的是這個方法的引數是當前節點的前置節點。當執行緒需要掛起的時候,它需要把身後事安排明白,掛起後讓誰來把我喚醒。這個方法就主要做這個操作。我們再來看Node節點中的waitStatus狀態,這個狀態有一個Node.SIGNAL=-1,**代表了當前節點需要將後置節點喚醒**。這個理解可能有點繞。首先我們要理解一點,如果我需要被喚醒,那麼我就要設定我們的前置節點的狀態為Node.SIGNAL,這樣當我的前置節點發現waitStatus=Node.SIGNAL時,它才知道,我執行完後需要去喚醒後置節點讓後置節點去執行。所以這個方法是**當前節點去設定自己的前置節點的狀態為Node.SIGNAL**。 waitStatus初始化後是0, 第一次進入該方法,發現自己的前置節點不是Node.SIGNAL,需要先設定為Node.SIGNAL狀態 第二次進入時發現前置節點已經是Node.SIGNAL狀態,那麼我就可以安心的掛起了,有人會喚醒我的。 所以這個方法其實是兩個邏輯,先設定前置節點狀態,再判斷是否可以掛起。因為前面acquireQueued方法中for (;;) 也是個迴圈,所以會重複進入。 ```java private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } ``` ### parkAndCheckInterrupt 將自己的前置節點設定為可喚醒的狀態後進入該方法,執行緒掛起。 **例子說明** 此時t2-t4執行緒都執行到了此方法,則t2-t4執行緒都已經掛起不再執行,並且**head-t3**節點的waitStatus都為Node.SIGNAL,因為t4沒有後置節點。 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210756816-1954238693.png) ```java private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } ``` ## 解鎖 ### release 解鎖方法的入口是AQS的release方法,首先會呼叫tryRelease方法,這個是AQS實現類自己實現的方法,去CAS改變state狀態,如果解鎖成功,則會進入if裡的程式碼,獲取head節點,判斷waitStatus!=0,如果等於0代表沒有後置節點需要去喚醒。之後呼叫unparkSuccessor方法。 ```java public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ``` waitStatus>0時,代表為CANCELLED = 1狀態,即執行緒取消排隊,這個以後會細講。先將頭結點的waitStatus狀態設為初始值0,之後檢視後置節點的狀態,如果>0代表後置節點取消了排隊,不需要喚醒。但是當前節點需要去喚醒後續的節點讓後續節點再去執行,所以會從尾結點開始尋找找到離當前執行緒最近的一個且waitStatus<0的去喚醒。之後會呼叫LockSupport.unpark(s.thread);取消後續節點的掛起,讓後續節點繼續執行。 ### unparkSuccessor ```java private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } ``` **例子說明** 此時等待佇列的資料,當t0執行緒執行完成後執行解鎖操作,此時所有等待的執行緒都沒有取消等待。 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210820697-368819274.png) 則t0執行緒會喚醒t1執行緒 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210835093-895930851.png) 如果t1和t3執行緒取消的排隊時,t0執行緒會喚醒t2,**從後往前找離head最近的一個沒有取消派對的節點**。 ![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210848702-114717817.png) 執行緒執行到parkAndCheckInterrupt方法時被掛起,當被頭節點喚醒後會繼續執行,設定interrupted=true,表示被中斷,會繼續執行for迴圈邏輯,到現在一個正常的獲取鎖失敗——>加入等待佇列——>掛起——>被喚醒繼續執行的流程已經整體走了一遍。 本篇文章都是自己根據原始碼寫出的閱讀心得,可能有的地方沒有揣摩到Doug Lea大神的意圖,如果有理解不對的地方歡迎一起探討。 **如有不實,還望