1. 程式人生 > >Java併發之AQS詳解

Java併發之AQS詳解

一、概述

  談到併發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)!

  類如其名,抽象的佇列式的同步器,AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch...。

  以下是本文的目錄大綱:

    1. 概述
    2. 框架
    3. 原始碼詳解
    4. 簡單應用

  若有不正之處,請諒解和批評指正,不勝感激。

二、框架

  它維護了一個volatile int state(代表共享資源)和一個FIFO執行緒等待佇列(多執行緒爭用資源被阻塞時會進入此佇列)。這裡volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:

  • getState()
  • setState()
  • compareAndSetState()

  AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentrantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。

  不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:

  • isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

  以ReentrantLock為例,state初始化為0,表示未鎖定狀態。A執行緒lock()時,會呼叫tryAcquire()獨佔該鎖並將state+1。此後,其他執行緒再tryAcquire()時就會失敗,直到A執行緒unlock()到state=0(即釋放鎖)為止,其它執行緒才有機會獲取該鎖。當然,釋放鎖之前,A執行緒自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。

  再以CountDownLatch以例,任務分為N個子執行緒去執行,state也初始化為N(注意N要與執行緒個數一致)。這N個子執行緒是並行執行的,每個子執行緒執行完後countDown()一次,state會CAS減1。等到所有子執行緒都執行完後(即state=0),會unpark()主呼叫執行緒,然後主呼叫執行緒就會從await()函式返回,繼續後餘動作。

  一般來說,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。

三、原始碼詳解

  本節開始講解AQS的原始碼實現。依照acquire-release、acquireShared-releaseShared的次序來。

3.1 acquire(int)

  此方法是獨佔模式下執行緒獲取共享資源的頂層入口。如果獲取到資源,執行緒直接返回,否則進入等待佇列,直到獲取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅只限於lock()。獲取到資源後,執行緒就可以去執行其臨界區程式碼了。下面是acquire()的原始碼:

1 public final void acquire(int arg) {
2     if (!tryAcquire(arg) &&
3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4         selfInterrupt();
5 }

  函式流程如下:

    1. tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
    2. addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
    3. acquireQueued()使執行緒在等待佇列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
    4. 如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

  這時單憑這4個抽象的函式來看流程還有點朦朧,不要緊,看完接下來的分析後,你就會明白了。就像《大話西遊》裡唐僧說的:等你明白了捨生取義的道理,你自然會回來和我唱這首歌的。

3.1.1 tryAcquire(int)

  此方法嘗試去獲取獨佔資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語義,還是那句話,當然不僅僅只限於tryLock()。如下是tryAcquire()的原始碼:

1     protected boolean tryAcquire(int arg) {
2         throw new UnsupportedOperationException();
3     }

  什麼?直接throw異常?說好的功能呢?好吧,還記得概述裡講的AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現嗎?就是這裡了!!!AQS這裡只定義了一個介面,具體資源的獲取交由自定義同步器去實現了(通過state的get/set/CAS)!!!至於能不能重入,能不能加塞,那就看具體的自定義同步器怎麼去設計了!!!當然,自定義同步器在進行資源訪問時要考慮執行緒安全的影響。

  這裡之所以沒有定義成abstract,是因為獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那麼每個模式也要去實現另一模式下的介面。說到底,Doug Lea還是站在咱們開發者的角度,儘量減少不必要的工作量。

3.1.2 addWaiter(Node)

  此方法用於將當前執行緒加入到等待佇列的隊尾,並返回當前執行緒所在的結點。還是上原始碼吧:

 1 private Node addWaiter(Node mode) {
 2     //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
 3     Node node = new Node(Thread.currentThread(), mode);
 4     
 5     //嘗試快速方式直接放到隊尾。
 6     Node pred = tail;
 7     if (pred != null) {
 8         node.prev = pred;
 9         if (compareAndSetTail(pred, node)) {
10             pred.next = node;
11             return node;
12         }
13     }
14     
15     //上一步失敗則通過enq入隊。
16     enq(node);
17     return node;
18 }

 不用再說了,直接看註釋吧。這裡我們說下Node。Node結點是對每一個訪問同步程式碼的執行緒的封裝,其包含了需要同步的執行緒本身以及執行緒的狀態,如是否被阻塞,是否等待喚醒,是否已經被取消等。變數waitStatus則表示當前被封裝成Node結點的等待狀態,共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

  • CANCELLED:值為1,在同步佇列中等待的執行緒等待超時或被中斷,需要從同步佇列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。

  • SIGNAL:值為-1,被標識為該等待喚醒狀態的後繼結點,當其前繼結點的執行緒釋放了同步鎖或被取消,將會通知該後繼結點的執行緒執行。說白了,就是處於喚醒狀態,只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態的後繼結點的執行緒執行。

  • CONDITION:值為-2,與Condition相關,該標識的結點處於等待佇列中,結點的執行緒等待在Condition上,當其他執行緒呼叫了Condition的signal()方法後,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖。

  • PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態標識結點的執行緒處於可執行狀態。

  • 0狀態:值為0,代表初始化狀態。

AQS在判斷狀態時,通過用waitStatus>0表示取消狀態,而waitStatus<0表示有效狀態。

3.1.2.1 enq(Node)

   此方法用於將node加入隊尾。原始碼如下:

 1 private Node enq(final Node node) {
 2     //CAS"自旋",直到成功加入隊尾
 3     for (;;) {
 4         Node t = tail;
 5         if (t == null) { // 佇列為空,建立一個空的標誌結點作為head結點,並將tail也指向它。
 6             if (compareAndSetHead(new Node()))
 7                 tail = head;
 8         } else {//正常流程,放入隊尾
 9             node.prev = t;
10             if (compareAndSetTail(t, node)) {
11                 t.next = node;
12                 return t;
13             }
14         }
15     }
16 }

如果你看過AtomicInteger.getAndIncrement()函式原始碼,那麼相信你一眼便看出這段程式碼的精華。CAS自旋volatile變數,是一種很經典的用法。還不太瞭解的,自己去百度一下吧。

3.1.3 acquireQueued(Node, int)

  OK,通過tryAcquire()和addWaiter(),該執行緒獲取資源失敗,已經被放入等待佇列尾部了。聰明的你立刻應該能想到該執行緒下一部該幹什麼了吧:進入等待狀態休息,直到其他執行緒徹底釋放資源後喚醒自己,自己再拿到資源,然後就可以去幹自己想幹的事了。沒錯,就是這樣!是不是跟醫院排隊拿號有點相似~~acquireQueued()就是幹這件事:在等待佇列中排隊拿號(中間沒其它事幹可以休息),直到拿到號後再返回。這個函式非常關鍵,還是上原始碼吧:

 1 final boolean acquireQueued(final Node node, int arg) {
 2     boolean failed = true;//標記是否成功拿到資源
 3     try {
 4         boolean interrupted = false;//標記等待過程中是否被中斷過
 5         
 6         //又是一個“自旋”!
 7         for (;;) {
 8             final Node p = node.predecessor();//拿到前驅
 9             //如果前驅是head,即該結點已成老二,那麼便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
10             if (p == head && tryAcquire(arg)) {
11                 setHead(node);//拿到資源後,將head指向該結點。所以head所指的標杆結點,就是當前獲取到資源的那個結點或null。
12                 p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味著之前拿完資源的結點出隊了!
13                 failed = false;
14                 return interrupted;//返回等待過程中是否被中斷過
15             }
16             
17             //如果自己可以休息了,就進入waiting狀態,直到被unpark()
18             if (shouldParkAfterFailedAcquire(p, node) &&
19                 parkAndCheckInterrupt())
20                 interrupted = true;//如果等待過程中被中斷過,哪怕只有那麼一次,就將interrupted標記為true
21         }
22     } finally {
23         if (failed)
24             cancelAcquire(node);
25     }
26 }

到這裡了,我們先不急著總結acquireQueued()的函式流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體幹些什麼。

3.1.3.1 shouldParkAfterFailedAcquire(Node, Node)

  此方法主要用於檢查狀態,看看自己是否真的可以去休息了(進入waiting狀態,如果執行緒狀態轉換不熟,可以參考本人上一篇寫的),萬一佇列前邊的執行緒都放棄了只是瞎站著,那也說不定,對吧!

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2     int ws = pred.waitStatus;//拿到前驅的狀態
 3     if (ws == Node.SIGNAL)
 4         //如果已經告訴前驅拿完號後通知自己一下,那就可以安心休息了
 5         return true;
 6     if (ws > 0) {
 7         /*
 8          * 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。
 9          * 注意:那些放棄的結點,由於被自己“加塞”到它們前邊,它們相當於形成一個無引用鏈,稍後就會被保安大叔趕走了(GC回收)!
10          */
11         do {
12             node.prev = pred = pred.prev;
13         } while (pred.waitStatus > 0);
14         pred.next = node;
15     } else {
16          //如果前驅正常,那就把前驅的狀態設定成SIGNAL,告訴它拿完號後通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
17         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
18     }
19     return false;
20 }

