1. 程式人生 > >Java併發程式設計-Lock和condition的原理及AQS的運用

Java併發程式設計-Lock和condition的原理及AQS的運用

AQS的全稱為(AbstractQueuedSynchronizer),這個類也是在java.util.concurrent.locks下面。這個類似乎很不容易看懂,因為它僅僅是提供了一系列公共的方法,讓子類來呼叫。那麼要理解意思,就得從子類下手,反過來看才容易看懂。如下圖所示:
QQ圖片20140110194431

圖 5-15 AQS的子類實現

這麼多類,我們看那一個?剛剛提到過鎖(Lock),我們就從鎖開始吧。這裡就先以ReentrantLock排它鎖為例開始展開講解如何利用AQS的,然後再簡單介紹讀寫鎖的要點(讀寫鎖本身的實現十分複雜,要完全說清楚需要大量的篇幅來說明)。
首先來看看ReentrantLock的構造方法,它的構造方法有兩個,如下圖所示:

QQ圖片20140110194534
圖 5-16 排它鎖的構造方法
很顯然,物件中有一個屬性叫sync,有兩種不同的實現類,預設是“NonfairSync”來實現,而另一個“FairSync”它們都是排它鎖的內部類,不論用那一個都能實現排它鎖,只是內部可能有點原理上的區別。先以“NonfairSync”類為例,它的lock()方法是如何實現的呢?
QQ圖片20140110194615
圖 5-17 排它鎖的lock方法
lock()方法先通過CAS嘗試將狀態從0修改為1。若直接修改成功,前提條件自然是鎖的狀態為0,則直接將執行緒的OWNER修改為當前執行緒,這是一種理想情況,如果併發粒度設定適當也是一種樂觀情況。
若上一個動作未成功,則會間接呼叫了acquire(1)來繼續操作,這個acquire(int)方法就是在AbstractQueuedSynchronizer當中了。這個方法表面上看起來簡單,但真實情況比較難以看懂,因為第一次看這段程式碼可能不知道它要做什麼!不急,一步一步來分解。
首先看tryAcquire(arg)這裡的呼叫(當然傳入的引數是1),在預設的“NonfairSync”實現類中,會這樣來實現:
QQ圖片20140110194650

媽呀,這程式碼好費勁,胖哥第一回看也是覺得這樣,細心看看也不是想象當中那麼難:

○ 首先獲取這個鎖的狀態,如果狀態為0,則嘗試設定狀態為傳入的引數(這裡就是1),若設定成功就代表自己獲取到了鎖,返回true了。狀態為0設定1的動作在外部就有做過一次,內部再一次做只是提升概率,而且這樣的操作相對鎖來講不佔開銷。
○ 如果狀態不是0,則判定當前執行緒是否為排它鎖的Owner,如果是Owner則嘗試將狀態增加acquires(也就是增加1),如果這個狀態值越界,則會丟擲異常提示,若沒有越界,將狀態設定進去後返回true(實現了類似於偏向的功能,可重入,但是無需進一步徵用)。
○ 如果狀態不是0,且自身不是owner,則返回false。

回到圖 5-17中對tryAcquire()的呼叫判定中是通過if(!tryAcquire())作為第1個條件的,如果返回true,則判定就不會成立了,自然後面的acquireQueued動作就不會再執行了,如果發生這樣的情況是最理想的。
無論多麼樂觀,徵用是必然存在的,如果徵用存在則owner自然不會是自己,tryAcquire()方法會返回false,接著就會再呼叫方法:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)做相關的操作。
這個方法的呼叫的程式碼更不好懂,需要從裡往外看,這裡的Node.EXCLUSIVE是節點的型別,看名稱應該清楚是排它型別的意思。接著呼叫addWaiter()來增加一個排它鎖型別的節點,這個addWaiter()的程式碼是這樣寫的:
QQ圖片20140110194812
圖 5-19 addWaiter的程式碼
這裡建立了一個Node的物件,將當前執行緒和傳入的Node.EXCLUSIVE傳入,也就是說Node節點理論上包含了這兩項資訊。程式碼中的tail是AQS的一個屬性,剛開始的時候肯定是為null,也就是不會進入第一層if判定的區域,而直接會進入enq(node)的程式碼,那麼直接來看看enq(node)的程式碼。

