1. 程式人生 > >Java並發編程原理與實戰十九:AQS 剖析

Java並發編程原理與實戰十九:AQS 剖析

影響 clu cbo 大神 ping 方法 extc 共享鎖 一次

一、引言
在JDK1.5之前,一般是靠synchronized關鍵字來實現線程對共享變量的互斥訪問。synchronized是在字節碼上加指令,依賴於底層操作系統的Mutex Lock實現。
而從JDK1.5以後java界的一位大神—— Doug Lea 開發了AbstractQueuedSynchronizer(AQS)組件,使用原生java代碼實現了synchronized語義。換句話說,Doug Lea沒有使用更“高級”的機器指令,也不依靠JDK編譯時的特殊處理,僅用一個普普通通的類就完成了代碼塊的並發訪問控制,比那些費力不討好的實現不知高到哪裏去了。


java.util.concurrent包有多重要無需多言,一言以蔽之,是Doug Lea大爺對天下所有Java程序員的憐憫。
AQS定義了一套多線程訪問共享資源的同步器框架,是整個java.util.concurrent包的基石,Lock、ReadWriteLock、CountDowndLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor等都是在AQS的基礎上實現的。

二、原理

2.1實現原理

並發控制的核心是鎖的獲取與釋放,鎖的實現方式有很多種,AQS采用的是一種改進的CLH鎖。

2.2 CLH鎖

CLH(Craig, Landin, andHagersten locks)是一鐘自旋鎖,能確保無饑餓性,提供先來先服務的公平性。

何謂自旋鎖?它是為實現保護共享資源而提出一種鎖機制。其實,自旋鎖與互斥鎖比較類似,它們都是為了解決對某項資源的互斥使用。無論是互斥鎖,還是自旋鎖,在任何時刻,最多只能有一個保持者,也就是說,在任何時刻最多只能有一個執行單元獲得鎖。但是兩者在調度機制上略有不同。對於互斥鎖,如果資源已經被占用,資源申請者只能進入睡眠狀態。但是自旋鎖不會引起調用者睡眠,如果自旋鎖已經被別的執行單元保持,調用者就一直循環在那裏看是否該自旋鎖的保持者已經釋放了鎖,“自旋”一詞就是因此而得名

CLH鎖是一種基於鏈表的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋。

CLH隊列中的結點QNode中含有一個locked字段,該字段若為true表示該線程需要獲取鎖,且不釋放鎖,為false表示線程釋放了鎖。結點之間是通過隱形的鏈表相連,之所以叫隱形的鏈表是因為這些結點之間沒有明顯的next指針,而是通過myPred所指向的結點的變化情況來影響myNode的行為。CLHLock上還有一個尾指針,始終指向隊列的最後一個結點。

當一個線程需要獲取鎖時,會創建一個新的QNode,將其中的locked設置為true表示需要獲取鎖,然後使自己成為隊列的尾部,同時獲取一個指向其前趨的引用myPred,然後該線程就在前趨結點的locked字段上旋轉,直到前趨結點釋放鎖。當一個線程需要釋放鎖時,將當前結點的locked域設置為false,同時回收前趨結點。如上圖所示,線程A需要獲取鎖,其myNode域為true,些時tail指向線程A的結點,然後線程B也加入到線程A後面,tail指向線程B的結點。然後線程A和B都在它的myPred域上旋轉,一旦它的myPred結點的locked字段變為false,它就可以獲取鎖。

2.3 AQS數據模型

AQS維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。

技術分享圖片

AQS的內部隊列是CLH同步鎖的一種變形。其主要從兩方面進行了改造,節點的結構與節點等待機制:
l 在結構上引入了頭結點和尾節點,分別指向隊列的頭和尾,嘗試獲取鎖、入隊列、釋放鎖等實現都與頭尾節點相關,

l 為了可以處理timeout和cancel操作,每個node維護一個指向前驅的指針。如果一個node的前驅被cancel,這個node可以前向移動使用前驅的狀態字段