整個流程中,如果前驅結點的狀態不是SIGNAL,那麼自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿號。

3.1.3.2 parkAndCheckInterrupt()

  如果執行緒找好安全休息點後,那就可以安心去休息了。此方法就是讓執行緒去休息,真正進入等待狀態。

1 private final boolean parkAndCheckInterrupt() {
2     LockSupport.park(this);//呼叫park()使執行緒進入waiting狀態
3     return Thread.interrupted();//如果被喚醒,檢視自己是不是被中斷的。
4 }

   park()會讓當前執行緒進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該執行緒:1)被unpark();2)被interrupt()。(再說一句,如果執行緒狀態轉換不熟,可以參考本人寫的)。需要注意的是,Thread.interrupted()會清除當前執行緒的中斷標記位。 

3.1.3.3 小結

  OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現在讓我們再回到acquireQueued(),總結下該函式的具體流程:

  1. 結點進入隊尾後,檢查狀態,找到安全休息點;
  2. 呼叫park()進入waiting狀態,等待unpark()或interrupt()喚醒自己;
  3. 被喚醒後,看自己是不是有資格能拿到號。如果拿到,head指向當前結點,並返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續流程1。

3.1.4 小結

  OKOK,acquireQueued()分析完之後,我們接下來再回到acquire()!再貼上它的原始碼吧:

1 public final void acquire(int arg) {
2     if (!tryAcquire(arg) &&
3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4         selfInterrupt();
5 }

再來總結下它的流程吧:

  1. 呼叫自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
  2. 沒成功,則addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
  3. acquireQueued()使執行緒在等待佇列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
  4. 如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

由於此函式是重中之重,我再用流程圖總結一下:

至此,acquire()的流程終於算是告一段落了。這也就是ReentrantLock.lock()的流程,不信你去看其lock()原始碼吧,整個函式就是一條acquire(1)!!!

3.2 release(int)

   上一小節已經把acquire()說完了,這一小節就來講講它的反操作release()吧。此方法是獨佔模式下執行緒釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待佇列裡的其他執行緒來獲取資源。這也正是unlock()的語義,當然不僅僅只限於unlock()。下面是release()的原始碼:

1 public final boolean release(int arg) {
2     if (tryRelease(arg)) {
3         Node h = head;//找到頭結點
4         if (h != null && h.waitStatus != 0)
5             unparkSuccessor(h);//喚醒等待佇列裡的下一個執行緒
6         return true;
7     }
8     return false;
9 }

  邏輯並不複雜。它呼叫tryRelease()來釋放資源。有一點需要注意的是,它是根據tryRelease()的返回值來判斷該執行緒是否已經完成釋放掉資源了!所以自定義同步器在設計tryRelease()的時候要明確這一點!!

3.2.1 tryRelease(int)

  此方法嘗試去釋放指定量的資源。下面是tryRelease()的原始碼:

1 protected boolean tryRelease(int arg) {
2     throw new UnsupportedOperationException();
3 }

  跟tryAcquire()一樣,這個方法是需要獨佔模式的自定義同步器去實現的。正常來說,tryRelease()都會成功的,因為這是獨佔模式,該執行緒來釋放資源,那麼它肯定已經拿到獨佔資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮執行緒安全的問題。但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該執行緒是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。

3.2.2 unparkSuccessor(Node)

  此方法用於喚醒等待佇列中下一個執行緒。下面是原始碼:

 1 private void unparkSuccessor(Node node) {
 2     //這裡,node一般為當前執行緒所在的結點。
 3     int ws = node.waitStatus;
 4     if (ws < 0)//置零當前執行緒所在的結點狀態,允許失敗。
 5         compareAndSetWaitStatus(node, ws, 0);
 6 
 7     Node s = node.next;//找到下一個需要喚醒的結點s
 8     if (s == null || s.waitStatus > 0) {//如果為空或已取消
 9         s = null;
10         for (Node t = tail; t != null && t != node; t = t.prev)
11             if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
12                 s = t;
13     }
14     if (s != null)
15         LockSupport.unpark(s.thread);//喚醒
16 }

  這個函式並不複雜。一句話概括:用unpark()喚醒等待佇列中最前邊的那個未放棄執行緒,這裡我們也用s來表示吧。此時,再和acquireQueued()聯絡起來,s被喚醒後,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關係,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這裡既然s已經是等待佇列中最前邊的那個未放棄執行緒了,那麼通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然後s把自己設定成head標杆結點,表示自己已經獲取到資源了,acquire()也返回了!!And then, DO what you WANT!

3.2.3 小結

  release()是獨佔模式下執行緒釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待佇列裡的其他執行緒來獲取資源。

3.3 acquireShared(int)

  此方法是共享模式下執行緒獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待佇列,直到獲取到資源為止,整個過程忽略中斷。下面是acquireShared()的原始碼:

1 public final void acquireShared(int arg) {
2     if (tryAcquireShared(arg) < 0)
3         doAcquireShared(arg);
4 }

  這裡tryAcquireShared()依然需要自定義同步器去實現。但是AQS已經把其返回值的語義定義好了:負值代表獲取失敗;0代表獲取成功,但沒有剩餘資源;正數表示獲取成功,還有剩餘資源,其他執行緒還可以去獲取。所以這裡acquireShared()的流程就是:

    1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
    2. 失敗則通過doAcquireShared()進入等待佇列,直到獲取到資源為止才返回。

3.3.1 doAcquireShared(int)

  此方法用於將當前執行緒加入等待佇列尾部休息,直到其他執行緒釋放資源喚醒自己,自己成功拿到相應量的資源後才返回。下面是doAcquireShared()的原始碼:

 1 private void doAcquireShared(int arg) {
 2     final Node node = addWaiter(Node.SHARED);//加入佇列尾部
 3     boolean failed = true;//是否成功標誌
 4     try {
 5         boolean interrupted = false;//等待過程中是否被中斷過的標誌
 6         for (;;) {
 7             final Node p = node.predecessor();//前驅
 8             if (p == head) {//如果到head的下一個,因為head是拿到資源的執行緒,此時node被喚醒,很可能是head用完資源來喚醒自己的
 9                 int r = tryAcquireShared(arg);//嘗試獲取資源
10                 if (r >= 0) {//成功
11                     setHeadAndPropagate(node, r);//將head指向自己,還有剩餘資源可以再喚醒之後的執行緒
12                     p.next = null; // help GC
13                     if (interrupted)//如果等待過程中被打斷過,此時將中斷補上。
14                         selfInterrupt();
15                     failed = false;
16                     return;
17                 }
18             }
19             
20             //判斷狀態,尋找安全點,進入waiting狀態,等著被unpark()或interrupt()
21             if (shouldParkAfterFailedAcquire(p, node) &&
22                 parkAndCheckInterrupt())
23                 interrupted = true;
24         }
25     } finally {
26         if (failed)
27             cancelAcquire(node);
28     }
29 }

  有木有覺得跟acquireQueued()很相似?對,其實流程並沒有太大區別。只不過這裡將補中斷的selfInterrupt()放到doAcquireShared()裡了,而獨佔模式是放到acquireQueued()之外,其實都一樣,不知道Doug Lea是怎麼想的。

  跟獨佔模式比,還有一點需要注意的是,這裡只有執行緒是head.next時(“老二”),才會去嘗試獲取資源,有剩餘的話還會喚醒之後的隊友。那麼問題就來了,假如老大用完後釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續park()等待其他執行緒釋放資源,也更不會去喚醒老三和老四了。獨佔模式,同一時刻只有一個執行緒去執行,這樣做未嘗不可;但共享模式下,多個執行緒是可以同時執行的,現在因為老二的資源需求量大,而把後面量小的老三和老四也都卡住了。當然,這並不是問題,只是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但降低了併發)。