看到了tail就應該猜到了AQS是連結串列吧,沒錯,而且它還應該有一個head引用來指向連結串列的頭節點,AQS在初始化的時候head、tail都是null,在執行時來回移動。此時,我們最少至少知道AQS是一個基於狀態(state)的連結串列管理方式。

QQ圖片20140110194853

圖 5-20 enq(Node)的原始碼
這段程式碼就是連結串列的操作,某些同學可能很牛,一下就看懂了,某些同學一掃而過覺得知道大概就可以了,某些同學可能會莫不著頭腦。胖哥為了給第三類同學來“開開葷”,簡單講解下這個程式碼。
首先這個是一個死迴圈,而且本身沒有鎖,因此可以有多個執行緒進來,假如某個執行緒進入方法,此時head、tail都是null,自然會進入if(t == null)所在的程式碼區域,這部分程式碼會建立一個Node出來名字叫h,這個Node沒有像開始那樣給予型別和執行緒,很明顯是一個空的Node物件,而傳入的Node物件首先被它的next引用所指向,此時傳入的node和某一個執行緒建立的h物件如下圖所示。
QQ圖片20140110194922
圖 5-21 臨時的h物件建立後的與傳入的Node指向關係
剛才我們很理想的認為只有一個執行緒會出現這種情況,如果有多個執行緒併發進入這個if判定區域,可能就會同時存在多個這樣的資料結構,在各自形成資料結構後,多個執行緒都會去做compareAndSetHead(h)的動作,也就是嘗試將這個臨時h節點設定為head,顯然併發時只有一個執行緒會成功,因此成功的那個執行緒會執行tail = node的操作,整個AQS的連結串列就成為:

QQ圖片20140110194957

圖 5-22 AQS被第一個請求成功的執行緒初始化後
有一個執行緒會成功修改head和tail的值,其它的執行緒會繼續迴圈,再次迴圈就不會進入if (t == null)的邏輯了,而會進入else語句的邏輯中。
在else語句所在的邏輯中,第一步是node.prev = t,這個t就是tail的臨時值,也就是首先讓嘗試寫入的node節點的prev指標指向原來的結束節點,然後嘗試通過CAS替換掉AQS中的tail的內容為當前執行緒的Node,無論有多少個執行緒併發到這裡,依然只會有一個能成功,成功者執行t.next = node,也就是讓原先的tail節點的next引用指向現在的node,現在的node已經成為了最新的結束節點,不成功者則會繼續迴圈。
簡單使用圖解的方式來說明,3個步驟如下所示,如下圖所示:

QQ圖片20140110194957

圖 5-23 插入一個節點步驟前後動作
插入多個節點的時候,就以此類推了哦,總之節點都是在連結串列尾部寫入的,而且是執行緒安全的。
知道了AQS大致的寫入是一種雙向連結串列的插入操作,但插入連結串列節點對鎖有何用途呢,我們還得退回到前面圖 5-19的程式碼中addWaiter方法最終返回了要寫入的node節點, 再回退到圖5-17中所在的程式碼中需要將這個返回的node節點作為acquireQueued方法入口引數,並傳入另一個引數(依然是1),看看它裡面到底做了些什麼?請看下圖:
QQ圖片20140110195059