l 在每個node裏面使用一個狀態字段來控制阻塞/喚醒,而不是自旋

l head結點使用的是傀儡結點

FIFO隊列中的節點有AQS的靜態內部類Node定義:

static final class Node {
 
    // 共享模式
    static final Node SHARED = new Node();
 
    // 獨占模式
    static final Node EXCLUSIVE = null;
 
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
 
    /**
     * CANCELLED,值為1,表示當前的線程被取消
     * SIGNAL,值為-1,表示當前節點的後繼節點包含的線程需要運行,也就是unpark;
     * CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中;
     * PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行;
     * 值為0,表示當前節點在sync隊列中,等待著獲取鎖。
     */
    volatile int waitStatus;
 
    // 前驅結點
    volatile Node prev;
 
    // 後繼結點
    volatile Node next;
 
    // 與該結點綁定的線程
    volatile Thread thread;
 
    // 存儲condition隊列中的後繼節點
    Node nextWaiter;
 
    // 是否為共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
 
    // 獲取前驅結點
    final Node predecessor() throwsNullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
 
    Node() { // Used to establish initial heador SHARED marker
    }
 
    Node(Thread thread, Node mode) { // Used byaddWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
 
    Node(Thread thread, int waitStatus) { //Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Node類中有兩個常量SHARE和EXCLUSIVE,顧名思義這兩個常量用於表示這個結點支持共享模式還是獨占模式,共享模式指的是允許多個線程獲取同一個鎖而且可能獲取成功,獨占模式指的是一個鎖如果被一個線程持有,其他線程必須等待。多個線程讀取一個文件可以采用共享模式,而當有一個線程在寫文件時不會允許另一個線程寫這個文件,這就是獨占模式的應用場景。

2.4 CAS操作

AQS有三個重要的變量:

 // 隊頭結點
    private transient volatile Node head;
 
    // 隊尾結點
    private transient volatile Node tail;
 
    // 代表共享資源
    private volatile int state;
 
    protected final int getState() {
        return state;
    }
 
    protected final void setState(int newState){
        state = newState;
    }
 
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this,stateOffset, expect, update);
    }

compareAndSetState方法是以樂觀鎖的方式更新共享資源。

獨占鎖是一種悲觀鎖,synchronized就是一種獨占鎖,會導致其它所有需要鎖的線程掛起,等待持有鎖的線程釋放鎖。而另一個更加有效的鎖就是樂觀鎖。所謂樂觀鎖就是,每次不加鎖而是假設沒有沖突而去完成某項操作,如果因為沖突失敗就重試,直到成功為止。樂觀鎖用到的機制就是CAS,即Compare And Swap。

CAS 指的是現代 CPU 廣泛支持的一種對內存中的共享數據進行操作的一種特殊指令。這個指令會對內存中的共享數據做原子的讀寫操作。簡單介紹一下這個指令的操作過程:

首先,CPU 會將內存中將要被更改的數據與期望的值做比較。然後,當這兩個值相等時,CPU 才會將內存中的數值替換為新的值。否則便不做操作。最後,CPU 會將舊的數值返回。

這一系列的操作是原子的。它們雖然看似復雜,但卻是 Java 5 並發機制優於原有鎖機制的根本。簡單來說,CAS 的含義是“我認為原有的值應該是什麽,如果是,則將原有的值更新為新值,否則不做修改,並告訴我原來的值是多少”。

CAS通過調用JNI(Java Native Interface)調用實現的。JNI允許java調用其他語言,而CAS就是借助C語言來調用CPU底層指令實現的。Unsafe是CAS的核心類,它提供了硬件級別的原子操作

Doug Lea大神在java同步器中大量使用了CAS技術,鬼斧神工的實現了多線程執行的安全性。CAS不僅在AQS的實現中隨處可見,也是整個java.util.concurrent包的基石。

可以發現,head、tail、state三個變量都是volatile的。

volatile是輕量級的synchronized,它在多處理器開發中保證了共享變量的“可見性”。可見性的意思是當一個線程修改一個共享變量時,另外一個線程能讀到這個修改的值。如果一個字段被聲明成volatile,Java線程內存模型確保所有線程看到這個變量的值是一致的。

volatile變量也存在一些局限:不能用於構建原子的復合操作,因此當一個變量依賴舊值時就不能使用volatile變量。而CAS呢,恰恰可以提供對共享變量的原子的讀寫操作。

volatile保證共享變量的可見性,CAS保證更新操作的原子性,簡直是絕配!把這些特性整合在一起,就形成了整個concurrent包得以實現的基石。如果仔細分析concurrent包的源代碼實現,會發現一個通用化的實現模式:

1. 首先,聲明共享變量為volatile;

2. 然後,使用CAS的原子條件更新來實現線程之間的同步;

3. 同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的內存語義來實現線程之間的通信。

AQS,非阻塞數據結構和原子變量類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實現的,而concurrent包中的高層類又是依賴於這些基礎類來實現的。從整體來看,concurrent包的實現示意圖如下:

技術分享圖片

三、源碼解讀 (java特種兵)

AQS的全稱為(AbstractQueuedSynchronizer),這個類也是在java.util.concurrent.locks下面。這個類似乎很不容易看懂,因為它僅僅是提供了一系列公共的方法,讓子類來調用。那麽要理解意思,就得從子類下手,反過來看才容易看懂。如下圖所示:

技術分享圖片

這麽多類,我們看那一個?剛剛提到過鎖(Lock),我們就從鎖開始吧。這裏就先以ReentrantLock排它鎖為例開始展開講解如何利用AQS的,然後再簡單介紹讀寫鎖的要點(讀寫鎖本身的實現十分復雜,要完全說清楚需要大量的篇幅來說明)。
首先來看看ReentrantLock的構造方法,它的構造方法有兩個,如下圖所示:

技術分享圖片

很顯然,對象中有一個屬性叫sync,有兩種不同的實現類,默認是“NonfairSync”來實現,而另一個“FairSync”它們都是排它鎖的內部類,不論用那一個都能實現排它鎖,只是內部可能有點原理上的區別。先以“NonfairSync”類為例,它的lock()方法是如何實現的呢?

技術分享圖片

lock()方法先通過CAS嘗試將狀態從0修改為1。若直接修改成功,前提條件自然是鎖的狀態為0,則直接將線程的OWNER修改為當前線程,這是一種理想情況,如果並發粒度設置適當也是一種樂觀情況。
若上一個動作未成功,則會間接調用了acquire(1)來繼續操作,這個acquire(int)方法就是在AbstractQueuedSynchronizer當中了。這個方法表面上看起來簡單,但真實情況比較難以看懂,因為第一次看這段代碼可能不知道它要做什麽!不急,一步一步來分解。
首先看tryAcquire(arg)這裏的調用(當然傳入的參數是1),在默認的“NonfairSync”實現類中,會這樣來實現:

技術分享圖片

○ 首先獲取這個鎖的狀態,如果狀態為0,則嘗試設置狀態為傳入的參數(這裏就是1),若設置成功就代表自己獲取到了鎖,返回true了。狀態為0設置1的動作在外部就有做過一次,內部再一次做只是提升概率,而且這樣的操作相對鎖來講不占開銷。
○ 如果狀態不是0,則判定當前線程是否為排它鎖的Owner,如果是Owner則嘗試將狀態增加acquires(也就是增加1),如果這個狀態值越界,則會拋出異常提示,若沒有越界,將狀態設置進去後返回true(實現了類似於偏向的功能,可重入,但是無需進一步征用)。
○ 如果狀態不是0,且自身不是owner,則返回false。

對tryAcquire()的調用判定中是通過if(!tryAcquire())作為第1個條件的,如果返回true,則判定就不會成立了,自然後面的acquireQueued動作就不會再執行了,如果發生這樣的情況是最理想的。
無論多麽樂觀,征用是必然存在的,如果征用存在則owner自然不會是自己,tryAcquire()方法會返回false,接著就會再調用方法:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)做相關的操作。
這個方法的調用的代碼更不好懂,需要從裏往外看,這裏的Node.EXCLUSIVE是節點的類型,看名稱應該清楚是排它類型的意思。接著調用addWaiter()來增加一個排它鎖類型的節點,這個addWaiter()的代碼是這樣寫的:

技術分享圖片

這裏創建了一個Node的對象,將當前線程和傳入的Node.EXCLUSIVE傳入,也就是說Node節點理論上包含了這兩項信息。代碼中的tail是AQS的一個屬性,剛開始的時候肯定是為null,也就是不會進入第一層if判定的區域,而直接會進入enq(node)的代碼,那麽直接來看看enq(node)的代碼。

看到了tail就應該猜到了AQS是鏈表吧,沒錯,而且它還應該有一個head引用來指向鏈表的頭節點,AQS在初始化的時候head、tail都是null,在運行時來回移動。此時,我們最少至少知道AQS是一個基於狀態(state)的鏈表管理方式。

技術分享圖片

首先這個是一個死循環,而且本身沒有鎖,因此可以有多個線程進來,假如某個線程進入方法,此時head、tail都是null,自然會進入if(t == null)所在的代碼區域,這部分代碼會創建一個Node出來名字叫h,這個Node沒有像開始那樣給予類型和線程,很明顯是一個空的Node對象,而傳入的Node對象首先被它的next引用所指向,此時傳入的node和某一個線程創建的h對象如下圖所示。

技術分享圖片

剛才我們很理想的認為只有一個線程會出現這種情況,如果有多個線程並發進入這個if判定區域,可能就會同時存在多個這樣的數據結構,在各自形成數據結構後,多個線程都會去做compareAndSetHead(h)的動作,也就是嘗試將這個臨時h節點設置為head,顯然並發時只有一個線程會成功,因此成功的那個線程會執行tail = node的操作,整個AQS的鏈表就成為:

技術分享圖片

有一個線程會成功修改head和tail的值,其它的線程會繼續循環,再次循環就不會進入if (t == null)的邏輯了,而會進入else語句的邏輯中。
在else語句所在的邏輯中,第一步是node.prev = t,這個t就是tail的臨時值,也就是首先讓嘗試寫入的node節點的prev指針指向原來的結束節點,然後嘗試通過CAS替換掉AQS中的tail的內容為當前線程的Node,無論有多少個線程並發到這裏,依然只會有一個能成功,成功者執行t.next = node,也就是讓原先的tail節點的next引用指向現在的node,現在的node已經成為了最新的結束節點,不成功者則會繼續循環。
簡單使用圖解的方式來說明,3個步驟如下所示,如下圖所示:

技術分享圖片

插入多個節點的時候,就以此類推了哦,總之節點都是在鏈表尾部寫入的,而且是線程安全的。
知道了AQS大致的寫入是一種雙向鏈表的插入操作,但插入鏈表節點對鎖有何用途呢,我們還得退回到前面的代碼中addWaiter方法最終返回了要寫入的node節點, 再回退到圖5-17中所在的代碼中需要將這個返回的node節點作為acquireQueued方法入口參數,並傳入另一個參數(依然是1),看看它裏面到底做了些什麽?請看下圖:

技術分享圖片

這裏也是一個死循環,除非進入if(p == head && tryAcquire(arg))這個判定條件,而p為node.predcessor()得到,這個方法返回node節點的前一個節點,也就是說只有當前一個節點是head的時候,進一步嘗試通過tryAcquire(arg)來征用才有機會成功。tryAcquire(arg)這個方法我們前面介紹過,成立的條件為:鎖的狀態為0,且通過CAS嘗試設置狀態成功或線程的持有者本身是當前線程才會返回true,我們現在來詳細拆分這部分代碼。
○ 如果這個條件成功後,發生的幾個動作包含:
(1) 首先調用setHead(Node)的操作,這個操作內部會將傳入的node節點作為AQS的head所指向的節點。線程屬性設置為空(因為現在已經獲取到鎖,不再需要記錄下這個節點所對應的線程了),再將這個節點的perv引用賦值為null。
(2) 進一步將的前一個節點的next引用賦值為null。
在進行了這樣的修改後,隊列的結構就變成了以下這種情況了,通過這樣的方式,就可以讓執行完的節點釋放掉內存區域,而不是無限制增長隊列,也就真正形成FIFO了:

技術分享圖片

○ 如果這個判定條件失敗
會首先判定:“shouldParkAfterFailedAcquire(p , node)”,這個方法內部會判定前一個節點的狀態是否為:“Node.SIGNAL”,若是則返回true,若不是都會返回false,不過會再做一些操作:判定節點的狀態是否大於0,若大於0則認為被“CANCELLED”掉了(我們沒有說明幾個狀態的值,不過大於0的只可能被CANCELLED的狀態),因此會從前一個節點開始逐步循環找到一個沒有被“CANCELLED”節點,然後與這個節點的next、prev的引用相互指向;如果前一個節點的狀態不是大於0的,則通過CAS嘗試將狀態修改為“Node.SIGNAL”,自然的如果下一輪循環的時候會返回值應該會返回true。
如果這個方法返回了true,則會執行:“parkAndCheckInterrupt()”方法,它是通過LockSupport.park(this)將當前線程掛起到WATING狀態,它需要等待一個中斷、unpark方法來喚醒它,通過這樣一種FIFO的機制的等待,來實現了Lock的操作。
相應的,可以自己看看FairSync實現類的lock方法,其實區別不大,有些細節上的區別可能會決定某些特定場景的需求,你也可以自己按照這樣的思路去實現一個自定義的鎖。
接下來簡單看看unlock()解除鎖的方式,如果獲取到了鎖不釋放,那自然就成了死鎖,所以必須要釋放,來看看它內部是如何釋放的。同樣從排它鎖(ReentrantLock)中的unlock()方法開始,請先看下面的代碼截圖:

技術分享圖片

通過tryRelease(int)方法進行了某種判定,若它成立則會將head傳入到unparkSuccessor(Node)方法中並返回true,否則返回false。首先來看看tryRelease(int)方法,如下圖所示:

技術分享圖片

這個動作可以認為就是一個設置鎖狀態的操作,而且是將狀態減掉傳入的參數值(參數是1),如果結果狀態為0,就將排它鎖的Owner設置為null,以使得其它的線程有機會進行執行。
在排它鎖中,加鎖的時候狀態會增加1(當然可以自己修改這個值),在解鎖的時候減掉1,同一個鎖,在可以重入後,可能會被疊加為2、3、4這些值,只有unlock()的次數與lock()的次數對應才會將Owner線程設置為空,而且也只有這種情況下才會返回true。
這一點大家寫代碼要註意了哦,如果是在循環體中lock()或故意使用兩次以上的lock(),而最終只有一次unlock(),最終可能無法釋放鎖。在本書的src/chapter05/locks/目錄下有相應的代碼,大家可以自行測試的哦。
在方法unparkSuccessor(Node)中,就意味著真正要釋放鎖了,它傳入的是head節點(head節點是已經執行完的節點,在後面闡述這個方法的body的時候都叫head節點),內部首先會發生的動作是獲取head節點的next節點,如果獲取到的節點不為空,則直接通過:“LockSupport.unpark()”方法來釋放對應的被掛起的線程,這樣一來將會有一個節點喚醒後繼續進入圖 5-24中的循環進一步嘗試tryAcquire()方法來獲取鎖,但是也未必能完全獲取到哦,因為此時也可能有一些外部的請求正好與之征用,而且還奇跡般的成功了,那這個線程的運氣就有點悲劇了,不過通常樂觀認為不會每一次都那麽悲劇。
再看看共享鎖,從前面的排它鎖可以看得出來是用一個狀態來標誌鎖的,而共享鎖也不例外,但是Java不希望去定義兩個狀態,所以它與排它鎖的第一個區別就是在鎖的狀態上,它用int來標誌鎖的狀態,int有4個字節,它用高16位標誌讀鎖(共享鎖),低16位標誌寫鎖(排它鎖),高16位每次增加1相當於增加65536(通過1 << 16得到),自然的在這種讀寫鎖中,讀鎖和寫鎖的個數都不能超過65535個(條件是每次增加1的,如果遞增是跳躍的將會更少)。在計算讀鎖數量的時候將狀態左移16位,而計算排它鎖會與65535“按位求與”操作,如下圖所示。