3.3.1.1 setHeadAndPropagate(Node, int)

 1 private void setHeadAndPropagate(Node node, int propagate) {
 2     Node h = head; 
 3     setHead(node);//head指向自己
 4      //如果還有剩餘量,繼續喚醒下一個鄰居執行緒
 5     if (propagate > 0 || h == null || h.waitStatus < 0) {
 6         Node s = node.next;
 7         if (s == null || s.isShared())
 8             doReleaseShared();
 9     }
10 }

  此方法在setHead()的基礎上多了一步,就是自己甦醒的同時,如果條件符合(比如還有剩餘資源),還會去喚醒後繼結點,畢竟是共享模式!

  doReleaseShared()我們留著下一小節的releaseShared()裡來講。

3.3.2 小結

  OK,至此,acquireShared()也要告一段落了。讓我們再梳理一下它的流程:

    1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
    2. 失敗則通過doAcquireShared()進入等待佇列park(),直到被unpark()/interrupt()併成功獲取到資源才返回。整個等待過程也是忽略中斷的。

  其實跟acquire()的流程大同小異,只不過多了個自己拿到資源後,還會去喚醒後繼隊友的操作(這才是共享嘛)

3.4 releaseShared()

  上一小節已經把acquireShared()說完了,這一小節就來講講它的反操作releaseShared()吧。此方法是共享模式下執行緒釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待執行緒,它會喚醒等待佇列裡的其他執行緒來獲取資源。下面是releaseShared()的原始碼:

1 public final boolean releaseShared(int arg) {
2     if (tryReleaseShared(arg)) {//嘗試釋放資源
3         doReleaseShared();//喚醒後繼結點
4         return true;
5     }
6     return false;
7 }

  此方法的流程也比較簡單,一句話:釋放掉資源後,喚醒後繼。跟獨佔模式下的release()相似,但有一點稍微需要注意:獨佔模式下的tryRelease()在完全釋放掉資源(state=0)後,才會返回true去喚醒其他執行緒,這主要是基於獨佔下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的執行緒併發執行,那麼擁有資源的執行緒在釋放掉部分資源時就可以喚醒後繼等待結點。例如,資源總量是13,A(5)和B(7)分別獲取到資源併發執行,C(4)來時只剩1個資源就需要等待。A在執行過程中釋放掉2個資源量,然後tryReleaseShared(2)返回true喚醒C,C一看只有3個仍不夠繼續等待;隨後B又釋放2個,tryReleaseShared(2)返回true喚醒C,C一看有5個夠自己用了,然後C就可以跟A和B一起執行。而ReentrantReadWriteLock讀鎖的tryReleaseShared()只有在完全釋放掉資源(state=0)才返回true,所以自定義同步器可以根據需要決定tryReleaseShared()的返回值。