圖 5-24 acquireQueued的方法內容
這裡也是一個死迴圈,除非進入if(p == head && tryAcquire(arg))這個判定條件,而p為node.predcessor()得到,這個方法返回node節點的前一個節點,也就是說只有當前一個節點是head的時候,進一步嘗試通過tryAcquire(arg)來徵用才有機會成功。tryAcquire(arg)這個方法我們前面介紹過,成立的條件為:鎖的狀態為0,且通過CAS嘗試設定狀態成功或執行緒的持有者本身是當前執行緒才會返回true,我們現在來詳細拆分這部分程式碼。
○ 如果這個條件成功後,發生的幾個動作包含:
(1) 首先呼叫setHead(Node)的操作,這個操作內部會將傳入的node節點作為AQS的head所指向的節點。執行緒屬性設定為空(因為現在已經獲取到鎖,不再需要記錄下這個節點所對應的執行緒了),再將這個節點的perv引用賦值為null。
(2) 進一步將的前一個節點的next引用賦值為null。
在進行了這樣的修改後,佇列的結構就變成了以下這種情況了,通過這樣的方式,就可以讓執行完的節點釋放掉記憶體區域,而不是無限制增長佇列,也就真正形成FIFO了:

QQ圖片20140110195124
圖 5-25 CAS成功獲取鎖後,佇列的變化
○ 如果這個判定條件失敗
會首先判定:“shouldParkAfterFailedAcquire(p , node)”,這個方法內部會判定前一個節點的狀態是否為:“Node.SIGNAL”,若是則返回true,若不是都會返回false,不過會再做一些操作:判定節點的狀態是否大於0,若大於0則認為被“CANCELLED”掉了(我們沒有說明幾個狀態的值,不過大於0的只可能被CANCELLED的狀態),因此會從前一個節點開始逐步迴圈找到一個沒有被“CANCELLED”節點,然後與這個節點的next、prev的引用相互指向;如果前一個節點的狀態不是大於0的,則通過CAS嘗試將狀態修改為“Node.SIGNAL”,自然的如果下一輪迴圈的時候會返回值應該會返回true。
如果這個方法返回了true,則會執行:“parkAndCheckInterrupt()”方法,它是通過LockSupport.park(this)將當前執行緒掛起到WATING狀態,它需要等待一箇中斷、unpark方法來喚醒它,通過這樣一種FIFO的機制的等待,來實現了Lock的操作。
相應的,可以自己看看FairSync實現類的lock方法,其實區別不大,有些細節上的區別可能會決定某些特定場景的需求,你也可以自己按照這樣的思路去實現一個自定義的鎖。
接下來簡單看看unlock()解除鎖的方式,如果獲取到了鎖不釋放,那自然就成了死鎖,所以必須要釋放,來看看它內部是如何釋放的。同樣從排它鎖(ReentrantLock)中的unlock()方法開始,請先看下面的程式碼截圖:

QQ圖片20140110195158

圖 5-26 unlock方法間接呼叫AQS的release(1)來完成
通過tryRelease(int)方法進行了某種判定,若它成立則會將head傳入到unparkSuccessor(Node)方法中並返回true,否則返回false。首先來看看tryRelease(int)方法,如下圖所示:
QQ圖片20140110195239