技術分享圖片

寫鎖的功能與“ReentrantLock”基本一致,區域在於它會在tryAcquire操作的時候,判定狀態的時候會更加復雜一些(因此有些時候它的性能未必好)。
讀鎖也會寫入隊列,Node的類型被改為:“Node.SHARED”這種類型,lock()時候調用的是AQS的acquireShared(int)方法,進一步調用tryAcquireShared()操作裏面只需要檢測是否有排它鎖,如果沒有則可以嘗試通過CAS修改鎖的狀態,如果沒有修改成功,則會自旋這個動作(可能會有很多線程在這自旋開銷CPU)。如果這個自旋的過程中檢測到排它鎖競爭成功,那麽tryAcquireShared()會返回-1,從而會走如排它鎖的Node類似的流程,可能也會被park住,等待排它鎖相應的線程最終調用unpark()動作來喚醒。
這就是Java提供的這種讀寫鎖,不過這並不是共享鎖的詮釋,在共享鎖裏面也有多種機制 ,或許這種讀寫鎖只是其中一種而已。在這種鎖下面,讀和寫的操作本身是互斥的,但是讀可以多個一起發生。這樣的鎖理論上是非常適合應用在“讀多寫少”的環境下(當然我們所講的讀多寫少是讀的比例遠遠大於寫,而不是多一點點),理論上講這樣鎖征用的粒度會大大降低,同時系統的瓶頸會減少,效率得到總體提升。
在本節中我們除了學習到AQS的內在,還應看到Java通過一個AQS隊列解決了許多問題,這個是Java層面的隊列模型,其實我們也可以利用許多隊列模型來解決自己的問題,甚至於可以改寫模型模型來滿足自己的需求.

關於Lock及AQS的一些補充:
1、 Lock的操作不僅僅局限於lock()/unlock(),因為這樣線程可能進入WAITING狀態,這個時候如果沒有unpark()就沒法喚醒它,可能會一直“睡”下去,可以嘗試用tryLock()、tryLock(long , TimeUnit)來做一些嘗試加鎖或超時來滿足某些特定場景的需要。例如有些時候發現嘗試加鎖無法加上,先釋放已經成功對其它對象添加的鎖,過一小會再來嘗試,這樣在某些場合下可以避免“死鎖”哦。
2、 lockInterruptibly() 它允許拋出InterruptException異常,也就是當外部發起了中斷操作,程序內部有可能會拋出這種異常,但是並不是絕對會拋出異常的,大家仔細看看代碼便清楚了。
3、 newCondition()操作,是返回一個Condition的對象,Condition只是一個接口,它要求實現await()、awaitUninterruptibly()、awaitNanos(long)、await(long , TimeUnit)、awaitUntil(Date)、signal()、signalAll()方法,AbstractQueuedSynchronizer中有一個內部類叫做ConditionObject實現了這個接口,它也是一個類似於隊列的實現,具體可以參考源碼。大多數情況下可以直接使用,當然覺得自己比較牛逼的話也可以參考源碼自己來實現。
4、 在AQS的Node中有每個Node自己的狀態(waitStatus),我們這裏歸納一下,分別包含:
SIGNAL 從前面的代碼狀態轉換可以看得出是前面有線程在運行,需要前面線程結束後,調用unpark()方法才能激活自己,值為:-1
CANCELLED 當AQS發起取消或fullyRelease()時,會是這個狀態。值為1,也是幾個狀態中唯一一個大於0的狀態,所以前面判定狀態大於0就基本等價於是CANCELLED的意思。
CONDITION 線程基於Condition對象發生了等待,進入了相應的隊列,自然也需要Condition對象來激活,值為-2。
PROPAGATE 讀寫鎖中,當讀鎖最開始沒有獲取到操作權限,得到後會發起一個doReleaseShared()動作,內部也是一個循環,當判定後續的節點狀態為0時,嘗試通過CAS自旋方式將狀態修改為這個狀態,表示節點可以運行。
狀態0 初始化狀態,也代表正在嘗試去獲取臨界資源的線程所對應的Node的狀態。