3.4.1 doReleaseShared()

  此方法主要用於喚醒後繼。下面是它的原始碼:

 1 private void doReleaseShared() {
 2     for (;;) {
 3         Node h = head;
 4         if (h != null && h != tail) {
 5             int ws = h.waitStatus;
 6             if (ws == Node.SIGNAL) {
 7                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 8                     continue;
 9                 unparkSuccessor(h);//喚醒後繼
10             }
11             else if (ws == 0 &&
12                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
13                 continue;
14         }
15         if (h == head)// head發生變化
16             break;
17     }
18 }

3.5 小結

  本節我們詳解了獨佔和共享兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的原始碼,相信大家都有一定認識了。值得注意的是,acquire()和acquireSahred()兩種方法下,執行緒在等待佇列中都是忽略中斷的。AQS也支援響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,這裡相應的原始碼跟acquire()和acquireSahred()差不多,這裡就不再詳解了。

四、簡單應用

  通過前邊幾個章節的學習,相信大家已經基本理解AQS的原理了。這裡再將“框架”一節中的一段話複製過來:

  不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:

  • isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。

  OK,下面我們就以AQS原始碼裡的Mutex為例,講一下AQS的簡單應用。

4.1 Mutex(互斥鎖)

  Mutex是一個不可重入的互斥鎖實現。鎖資源(AQS裡的state)只有兩種狀態:0表示未鎖定,1表示鎖定。下邊是Mutex的核心原始碼:

 1 class Mutex implements Lock, java.io.Serializable {
 2     // 自定義同步器
 3     private static class Sync extends AbstractQueuedSynchronizer {
 4         // 判斷是否鎖定狀態
 5         protected boolean isHeldExclusively() {
 6             return getState() == 1;
 7         }
 8 
 9         // 嘗試獲取資源,立即返回。成功則返回true,否則false。
10         public boolean tryAcquire(int acquires) {
11             assert acquires == 1; // 這裡限定只能為1個量
12             if (compareAndSetState(0, 1)) {//state為0才設定為1,不可重入!
13                 setExclusiveOwnerThread(Thread.currentThread());//設定為當前執行緒獨佔資源
14                 return true;
15             }
16             return false;
17         }
18 
19         // 嘗試釋放資源,立即返回。成功則為true,否則false。
20         protected boolean tryRelease(int releases) {
21             assert releases == 1; // 限定為1個量
22             if (getState() == 0)//既然來釋放,那肯定就是已佔有狀態了。只是為了保險,多層判斷!
23                 throw new IllegalMonitorStateException();
24             setExclusiveOwnerThread(null);
25             setState(0);//釋放資源,放棄佔有狀態
26             return true;
27         }
28     }
29 
30     // 真正同步類的實現都依賴繼承於AQS的自定義同步器!
31     private final Sync sync = new Sync();
32 
33     //lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。
34     public void lock() {
35         sync.acquire(1);
36     }
37 
38     //tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。
39     public boolean tryLock() {
40         return sync.tryAcquire(1);
41     }
42 
43     //unlock<-->release。兩者語文一樣:釋放資源。
44     public void unlock() {
45         sync.release(1);
46     }
47 
48     //鎖是否佔有狀態
49     public boolean isLocked() {
50         return sync.isHeldExclusively();
51     }
52 }

  同步類在實現時一般都將自定義同步器(sync)定義為內部類,供自己使用;而同步類自己(Mutex)則實現某個介面,對外服務。當然,介面的實現要直接依賴sync,它們在語義上也存在某種對應關係!!而sync只用實現資源state的獲取-釋放方式tryAcquire-tryRelelase,至於執行緒的排隊、等待、喚醒等,上層的AQS都已經實現好了,我們不用關心。

  除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實現方式都差不多,不同的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點,AQS的核心便被攻破了!

  OK,至此,整個AQS的講解也要落下帷幕了。希望本文能夠對學習Java併發程式設計的同學有所借鑑,中間寫的有不對的地方,也歡迎討論和指正~

相關推薦

Java併發AQS

一、概述   談到併發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)!   類如其名,抽象的佇列式的同步器,AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的Reentrant

Java技術AQS

AQS是AbstractQueuedSynchronizer的簡稱。AQS提供了一種實現阻塞鎖和一系列依賴FIFO等待佇列的同步器的框架,如下圖所示。AQS為一系列同步器依賴於一個單獨的原子變數(state)的同步器提供了一個非常有用的基礎。子類們必須定義改變state變數的protecte

Java併發Semaphore

        一、入題        Semaphore是一種基於計數的訊號量。它可以設定一個閾值,基於此,多個執行緒競爭獲取許可訊號,做完自己的申請後歸還,超過閾值後,執行緒申請許可訊號將會被阻塞。Semaphore可以用來構建一些物件池,資源池之類的,比如資料庫連線池,

Java併發CompletionService