圖 5-27 tryRelease(1)方法
這個動作可以認為就是一個設定鎖狀態的操作,而且是將狀態減掉傳入的引數值(引數是1),如果結果狀態為0,就將排它鎖的Owner設定為null,以使得其它的執行緒有機會進行執行。
在排它鎖中,加鎖的時候狀態會增加1(當然可以自己修改這個值),在解鎖的時候減掉1,同一個鎖,在可以重入後,可能會被疊加為2、3、4這些值,只有unlock()的次數與lock()的次數對應才會將Owner執行緒設定為空,而且也只有這種情況下才會返回true。
這一點大家寫程式碼要注意了哦,如果是在迴圈體中lock()或故意使用兩次以上的lock(),而最終只有一次unlock(),最終可能無法釋放鎖。在本書的src/chapter05/locks/目錄下有相應的程式碼,大家可以自行測試的哦。
在方法unparkSuccessor(Node)中,就意味著真正要釋放鎖了,它傳入的是head節點(head節點是已經執行完的節點,在後面闡述這個方法的body的時候都叫head節點),內部首先會發生的動作是獲取head節點的next節點,如果獲取到的節點不為空,則直接通過:“LockSupport.unpark()”方法來釋放對應的被掛起的執行緒,這樣一來將會有一個節點喚醒後繼續進入圖 5-24中的迴圈進一步嘗試tryAcquire()方法來獲取鎖,但是也未必能完全獲取到哦,因為此時也可能有一些外部的請求正好與之徵用,而且還奇蹟般的成功了,那這個執行緒的運氣就有點悲劇了,不過通常樂觀認為不會每一次都那麼悲劇。
再看看共享鎖,從前面的排它鎖可以看得出來是用一個狀態來標誌鎖的,而共享鎖也不例外,但是Java不希望去定義兩個狀態,所以它與排它鎖的第一個區別就是在鎖的狀態上,它用int來標誌鎖的狀態,int有4個位元組,它用高16位標誌讀鎖(共享鎖),低16位標誌寫鎖(排它鎖),高16位每次增加1相當於增加65536(通過1 << 16得到),自然的在這種讀寫鎖中,讀鎖和寫鎖的個數都不能超過65535個(條件是每次增加1的,如果遞增是跳躍的將會更少)。在計算讀鎖數量的時候將狀態左移16位,而計算排它鎖會與65535“按位求與”操作,如下圖所示。

QQ圖片20140110195308

圖 5-28 讀寫鎖中的數量計算及限制
寫鎖的功能與“ReentrantLock”基本一致,區域在於它會在tryAcquire操作的時候,判定狀態的時候會更加複雜一些(因此有些時候它的效能未必好)。
讀鎖也會寫入佇列,Node的型別被改為:“Node.SHARED”這種型別,lock()時候呼叫的是AQS的acquireShared(int)方法,進一步呼叫tryAcquireShared()操作裡面只需要檢測是否有排它鎖,如果沒有則可以嘗試通過CAS修改鎖的狀態,如果沒有修改成功,則會自旋這個動作(可能會有很多執行緒在這自旋開銷CPU)。如果這個自旋的過程中檢測到排它鎖競爭成功,那麼tryAcquireShared()會返回-1,從而會走如排它鎖的Node類似的流程,可能也會被park住,等待排它鎖相應的執行緒最終呼叫unpark()動作來喚醒。
這就是Java提供的這種讀寫鎖,不過這並不是共享鎖的詮釋,在共享鎖裡面也有多種機制 ,或許這種讀寫鎖只是其中一種而已。在這種鎖下面,讀和寫的操作本身是互斥的,但是讀可以多個一起發生。這樣的鎖理論上是非常適合應用在“讀多寫少”的環境下(當然我們所講的讀多寫少是讀的比例遠遠大於寫,而不是多一點點),理論上講這樣鎖徵用的粒度會大大降低,同時系統的瓶頸會減少,效率得到總體提升。
在本節中我們除了學習到AQS的內在,還應看到Java通過一個AQS佇列解決了許多問題,這個是Java層面的佇列模型,其實我們也可以利用許多佇列模型來解決自己的問題,甚至於可以改寫模型模型來滿足自己的需求,在本章的5.6.1節中將會詳細介紹。
關於Lock及AQS的一些補充:
1、 Lock的操作不僅僅侷限於lock()/unlock(),因為這樣執行緒可能進入WAITING狀態,這個時候如果沒有unpark()就沒法喚醒它,可能會一直“睡”下去,可以嘗試用tryLock()、tryLock(long , TimeUnit)來做一些嘗試加鎖或超時來滿足某些特定場景的需要。例如有些時候發現嘗試加鎖無法加上,先釋放已經成功對其它物件新增的鎖,過一小會再來嘗試,這樣在某些場合下可以避免“死鎖”哦。
2、 lockInterruptibly() 它允許丟擲InterruptException異常,也就是當外部發起了中斷操作,程式內部有可能會丟擲這種異常,但是並不是絕對會丟擲異常的,大家仔細看看程式碼便清楚了。
3、 newCondition()操作,是返回一個Condition的物件,Condition只是一個介面,它要求實現await()、awaitUninterruptibly()、awaitNanos(long)、await(long , TimeUnit)、awaitUntil(Date)、signal()、signalAll()方法,AbstractQueuedSynchronizer中有一個內部類叫做ConditionObject實現了這個介面,它也是一個類似於佇列的實現,具體可以參考原始碼。大多數情況下可以直接使用,當然覺得自己比較牛逼的話也可以參考原始碼自己來實現。
4、 在AQS的Node中有每個Node自己的狀態(waitStatus),我們這裡歸納一下,分別包含:
SIGNAL 從前面的程式碼狀態轉換可以看得出是前面有執行緒在執行,需要前面執行緒結束後,呼叫unpark()方法才能啟用自己,值為:-1
CANCELLED 當AQS發起取消或fullyRelease()時,會是這個狀態。值為1,也是幾個狀態中唯一一個大於0的狀態,所以前面判定狀態大於0就基本等價於是CANCELLED的意思。
CONDITION 執行緒基於Condition物件發生了等待,進入了相應的佇列,自然也需要Condition物件來啟用,值為-2。
PROPAGATE 讀寫鎖中,當讀鎖最開始沒有獲取到操作許可權,得到後會發起一個doReleaseShared()動作,內部也是一個迴圈,當判定後續的節點狀態為0時,嘗試通過CAS自旋方式將狀態修改為這個狀態,表示節點可以執行。
狀態0 初始化狀態,也代表正在嘗試去獲取臨界資源的執行緒所對應的Node的狀態。