代碼上的理解註釋,可以參考:http://ifeve.com/juc-aqs-reentrantlock/

四 、AQS和CAS 對比

CAS(Compare And Swap)

什麽是CAS

CAS(Compare And Swap),即比較並交換。是解決多線程並行情況下使用鎖造成性能損耗的一種機制,CAS操作包含三個操作數——內存位置(V)、預期原值(A)和新值(B)。如果內存位置的值與預期原值相匹配,那麽處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。CAS有效地說明了“我認為位置V應該包含值A;如果包含該值,則將B放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。

在JAVA中,sun.misc.Unsafe 類提供了硬件級別的原子操作來實現這個CAS。 java.util.concurrent 包下的大量類都使用了這個 Unsafe.java 類的CAS操作。至於 Unsafe.java 的具體實現這裏就不討論了。

CAS典型應用

java.util.concurrent.atomic 包下的類大多是使用CAS操作來實現的(eg. AtomicInteger.java,AtomicBoolean,AtomicLong)。下面以 AtomicInteger.java 的部分實現來大致講解下這些原子類的實現。

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
 
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
 
    private volatile int value;// 初始int大小
    // 省略了部分代碼...
 
    // 帶參數構造函數,可設置初始int大小
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }
    // 不帶參數構造函數,初始int大小為0
    public AtomicInteger() {
    }
 
    // 獲取當前值
    public final int get() {
        return value;
    }
 
    // 設置值為 newValue
    public final void set(int newValue) {
        value = newValue;
    }
 
    //返回舊值,並設置新值為 newValue
    public final int getAndSet(int newValue) {
        /**
        * 這裏使用for循環不斷通過CAS操作來設置新值
        * CAS實現和加鎖實現的關系有點類似樂觀鎖和悲觀鎖的關系
        * */
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }
 
    // 原子的設置新值為update, expect為期望的當前的值
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
 
    // 獲取當前值current,並設置新值為current+1
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }
 
    // 此處省略部分代碼,余下的代碼大致實現原理都是類似的
}

一般來說在競爭不是特別激烈的時候,使用該包下的原子操作性能比使用 synchronized 關鍵字的方式高效的多(查看getAndSet(),可知如果資源競爭十分激烈的話,這個for循環可能換持續很久都不能成功跳出。不過這種情況可能需要考慮降低資源競爭才是)。
在較多的場景我們都可能會使用到這些原子類操作。一個典型應用就是計數了,在多線程的情況下需要考慮線程安全問題。通常第一映像可能就是:

public class Counter {
    private int count;
    public Counter(){}
    public int getCount(){
        return count;
    }
    public void increase(){
        count++;
    }
}

上面這個類在多線程環境下會有線程安全問題,要解決這個問題最簡單的方式可能就是通過加鎖的方式,調整如下:

public class Counter {
    private int count;
    public Counter(){}
    public synchronized int getCount(){
        return count;
    }
    public synchronized void increase(){
        count++;
    }
}

這類似於悲觀鎖的實現,我需要獲取這個資源,那麽我就給他加鎖,別的線程都無法訪問該資源,直到我操作完後釋放對該資源的鎖。我們知道,悲觀鎖的效率是不如樂觀鎖的,上面說了Atomic下的原子類的實現是類似樂觀鎖的,效率會比使用 synchronized 關系字高,推薦使用這種方式,實現如下:

public class Counter {
    private AtomicInteger count = new AtomicInteger();
    public Counter(){}
    public int getCount(){
        return count.get();
    }
    public void increase(){
        count.getAndIncrement();
    }
}

AQS(AbstractQueuedSynchronizer)

什麽是AQS

AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用於實現基於FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架。這個抽象類被設計為作為一些可用原子int值來表示狀態的同步器的基類。如果你有看過類似 CountDownLatch 類的源碼實現,會發現其內部有一個繼承了 AbstractQueuedSynchronizer 的內部類 Sync。可見 CountDownLatch 是基於AQS框架來實現的一個同步器.類似的同步器在JUC下還有不少。(eg. Semaphore)

AQS用法

如上所述,AQS管理一個關於狀態信息的單一整數,該整數可以表現任何狀態。比如, Semaphore 用它來表現剩余的許可數,ReentrantLock 用它來表現擁有它的線程已經請求了多少次鎖;FutureTask 用它來表現任務的狀態(尚未開始、運行、完成和取消)

 To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li> {@link #tryAcquire}
 * <li> {@link #tryRelease}
 * <li> {@link #tryAcquireShared}
 * <li> {@link #tryReleaseShared}
 * <li> {@link #isHeldExclusively}
 * </ul>

如JDK的文檔中所說,使用AQS來實現一個同步器需要覆蓋實現如下幾個方法,並且使用getState,setState,compareAndSetState這幾個方法來設置獲取狀態
1. boolean tryAcquire(int arg)
2. boolean tryRelease(int arg)
3. int tryAcquireShared(int arg)
4. boolean tryReleaseShared(int arg)
5. boolean isHeldExclusively()

以上方法不需要全部實現,根據獲取的鎖的種類可以選擇實現不同的方法,支持獨占(排他)獲取鎖的同步器應該實現tryAcquiretryReleaseisHeldExclusively而支持共享獲取的同步器應該實現tryAcquireSharedtryReleaseSharedisHeldExclusively。下面以 CountDownLatch 舉例說明基於AQS實現同步器, CountDownLatch 用同步狀態持有當前計數,countDown方法調用 release從而導致計數器遞減;當計數器為0時,解除所有線程的等待;await調用acquire,如果計數器為0,acquire 會立即返回,否則阻塞。通常用於某任務需要等待其他任務都完成後才能繼續執行的情景。源碼如下:

public class CountDownLatch {
    /**
     * 基於AQS的內部Sync
     * 使用AQS的state來表示計數count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
 
        Sync(int count) {
            // 使用AQS的getState()方法設置狀態
            setState(count);
        }
 
        int getCount() {
            // 使用AQS的getState()方法獲取狀態
            return getState();
        }
 
        // 覆蓋在共享模式下嘗試獲取鎖
        protected int tryAcquireShared(int acquires) {
            // 這裏用狀態state是否為0來表示是否成功,為0的時候可以獲取到返回1,否則不可以返回-1
            return (getState() == 0) ? 1 : -1;
        }
 
        // 覆蓋在共享模式下嘗試釋放鎖
        protected boolean tryReleaseShared(int releases) {
            // 在for循環中Decrement count直至成功;
            // 當狀態值即count為0的時候,返回false表示 signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
 
    private final Sync sync;
 
    // 使用給定計數值構造CountDownLatch
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
 
    // 讓當前線程阻塞直到計數count變為0,或者線程被中斷
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 
    // 阻塞當前線程,除非count變為0或者等待了timeout的時間。當count變為0時,返回true
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
 
    // count遞減
    public void countDown() {
        sync.releaseShared(1);
    }
 
    // 獲取當前count值
    public long getCount() {
        return sync.getCount();
    }
 
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

參考資料:

http://ifeve.com/java-special-troops-aqs/

Java並發編程原理與實戰十九:AQS 剖析