CompletionService是什麼? 它是JUC包中的一個介面類,預設實現類只有一個ExecutorCompletionService。   CompletionService幹什麼的? 它將非同步任務的生成和執行結果的處理進行了解耦,用來執行Callable的任務(實際也是通過Executo

《提升能力,漲薪可待》-Java併發AQS全面

  歡迎關注公眾號【Ccww筆記】,原創技術文章第一時間推出 一、AQS是什麼?有什麼用? AQS全稱AbstractQueuedSynchronizer,即抽象的佇列同步器,是一種用來構建鎖和同步器的框架。 基於AQS構建同步器: ReentrantLock Semaphore C

Java並發AQS

leg 同步器 回收 thread true 判斷 monit map 重入 一、概述   談到並發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronized(AQS)!   類如其名,抽象的隊列式

Java併發工具類

在JDK的併發包裡提供了幾個非常有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類則提供了線上程間交換資料的一種手段。本章會配合一些應用場景來介紹如何使用這些工具類。 等待多執行緒完成的Cou

Java基礎 陣列

前言:Java內功心法之陣列詳解,看完這篇你向Java大神的路上又邁出了一步(有什麼問題或者需要資料可以聯絡我的扣扣:734999078)    陣列概念 同一種類型資料的集合。其實陣列就是一個容器。 陣列的好處 可以自動給陣列中的元素從0開始編號,方便操作這些元素。 格式1:

Java併發程式設計(wait(), notify(),sleep())

        上一篇部落格,重點講解了java中鎖的機制,省的在多執行緒之間出現混亂的局面,其實主要能夠理解鑰匙即可。如果要保證方法之間能夠獨立完全的執行,因此就必須所有的方法都共用一把鑰匙。然後小

Java併發AQS用法和原始碼分析

概念 AQS:佇列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步元件的基礎框架,許多同步器可以通過AQS很容易的並且高效的構建出來。不僅RenntrantLock和Semaphore是基於AQS構建的,還包括CountDownLat

Java併發程式設計--Volatile

摘要      Volatile是Java提供的一種弱同步機制,當一個變數被宣告成volatile型別後編譯器不會將該變數的操作與其他記憶體操作進行重排序。在某些場景下使用volatile代替鎖可以減少程式碼量和使程式碼更易閱讀。 Volatile特性   1.可見性:當一條執行緒對volatile

Java關鍵字final

在我們編寫Java程式時總會根據需求將變數、方法、類設定成static(靜態)或final(最終),熟練掌握final用法是必須的,現在我們就來詳細瞭解final關鍵字! 一、final概述 概念:由字面可以瞭解,final有最終態,無法改變的意思。 使用目的:為了阻止改變

java集合LinkedList

我們上一次說到List的ArrayList,我們這次去看下LinkedList---顧名思義是連結串列,連結串列的優點就不用說了吧,增刪效率比較高(具體的朋友們上網看吧),先來看下LinkedList的整體構架:  首先我們看到了LinkedList間接的實現了List

Java併發——Executor框架(Executor框架結構與框架成員)

一、什麼是Executor框架?我們知道執行緒池就是執行緒的集合,執行緒池集中管理執行緒,以實現執行緒的重用,降低資源消耗,提高響應速度等。執行緒用於執行非同步任務,單個的執行緒既是工作單元也是執行機制,從JDK1.5開始,為了把工作單元與執行機制分離開,Executor框架

Java併發:ThreadLocal

前言ThreadLocal的作用是提供執行緒內的區域性變數,這種變數在多執行緒環境下訪問時能夠保證各個執行緒裡變數的獨立性。ThreadLocal無論在專案開發還是面試中都會經常碰到,本文就ThreadLocal的使用、主要方法原始碼詳解、記憶體洩漏問題展開討論。1.基本使用

java 集合TreeSet

首先說明一下,之前看了一下文章提出TreeSet在新增第一個元素的時候是不比較大小的,這種說發是錯誤的,在第一次新增的時候比較的是第一個物件本省返回的引數是0,下面我們用程式驗證一下:首先由一個Student的內部類:裡面有兩個引數,年齡和名稱我們後期自定義排序也是用得到的。

Java 快取 Ehcache

 一:EhCache是一個純Java的程序內快取框架,具有如下特點:     1. 快速簡單,非常容易和應用整合。     2.支援多種快取策略 。     3. 快取資料有兩級:記憶體和磁碟,因此無需擔心容量問題 。     4. 快取資料會在虛擬機器重啟的過程中寫入

java初級3-第一個Java程式

1 編寫源程式 public class Helloword{public static void main(String[] args){System.out.println("Hello Word

Java 集合 Set 與原始碼分析

    Set集合與List一樣,都是繼承自Collection介面,常用的實現類有HashSet和TreeSet。值得注意的是,HashSet是通過HashMap來實現的而TreeSet是通過TreeMap來實現的,所以HashSet和TreeSet都沒有自己的資料結構,具

Java併發AQS原始碼分析(二)

我在Java併發之AQS原始碼分析(一)這篇文章中,從原始碼的角度深度剖析了 AQS 獨佔鎖模式下的獲取鎖與釋放鎖的邏輯,如果你把