Codition的實現

在java.util.concurrent包中,有兩個很特殊的工具類,Condition和ReentrantLock,使用過的人都知道,ReentrantLock(重入鎖)是jdk的concurrent包提供的一種獨佔鎖的實現。它繼承自Dong Lea的 AbstractQueuedSynchronizer(同步器),確切的說是ReentrantLock的一個內部類繼承了AbstractQueuedSynchronizer,ReentrantLock只不過是代理了該類的一些方法,可能有人會問為什麼要使用內部類在包裝一層? 我想是安全的關係,因為AbstractQueuedSynchronizer中有很多方法,還實現了共享鎖,Condition(稍候再細說)等功能,如果直接使ReentrantLock繼承它,則很容易出現AbstractQueuedSynchronizer中的API被誤用的情況。

言歸正傳,今天,我們討論下Condition工具類的實現。

ReentrantLock和Condition的使用方式通常是這樣的:

C1

執行後,結果如下:

C2

可以看到,

Condition的執行方式,是當線上程1中呼叫await方法後,執行緒1將釋放鎖,並且將自己沉睡,等待喚醒,

執行緒2獲取到鎖後,開始做事,完畢後,呼叫Condition的signal方法,喚醒執行緒1,執行緒1恢復執行。

以上說明Condition是一個多執行緒間協調通訊的工具類,使得某個,或者某些執行緒一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶呼叫)時 ,這些等待執行緒才會被喚醒,從而重新爭奪鎖。

那,它是怎麼實現的呢?

首先還是要明白,reentrantLock.newCondition() 返回的是Condition的一個實現,該類在AbstractQueuedSynchronizer中被實現,叫做newCondition()

C3

它可以訪問AbstractQueuedSynchronizer中的方法和其餘內部類( AbstractQueuedSynchronizer是個抽象類,至於他怎麼能訪問,這裡有個很奇妙的點,後面我專門用demo說明 )

現在,我們一起來看下Condition類的實現,還是從上面的demo入手,

為了方便書寫,我將AbstractQueuedSynchronizer縮寫為AQS

當await被呼叫時,程式碼如下:

01 public final void await() throws InterruptedException {
02 if (Thread.interrupted())
03 throw new InterruptedException();
04 Node node = addConditionWaiter(); //將當前執行緒包裝下後,
05 //新增到Condition自己維護的一個連結串列中。
06 int savedState = fullyRelease(node);