1. 程式人生 > >原始碼閱讀:基於併發AQS的(獨佔鎖)重入鎖(ReetrantLock)及其Condition實現原理

原始碼閱讀:基於併發AQS的(獨佔鎖)重入鎖(ReetrantLock)及其Condition實現原理

Lock介面

前面我們詳談過解決多執行緒同步問題的關鍵字synchronized,synchronized屬於隱式鎖,即鎖的持有與釋放都是隱式的,我們無需干預,而本篇我們要講解的是顯式鎖,即鎖的持有和釋放都必須由我們手動編寫。在Java 1.5中,官方在concurrent併發包中加入了Lock介面,該介面中提供了lock()方法和unLock()方法對顯式加鎖和顯式釋放鎖操作進行支援,簡單瞭解一下程式碼編寫,如下:

Lock lock = new ReentrantLock();
lock.lock();
try{
    //臨界區......
}finally{
    lock.unlock();
}

正如程式碼所顯示(ReentrantLock是Lock的實現類,稍後分析),當前執行緒使用lock()方法與unlock()對臨界區進行包圍,其他執行緒由於無法持有鎖將無法進入臨界區直到當前執行緒釋放鎖,注意unlock()操作必須在finally程式碼塊中,這樣可以確保即使臨界區執行丟擲異常,執行緒最終也能正常釋放鎖,Lock介面還提供了鎖以下相關方法:

public interface Lock {
    //加鎖
    void lock();

    //解鎖
    void unlock();

    //可中斷獲取鎖,與lock()不同之處在於可響應中斷操作,即在獲
    //取鎖的過程中可中斷,注意synchronized在獲取鎖時是不可中斷的
    void lockInterruptibly() throws InterruptedException;

    //嘗試非阻塞獲取鎖,呼叫該方法後立即返回結果,如果能夠獲取則返回true,否則返回false
    boolean tryLock();

    //根據傳入的時間段獲取鎖,在指定時間內沒有獲取鎖則返回false,如果在指定時間內當前執行緒未被中並斷獲取到鎖則返回true
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    //獲取等待通知元件,該元件與當前鎖繫結,當前執行緒只有獲得了鎖
    //才能呼叫該元件的wait()方法,而呼叫後,當前執行緒將釋放鎖。
    Condition newCondition();

可見Lock物件鎖還提供了synchronized所不具備的其他同步特性,如可中斷鎖的獲取(synchronized在等待獲取鎖時是不可中的),超時中斷鎖的獲取,等待喚醒機制的多條件變數Condition等,這也使得Lock鎖在使用上具有更大的靈活性。下面進一步分析Lock的實現類重入鎖ReetrantLock。

重入鎖ReetrantLock

重入鎖ReetrantLock,JDK 1.5新增的類,實現了Lock介面,作用與synchronized關鍵字相當,但比synchronized更加靈活。ReetrantLock本身也是一種支援重進入的鎖,即該鎖可以支援一個執行緒對資源重複加鎖,同時也支援公平鎖與非公平鎖。所謂的公平與非公平指的是在請求先後順序上,先對鎖進行請求的就一定先獲取到鎖,那麼這就是公平鎖,反之,如果對於鎖的獲取並沒有時間上的先後順序,如後請求的執行緒可能先獲取到鎖,這就是非公平鎖,一般而言非,非公平鎖機制的效率往往會勝過公平鎖的機制,但在某些場景下,可能更注重時間先後順序,那麼公平鎖自然是很好的選擇。需要注意的是ReetrantLock支援對同一執行緒重加鎖,但是加鎖多少次,就必須解鎖多少次,這樣才可以成功釋放鎖。下面看看ReetrantLock的簡單使用案例:

import java.util.concurrent.locks.ReentrantLock;

public class ReenterLock implements Runnable{
    public static ReentrantLock lock=new ReentrantLock();
    public static int i=0;
    @Override
    public void run() {
        for(int j=0;j<10000000;j++){
            lock.lock();
            //支援重入鎖
            lock.lock();
            try{
                i++;
            }finally{
                //執行兩次解鎖
                lock.unlock();
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ReenterLock tl=new ReenterLock();
        Thread t1=new Thread(tl);
        Thread t2=new Thread(tl);
        t1.start();t2.start();
        t1.join();t2.join();
        //輸出結果:20000000
        System.out.println(i);
    }
}

程式碼非常簡單,我們使用兩個執行緒同時操作臨界資源i,執行自增操作,使用ReenterLock進行加鎖,解決執行緒安全問題,這裡進行了兩次重複加鎖,由於ReenterLock支援重入,因此這樣是沒有問題的,需要注意的是在finally程式碼塊中,需執行兩次解鎖操作才能真正成功地讓當前執行執行緒釋放鎖,從這裡看ReenterLock的用法還是非常簡單的,除了實現Lock介面的方法,ReenterLock其他方法說明如下

//查詢當前執行緒保持此鎖的次數。
int getHoldCount() 

//返回目前擁有此鎖的執行緒,如果此鎖不被任何執行緒擁有,則返回 null。      
protected  Thread   getOwner(); 

//返回一個 collection,它包含可能正等待獲取此鎖的執行緒,其內部維持一個佇列,這點稍後會分析。      
protected  Collection<Thread>   getQueuedThreads(); 

//返回正等待獲取此鎖的執行緒估計數。   
int getQueueLength();

// 返回一個 collection,它包含可能正在等待與此鎖相關給定條件的那些執行緒。
protected  Collection<Thread>   getWaitingThreads(Condition condition); 

//返回等待與此鎖相關的給定條件的執行緒估計數。       
int getWaitQueueLength(Condition condition);

// 查詢給定執行緒是否正在等待獲取此鎖。     
boolean hasQueuedThread(Thread thread); 

//查詢是否有些執行緒正在等待獲取此鎖。     
boolean hasQueuedThreads();

//查詢是否有些執行緒正在等待與此鎖有關的給定條件。     
boolean hasWaiters(Condition condition); 

//如果此鎖的公平設定為 true,則返回 true。     
boolean isFair() 

//查詢當前執行緒是否保持此鎖。      
boolean isHeldByCurrentThread() 

//查詢此鎖是否由任意執行緒保持。        
boolean isLocked()  

由於ReetrantLock鎖在使用上還是比較簡單的,也就暫且打住,下面著重分析一下ReetrantLock的內部實現原理,這才是本篇博文的重點。實際上ReetrantLock是基於AQS併發框架實現的,我們先深入瞭解AQS,然後一步步揭開ReetrantLock的內部實現原理。

併發基礎元件AQS與ReetrantLock

AQS工作原理概要

AbstractQueuedSynchronizer又稱為佇列同步器(後面簡稱AQS),它是用來構建鎖或其他同步元件的基礎框架,內部通過一個int型別的成員變數state來控制同步狀態,當state=0時,則說明沒有任何執行緒佔有共享資源的鎖,當state=1時,則說明有執行緒目前正在使用共享變數,其他執行緒必須加入同步佇列進行等待,AQS內部通過內部類Node構成FIFO的同步佇列來完成執行緒獲取鎖的排隊工作,同時利用內部類ConditionObject構建等待佇列,當Condition呼叫wait()方法後,執行緒將會加入等待佇列中,而當Condition呼叫signal()方法後,執行緒將從等待佇列轉移動同步佇列中進行鎖競爭。注意這裡涉及到兩種佇列,一種的同步佇列,當執行緒請求鎖而等待的後將加入同步佇列等待,而另一種則是等待佇列(可有多個),通過Condition呼叫await()方法釋放鎖後,將加入等待佇列。關於Condition的等待佇列我們後面再分析,這裡我們先來看看AQS中的同步佇列模型,如下:

/**
 * AQS抽象類
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer{
//指向同步佇列隊頭
private transient volatile Node head;

//指向同步的隊尾
private transient volatile Node tail;

//同步狀態,0代表鎖未被佔用,1代表鎖已被佔用
private volatile int state;

//省略其他程式碼......
}

在這裡插入圖片描述 head和tail分別是AQS中的變數,其中head指向同步佇列的頭部,注意head為空結點,不儲存資訊。而tail則是同步佇列的隊尾,同步佇列採用的是雙向連結串列的結構這樣可方便佇列進行結點增刪操作。state變數則是代表同步狀態,執行當執行緒呼叫lock方法進行加鎖後,如果此時state的值為0,則說明當前執行緒可以獲取到鎖(在本篇文章中,鎖和同步狀態代表同一個意思),同時將state設定為1,表示獲取成功。如果state已為1,也就是當前鎖已被其他執行緒持有,那麼當前執行執行緒將被封裝為Node結點加入同步佇列等待。其中Node結點是對每一個訪問同步程式碼的執行緒的封裝,從圖中的Node的資料結構也可看出,其包含了需要同步的執行緒本身以及執行緒的狀態,如是否被阻塞,是否等待喚醒,是否已經被取消等。每個Node結點內部關聯其前繼結點prev和後繼結點next,這樣可以方便執行緒釋放鎖後快速喚醒下一個在等待的執行緒,Node是AQS的內部類,其資料結構如下:

static final class Node {
    //共享模式
    static final Node SHARED = new Node();
    //獨佔模式
    static final Node EXCLUSIVE = null;

    //標識執行緒已處於結束狀態
    static final int CANCELLED =  1;
    //等待被喚醒狀態
    static final int SIGNAL    = -1;
    //條件狀態,
    static final int CONDITION = -2;
    //在共享模式中使用表示獲得的同步狀態會被傳播
    static final int PROPAGATE = -3;

    //等待狀態,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4種
    volatile int waitStatus;

    //同步佇列中前驅結點
    volatile Node prev;

    //同步佇列中後繼結點
    volatile Node next;

    //請求鎖的執行緒
    volatile Thread thread;

    //等待佇列中的後繼結點,這個與Condition有關,稍後會分析
    Node nextWaiter;

    //判斷是否為共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //獲取前驅結點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    //.....
}

其中SHARED和EXCLUSIVE常量分別代表共享模式和獨佔模式,所謂共享模式是一個鎖允許多條執行緒同時操作,如訊號量Semaphore採用的就是基於AQS的共享模式實現的,而獨佔模式則是同一個時間段只能有一個執行緒對共享資源進行操作,多餘的請求執行緒需要排隊等待,如ReentranLock。變數waitStatus則表示當前被封裝成Node結點的等待狀態,共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

  • CANCELLED:值為1,在同步佇列中等待的執行緒等待超時或被中斷,需要從同步佇列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。

  • SIGNAL:值為-1,被標識為該等待喚醒狀態的後繼結點,當其前繼結點的執行緒釋放了同步鎖或被取消,將會通知該後繼結點的執行緒執行。說白了,就是處於喚醒狀態,只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態的後繼結點的執行緒執行。

  • CONDITION:值為-2,與Condition相關,該標識的結點處於等待佇列中,結點的執行緒等待在Condition上,當其他執行緒呼叫了Condition的signal()方法後,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖。

  • PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態標識結點的執行緒處於可執行狀態。

  • 0狀態:值為0,代表初始化狀態。

pre和next,分別指向當前Node結點的前驅結點和後繼結點,thread變數儲存的請求鎖的執行緒。nextWaiter,與Condition相關,代表等待佇列中的後繼結點,關於這點這裡暫不深入,後續會有更詳細的分析,嗯,到此我們對Node結點的資料結構也就比較清晰了。總之呢,AQS作為基礎元件,對於鎖的實現存在兩種不同的模式,即共享模式(如Semaphore)和獨佔模式(如ReetrantLock),無論是共享模式還是獨佔模式的實現類,其內部都是基於AQS實現的,也都維持著一個虛擬的同步佇列,當請求鎖的執行緒超過現有模式的限制時,會將執行緒包裝成Node結點並將執行緒當前必要的資訊儲存到node結點中,然後加入同步佇列等會獲取鎖,而這系列操作都有AQS協助我們完成,這也是作為基礎元件的原因,無論是Semaphore還是ReetrantLock,其內部絕大多數方法都是間接呼叫AQS完成的,下面是AQS整體類圖結構 在這裡插入圖片描述 這裡以ReentrantLock為例,簡單講解ReentrantLock與AQS的關係 在這裡插入圖片描述

  • AbstractOwnableSynchronizer:抽象類,定義了儲存獨佔當前鎖的執行緒和獲取的方法

  • AbstractQueuedSynchronizer:抽象類,AQS框架核心類,其內部以虛擬佇列的方式管理執行緒的鎖獲取與鎖釋放,其中獲取鎖(tryAcquire方法)和釋放鎖(tryRelease方法)並沒有提供預設實現,需要子類重寫這兩個方法實現具體邏輯,目的是使開發人員可以自由定義獲取鎖以及釋放鎖的方式。

  • Node:AbstractQueuedSynchronizer 的內部類,用於構建虛擬佇列(連結串列雙向連結串列),管理需要獲取鎖的執行緒。

  • Sync:抽象類,是ReentrantLock的內部類,繼承自AbstractQueuedSynchronizer,實現了釋放鎖的操作(tryRelease()方法),並提供了lock抽象方法,由其子類實現。

  • NonfairSync:是ReentrantLock的內部類,繼承自Sync,非公平鎖的實現類。

  • FairSync:是ReentrantLock的內部類,繼承自Sync,公平鎖的實現類。

  • ReentrantLock:實現了Lock介面的,其內部類有Sync、NonfairSync、FairSync,在建立時可以根據fair引數決定建立NonfairSync(預設非公平鎖)還是FairSync。

ReentrantLock內部存在3個實現類,分別是Sync、NonfairSync、FairSync,其中Sync繼承自AQS實現瞭解鎖tryRelease()方法,而NonfairSync(非公平鎖)、 FairSync(公平鎖)則繼承自Sync,實現了獲取鎖的tryAcquire()方法,ReentrantLock的所有方法呼叫都通過間接呼叫AQS和Sync類及其子類來完成的。從上述類圖可以看出AQS是一個抽象類,但請注意其原始碼中並沒一個抽象的方法,這是因為AQS只是作為一個基礎元件,並不希望直接作為直接操作類對外輸出,而更傾向於作為基礎元件,為真正的實現類提供基礎設施,如構建同步佇列,控制同步狀態等,事實上,從設計模式角度來看,AQS採用的模板模式的方式構建的,其內部除了提供併發操作核心方法以及同步佇列操作外,還提供了一些模板方法讓子類自己實現,如加鎖操作以及解鎖操作,為什麼這麼做?這是因為AQS作為基礎元件,封裝的是核心併發操作,但是實現上分為兩種模式,即共享模式與獨佔模式,而這兩種模式的加鎖與解鎖實現方式是不一樣的,但AQS只關注內部公共方法實現並不關心外部不同模式的實現,所以提供了模板方法給子類使用,也就是說實現獨佔鎖,如ReentrantLock需要自己實現tryAcquire()方法和tryRelease()方法,而實現共享模式的Semaphore,則需要實現tryAcquireShared()方法和tryReleaseShared()方法,這樣做的好處是顯而易見的,無論是共享模式還是獨佔模式,其基礎的實現都是同一套元件(AQS),只不過是加鎖解鎖的邏輯不同罷了,更重要的是如果我們需要自定義鎖的話,也變得非常簡單,只需要選擇不同的模式實現不同的加鎖和解鎖的模板方法即可,AQS提供給獨佔模式和共享模式的模板方法如下

//AQS中提供的主要模板方法,由子類實現。
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer{

    //獨佔模式下獲取鎖的方法
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    //獨佔模式下解鎖的方法
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    //共享模式下獲取鎖的方法
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    //共享模式下解鎖的方法
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    //判斷是否為持有獨佔鎖
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

}

在瞭解AQS的原理概要後,下面我們就基於ReetrantLock進一步分析AQS的實現過程,這也是ReetrantLock的內部實現原理。

基於ReetrantLock分析AQS獨佔模式實現過程

ReetrantLock中非公平鎖

AQS同步器的實現依賴於內部的同步佇列(FIFO的雙向連結串列對列)完成對同步狀態(state)的管理,當前執行緒獲取鎖(同步狀態)失敗時,AQS會將該執行緒以及相關等待資訊包裝成一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,會將頭結點head中的執行緒喚醒,讓其嘗試獲取同步狀態。關於同步佇列和Node結點,前面我們已進行了較為詳細的分析,這裡重點分析一下獲取同步狀態和釋放同步狀態以及如何加入佇列的具體操作,這裡從ReetrantLock入手分析AQS的具體實現,我們先以非公平鎖為例進行分析。

//預設構造,建立非公平鎖NonfairSync
public ReentrantLock() {
    sync = new NonfairSync();
}
//根據傳入引數建立鎖型別
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

//加鎖操作
public void lock() {
     sync.lock();
}

前面說過sync是個抽象類,存在兩個不同的實現子類,這裡從非公平鎖入手,看看其實現

/**
 * 非公平鎖實現
 */
static final class NonfairSync extends Sync {
    //加鎖
    final void lock() {
        //執行CAS操作,獲取同步狀態
        if (compareAndSetState(0, 1))
       //成功則將獨佔鎖執行緒設定為當前執行緒  
          setExclusiveOwnerThread(Thread.currentThread());
        else
            //否則再次請求同步狀態
            acquire(1);
    }
}

這裡獲取鎖時,首先對同步狀態執行CAS操作,嘗試把state的狀態從0設定為1,如果返回true則代表獲取同步狀態成功,也就是當前執行緒獲取鎖成,可操作臨界資源,如果返回false,則表示已有執行緒持有該同步狀態(其值為1),獲取鎖失敗,注意這裡存在併發的情景,也就是可能同時存在多個執行緒設定state變數,因此是CAS操作保證了state變數操作的原子性。返回false後,執行 acquire(1)方法,該方法是AQS中的方法,它對中斷不敏感,即使執行緒獲取同步狀態失敗,進入同步佇列,後續對該執行緒執行中斷操作也不會從同步佇列中移出,方法如下

public final void acquire(int arg) {
    //再次嘗試獲取同步狀態
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這裡傳入引數arg表示要獲取同步狀態後設置的值(即要設定state的值),因為要獲取鎖,而status為0時是釋放鎖,1則是獲取鎖,所以這裡一般傳遞引數為1,進入方法後首先會執行tryAcquire(arg)方法,在前面分析過該方法在AQS中並沒有具體實現,而是交由子類實現,因此該方法是由ReetrantLock類內部實現的

//NonfairSync類
static final class NonfairSync extends Sync {

    protected final boolean tryAcquire(int acquires) {
         return nonfairTryAcquire(acquires);
     }
 }

//Sync類
abstract static class Sync extends AbstractQueuedSynchronizer {

  //nonfairTryAcquire方法
  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      //判斷同步狀態是否為0,並嘗試再次獲取同步狀態
      if (c == 0) {
          //執行CAS操作
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      //如果當前執行緒已獲取鎖,屬於重入鎖,再次獲取鎖後將status值加1
      else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          //設定當前同步狀態,當前只有一個執行緒持有鎖,因為不會發生執行緒安全問題,可以直接執行 setState(nextc);
          setState(nextc);
          return true;
      }
      return false;
  }
  //省略其他程式碼
}

從程式碼執行流程可以看出,這裡做了兩件事,一是嘗試再次獲取同步狀態,如果獲取成功則將當前執行緒設定為OwnerThread,否則失敗,二是判斷當前執行緒current是否為OwnerThread,如果是則屬於重入鎖,state自增1,並獲取鎖成功,返回true,反之失敗,返回false,也就是tryAcquire(arg)執行失敗,返回false。需要注意的是nonfairTryAcquire(int acquires)內部使用的是CAS原子性操作設定state值,可以保證state的更改是執行緒安全的,因此只要任意一個執行緒呼叫nonfairTryAcquire(int acquires)方法並設定成功即可獲取鎖,不管該執行緒是新到來的還是已在同步佇列的執行緒,畢竟這是非公平鎖,並不保證同步佇列中的執行緒一定比新到來執行緒請求(可能是head結點剛釋放同步狀態然後新到來的執行緒恰好獲取到同步狀態)先獲取到鎖,這點跟後面還會講到的公平鎖不同。ok~,接著看之前的方法acquire(int arg)

public final void acquire(int arg) {
    //再次嘗試獲取同步狀態
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

如果tryAcquire(arg)返回true,acquireQueued自然不會執行,這是最理想的,因為畢竟當前執行緒已獲取到鎖,如果tryAcquire(arg)返回false,則會執行addWaiter(Node.EXCLUSIVE)進行入隊操作,由於ReentrantLock屬於獨佔鎖,因此結點型別為Node.EXCLUSIVE,下面看看addWaiter方法具體實現

private Node addWaiter(Node mode) {
    //將請求同步狀態失敗的執行緒封裝成結點
    Node node = new Node(Thread.currentThread(), mode);

    Node pred = tail;
    //如果是第一個結點加入肯定為空,跳過。
    //如果非第一個結點則直接執行CAS入隊操作,嘗試在尾部快速新增
    if (pred != null) {
        node.prev = pred;
        //使用CAS執行尾部結點替換,嘗試在尾部快速新增
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果第一次加入或者CAS操作沒有成功執行enq入隊操作
    enq(node);
    return node;
}

建立了一個Node.EXCLUSIVE型別Node結點用於封裝執行緒及其相關資訊,其中tail是AQS的成員變數,指向隊尾(這點前面的我們分析過AQS維持的是一個雙向的連結串列結構同步佇列),如果是第一個結點,則為tail肯定為空,那麼將執行enq(node)操作,如果非第一個結點即tail指向不為null,直接嘗試執行CAS操作加入隊尾,如果CAS操作失敗還是會執行enq(node),繼續看enq(node)

private Node enq(final Node node) {
    //死迴圈
    for (;;) {
         Node t = tail;
         //如果佇列為null,即沒有頭結點
         if (t == null) { // Must initialize
             //建立並使用CAS設定頭結點
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {//隊尾新增新結點
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
}

這個方法使用一個死迴圈進行CAS操作,可以解決多執行緒併發問題。這裡做了兩件事,一是如果還沒有初始同步佇列則建立新結點並使用compareAndSetHead設定頭結點,tail也指向head,二是佇列已存在,則將新結點node新增到隊尾。注意這兩個步驟都存在同一時間多個執行緒操作的可能,如果有一個執行緒修改head和tail成功,那麼其他執行緒將繼續迴圈,直到修改成功,這裡使用CAS原子操作進行頭結點設定和尾結點tail替換可以保證執行緒安全,從這裡也可以看出head結點本身不存在任何資料,它只是作為一個牽頭結點,而tail永遠指向尾部結點(前提是佇列不為null)。 在這裡插入圖片描述 新增到同步佇列後,結點就會進入一個自旋過程,即每個結點都在觀察時機待條件滿足獲取同步狀態,然後從同步佇列退出並結束自旋,回到之前的acquire()方法,自旋過程是在acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法中執行的,程式碼如下

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋,死迴圈
        for (;;) {
            //獲取前驅結點
            final Node p = node.predecessor();
            當且僅當p為頭結點才嘗試獲取同步狀態
            if (p == head && tryAcquire(arg)) {
                //將node設定為頭結點
                setHead(node);
                //清空原來頭結點的引用便於GC
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果前驅結點不是head,判斷是否掛起執行緒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //最終都沒能獲取同步狀態,結束該執行緒的請求
            cancelAcquire(node);
    }
}

當前執行緒在自旋(死迴圈)中獲取同步狀態,當且僅當前驅結點為頭結點才嘗試獲取同步狀態,這符合FIFO的規則,即先進先出,其次head是當前獲取同步狀態的執行緒結點,只有當head釋放同步狀態喚醒後繼結點,後繼結點才有可能獲取到同步狀態,因此後繼結點在其前繼結點為head時,才進行嘗試獲取同步狀態,其他時刻將被掛起。進入if語句後呼叫setHead(node)方法,將當前執行緒結點設定為head

//設定為頭結點
private void setHead(Node node) {
        head = node;
        //清空結點資料
        node.thread = null;
        node.prev = null;
}

設定為node結點被設定為head後,其thread資訊和前驅結點將被清空,因為該執行緒已獲取到同步狀態(鎖),正在執行了,也就沒有必要儲存相關資訊了,head只有儲存指向後繼結點的指標即可,便於head結點釋放同步狀態後喚醒後繼結點,執行結果如下圖 在這裡插入圖片描述 從圖可知更新head結點的指向,將後繼結點的執行緒喚醒並獲取同步狀態,呼叫setHead(node)將其替換為head結點,清除相關無用資料。當然如果前驅結點不是head,那麼執行如下

//如果前驅結點不是head,判斷是否掛起執行緒
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

      interrupted = true;
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取當前結點的等待狀態
        int ws = pred.waitStatus;
        //如果為等待喚醒(SIGNAL)狀態則返回true
        if (ws == Node.SIGNAL)
            return true;
        //如果ws>0 則說明是結束狀態,
        //遍歷前驅結點直到找到沒有結束狀態的結點
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果ws小於0又不是SIGNAL狀態,
            //則將其設定為SIGNAL狀態,代表該結點的執行緒正在等待喚醒。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

private final boolean parkAndCheckInterrupt() {
        //將當前執行緒掛起
        LockSupport.park(this);
        //獲取執行緒中斷狀態,interrupted()是判斷當前中斷狀態,
        //並非中斷執行緒,因此可能true也可能false,並返回
        return Thread.interrupted();
}

shouldParkAfterFailedAcquire()方法的作用是判斷當前結點的前驅結點是否為SIGNAL狀態(即等待喚醒狀態),如果是則返回true。如果結點的wsCANCELLED狀態(值為1>0),即結束狀態,則說明該前驅結點已沒有用應該從同步佇列移除,執行while迴圈,直到尋找到非CANCELLED狀態的結點。倘若前驅結點的ws值不為CANCELLED,也不為SIGNAL(當從Condition的條件等待佇列轉移到同步佇列時,結點狀態為CONDITION因此需要轉換為SIGNAL),那麼將其轉換為SIGNAL狀態,等待被喚醒。 若shouldParkAfterFailedAcquire()方法返回true,即前驅結點為SIGNAL狀態同時又不是head結點,那麼使用parkAndCheckInterrupt()方法掛起當前執行緒,稱為WAITING狀態,需要等待一個unpark()操作來喚醒它,到此ReetrantLock內部間接通過AQSFIFO的同步佇列就完成了lock()操作,這裡我們總結成邏輯流程圖 在這裡插入圖片描述 關於獲取鎖的操作,這裡看看另外一種可中斷的獲取方式,即呼叫ReentrantLock類的lockInterruptibly()或者tryLock()方法,最終它們都間接呼叫到doAcquireInterruptibly()

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //直接拋異常,中斷執行緒的同步狀態請求
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

最大的不同是

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
     //直接拋異常,中斷執行緒的同步狀態請求
       throw new InterruptedException();

檢測到執行緒的中斷操作後,直接丟擲異常,從而中斷執行緒的同步狀態請求,移除同步佇列,ok~,加鎖流程到此。下面接著看unlock()操作

//ReentrantLock類的unlock
public void unlock() {
    sync.release(1);
}

//AQS類的release()方法
public final boolean release(int arg) {
    //嘗試釋放鎖
    if (tryRelease(arg)) {

        Node h = head;
        if (h != null && h.waitStatus != 0)
            //喚醒後繼結點的執行緒
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//ReentrantLock類中的內部類Sync實現的tryRelease(int releases) 
protected final boolean tryRelease(int releases) {

      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
          throw new IllegalMonitorStateException();
      boolean free = false;
      //判斷狀態是否為0,如果是則說明已釋放同步狀態
      if (c == 0) {
          free = true;
          //設定Owner為null
          setExclusiveOwnerThread(null);
      }
      //設定更新同步狀態
      setState(c);
      return free;
  }

釋放同步狀態的操作相對簡單些,tryRelease(int releases)方法是ReentrantLock類中內部類自己實現的,因為AQS對於釋放鎖並沒有提供具體實現,必須由子類自己實現。釋放同步狀態後會使用unparkSuccessor(h)喚醒後繼結點的執行緒,這裡看看unparkSuccessor(h)

private void unparkSuccessor(Node node) {
    //這裡,node一般為當前執行緒所在的結點。
    int ws = node.waitStatus;
    if (ws < 0)//置零當前執行緒所在的結點狀態,允許失敗。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一個需要喚醒的結點s
    if (s == null || s.waitStatus > 0) {//如果為空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒
}

從程式碼執行操作來看,這裡主要作用是用unpark()喚醒同步佇列中最前邊未放棄執行緒(也就是狀態為CANCELLED的執行緒結點s)。此時,回憶前面分析進入自旋的函式acquireQueued()s結點的執行緒被喚醒後,會進入acquireQueued()函式的if (p == head && tryAcquire(arg))的判斷,如果p!=head也不會有影響,因為它會執行shouldParkAfterFailedAcquire(),由於s通過unparkSuccessor()操作後已是同步佇列中最前邊未放棄的執行緒結點,那麼通過shouldParkAfterFailedAcquire()內部對結點狀態的調整,s也必然會成為headnext結點,因此再次自旋時p==head就成立了,然後s把自己設定成head結點,表示自己已經獲取到資源了,最終acquire()也返回了,這就是獨佔鎖釋放的過程。 ok~,關於獨佔模式的加鎖和釋放鎖的過程到這就分析完,總之呢,在AQS同步器中維護著一個同步佇列,當執行緒獲取同步狀態失敗後,將會被封裝成Node結點,加入到同步佇列中並進行自旋操作,噹噹前執行緒結點的前驅結點為head時,將嘗試獲取同步狀態,獲取成功將自己設定為head結點。在釋放同步狀態時,則通過呼叫子類(ReetrantLock中的Sync內部類)的tryRelease(int releases)方法釋放同步狀態,釋放成功則喚醒後繼結點的執行緒。

ReetrantLock中公平鎖

瞭解完ReetrantLock中非公平鎖的實現後,我們再來看看公平鎖。與非公平鎖不同的是,在獲取鎖的時,公平鎖的獲取順序是完全遵循時間上的FIFO規則,也就是說先請求的執行緒一定會先獲取鎖,後來的執行緒肯定需要排隊,這點與前面我們分析非公平鎖的nonfairTryAcquire(int acquires)方法實現有鎖不同,下面是公平鎖中tryAcquire()方法的實現

//公平鎖FairSync類中的實現
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            //注意!!這裡先判斷同步佇列是否存在結點
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

該方法與nonfairTryAcquire(int acquires)方法唯一的不同是在使用CAS設定嘗試設定state值前,呼叫了hasQueuedPredecessors()判斷同步佇列是否存在結點,如果存在必須先執行完同步佇列中結點的執行緒,當前執行緒進入等待狀態。這就是非公平鎖與公平鎖最大的區別,即公平鎖線上程請求到來時先會判斷同步佇列是否存在結點,如果存在先執行同步佇列中的結點執行緒,當前執行緒將封裝成node加入同步佇列等待。而非公平鎖呢,當執行緒請求到來時,不管同步佇列是否存線上程結點,直接嘗試獲取同步狀態,獲取成功直接訪問共享資源,但請注意在絕大多數情況下,非公平鎖才是我們理想的選擇,畢竟從效率上來說非公平鎖總是勝於公平鎖。

以上便是ReentrantLock的內部實現原理,這裡我們簡單進行小結,重入鎖ReentrantLock,是一個基於AQS併發框架的併發控制類,其內部實現了3個類,分別是SyncNoFairSync以及FairSync類,其中Sync繼承自AQS,實現了釋放鎖的模板方法tryRelease(int),而NoFairSyncFairSync都繼承自Sync,實現各種獲取鎖的方法tryAcquire(int)ReentrantLock的所有方法實現幾乎都間接呼叫了這3個類,因此當我們在使用ReentrantLock時,大部分使用都是在間接呼叫AQS同步器中的方法,這就是ReentrantLock的內部實現原理,最後給出張類圖結構

在這裡插入圖片描述

關於synchronized 與ReentrantLock

在JDK 1.6之後,虛擬機器對於synchronized關鍵字進行整體優化後,在效能上synchronized與ReentrantLock已沒有明顯差距,因此在使用選擇上,需要根據場景而定,大部分情況下我們依然建議是synchronized關鍵字,原因之一是使用方便語義清晰,二是效能上虛擬機器已為我們自動優化。而ReentrantLock提供了多樣化的同步特性,如超時獲取鎖、可以被中斷獲取鎖(synchronized的同步是不能中斷的)、等待喚醒機制的多個條件變數(Condition)等,因此當我們確實需要使用到這些功能是,可以選擇ReentrantLock

神奇的Condition

關於Condition介面

在併發程式設計中,每個Java物件都存在一組監視器方法,如wait()notify()以及notifyAll()方法,通過這些方法,我們可以實現執行緒間通訊與協作(也稱為等待喚醒機制),如生產者-消費者模式,而且這些方法必須配合著synchronized關鍵字使用,與synchronized的等待喚醒機制相比Condition具有更多的靈活性以及精確性,這是因為notify()在喚醒執行緒時是隨機(同一個鎖),而Condition則可通過多個Condition例項物件建立更加精細的執行緒控制,也就帶來了更多靈活性了,我們可以簡單理解為以下兩點

  • 通過Condition能夠精細的控制多執行緒的休眠與喚醒。

  • 對於一個鎖,我們可以為多個執行緒間建立不同的Condition

Condition是一個介面類,其主要方法如下:

public interface Condition {

 /**
  * 使當前執行緒進入等待狀態直到被通知(signal)或中斷
  * 當其他執行緒呼叫singal()或singalAll()方法時,該執行緒將被喚醒
  * 當其他執行緒呼叫interrupt()方法中斷當前執行緒
  * await()相當於synchronized等待喚醒機制中的wait()方法
  */
 void await() throws InterruptedException;

 //當前執行緒進入等待狀態,直到被喚醒,該方法不響應中斷要求
 void awaitUninterruptibly();

 //呼叫該方法,當前執行緒進入等待狀態,直到被喚醒或被中斷或超時
 //其中nanosTimeout指的等待超時時間,單位納秒
 long awaitNanos(long nanosTimeout) throws InterruptedException;

  //同awaitNanos,但可以指明時間單位
  boolean await(long time, TimeUnit unit) throws InterruptedException;

 //呼叫該方法當前執行緒進入等待狀態,直到被喚醒、中斷或到達某個時
 //間期限(deadline),如果沒到指定時間就被喚醒,返回true,其他情況返回false
  boolean awaitUntil(Date deadline) throws InterruptedException;

 //喚醒一個等待在Condition上的執行緒,該執行緒從等待方法返回前必須
 //獲取與Condition相關聯的鎖,功能與notify()相同
  void signal();

 //喚醒所有等待在Condition上的執行緒,該執行緒從等待方法返回前必須
 //獲取與Condition相關聯的鎖,功能與notifyAll()相同
  void signalAll();
}

關於Condition的實現類是AQS的內部類ConditionObject,關於這點我們稍後分析,這裡先來看一個Condition的使用案例,即經典消費者生產者模式

Condition的使用案例-生產者消費者模式

這裡我們通過一個賣烤鴨的案例來演示多生產多消費者的案例,該場景中存在兩條生產執行緒t1和t2,用於生產烤鴨,也存在兩條消費執行緒t3,t4用於消費烤鴨,4條執行緒同時執行,需要保證只有在生產執行緒產生烤鴨後,消費執行緒才能消費,否則只能等待,直到生產執行緒產生烤鴨後喚醒消費執行緒,注意烤鴨不能重複消費。ResourceByCondition類中定義product()consume()兩個方法,分別用於生產烤鴨和消費烤鴨,並且定義ReentrantLock鎖,用於控制product()consume()的併發,由於必須在烤鴨生成完成後消費執行緒才能消費烤鴨,否則只能等待,因此這裡定義兩組Condition物件,分別是producer_conconsumer_con,前者擁有控制生產執行緒,後者擁有控制消費執行緒,這裡我們使用一個標誌flag來控制是否有烤鴨,當flag為true時,代表烤鴨生成完畢,生產執行緒必須進入等待狀態同時喚醒消費執行緒進行消費,消費執行緒消費完畢後將flag設定為false,代表烤鴨消費完成,進入等待狀態,同時喚醒生產執行緒生產烤鴨,具體程式碼如下

package com.zejian.concurrencys;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by zejian on 2017/7/22.
 * Blog : http://blog.csdn.net/javazejian [原文地址,請尊重原創]
 */
public class ResourceByCondition {
    private String name;
    private int count = 1;
    private boolean flag = false;

    //建立一個鎖物件。
    Lock lock = new ReentrantLock();

    //通過已有的鎖獲取兩組監視器,一組監視生產者,一組監視消費者。
    Condition producer_con = lock.newCondition();
    Condition consumer_con = lock.newCondition();

    /**
     * 生產
     * @param name
     */
    public  void product(String name)
    {
        lock.lock();
        try
        {
            while(flag){
                try{producer_con.await();}catch(InterruptedException e){}
            }
            this.name = name + count;
            count++;
            System.out.println(Thread.currentThread().getName()+"...生產者5.0..."+this.name);
            flag = true;
            consumer_con.signal();//直接喚醒消費執行緒
        }
        finally
        {
            lock.unlock();
        }
    }

    /**
     * 消費
     */
    public  void consume()
    {
        lock.lock();
        try
        {
            while(!flag){
                try{consumer_con.await();}catch(InterruptedException e){}
            }
            System.out.println(Thread.currentThread().getName()+"...消費者.5.0......."+this.name);//消費烤鴨1
            flag = false;
            producer_con.signal();//直接喚醒生產執行緒
        }
        finally
        {
            lock.unlock();
        }
    }
}

執行程式碼

package com.zejian.concurrencys;
/**
 * Created by zejian on 2017/7/22.
 * Blog : http://blog.csdn.net/javazejian [原文地址,請尊重原創]
 */
public class Mutil_Producer_ConsumerByCondition {

    public static void main(String[] args) {
        ResourceByCondition r = new ResourceByCondition();
        Mutil_Producer pro = new Mutil_Producer(r);
        Mutil_Consumer con = new Mutil_Consumer(r);
        //生產者執行緒
        Thread t0 = new Thread(pro);
        Thread t1 = new Thread(pro);
        //消費者執行緒
        Thread t2 = new Thread(con);
        Thread t3 = new Thread(con);
        //啟動執行緒
        t0.start();
        t1.start();
        t2.start();
        t3.start();
    }
}

/**
 * @decrition 生產者執行緒
 */
class Mutil_Producer implements Runnable {
    private ResourceByCondition r;

    Mutil_Producer(ResourceByCondition r) {
        this.r = r;
    }

    public void run() {
        while (true) {
            r.product("北京烤鴨");
        }
    }
}

/**
 * @decrition 消費者執行緒
 */
class Mutil_Consumer implements Runnable {
    private ResourceByCondition r;

    Mutil_Consumer(ResourceByCondition r) {
        this.r = r;
    }

    public void run() {
        while (true) {
            r.consume();
        }
    }
}

正如程式碼所示,我們通過兩者Condition物件單獨控制消費執行緒與生產消費,這樣可以避免消費執行緒在喚醒執行緒時喚醒的還是消費執行緒,如果是通過synchronized的等待喚醒機制實現的話,就可能無法避免這種情況,畢竟同一個鎖,對於synchronized關鍵字來說只能有一組等待喚醒佇列,而不能像Condition一樣,同一個鎖擁有多個等待佇列。synchronized的實現方案如下:

public class KaoYaResource {

    private String name;
    private int count = 1;//烤鴨的初始數量
    private boolean flag = false;//判斷是否有需要執行緒等待的標誌
    /**
     * 生產烤鴨
     */
    public synchronized void product(String name){
        while(flag){
            //此時有烤鴨,等待
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.name=name+count;//設定烤鴨的名稱
        count++;
        System.out.println(Thread.currentThread().getName()+"...生產者..."+this.name);
        flag=true;//有烤鴨後改變標誌
        notifyAll();//通知消費執行緒可以消費了
    }

    /**
     * 消費烤鴨
     */
    public synchronized void consume(){
        while(!flag){//如果沒有烤鴨就等待
            try{this.wait();}catch(InterruptedException e){}
        }
        System.out.println(Thread.currentThread().getName()+"...消費者........"+this.name);//消費烤鴨1
        flag = false;
        notifyAll();//通知生產者生產烤鴨
    }
}

如上程式碼,在呼叫notify()或者 notifyAll()方法時,由於等待佇列中同時存在生產者執行緒和消費者執行緒,所以我們並不能保證被喚醒的到底是消費者執行緒還是生產者執行緒,而Codition則可以避免這種情況。嗯,瞭解完Condition的使用方式後,下面我們將進一步探討Condition背後的實現機制

Condition的實現原理

Condition的具體實現類是AQS的內部類ConditionObject,前面我們分析過AQS中存在兩種佇列,一種是同步佇列,一種是等待佇列,而等待佇列就相對於Condition而言的。注意在使用Condition前必須獲得鎖,同時在Condition的等待佇列上的結點與前面同步佇列的結點是同一個類即Node,其結點的waitStatus的值為CONDITION。在實現類ConditionObject中有兩個結點分別是firstWaiterlastWaiterfirstWaiter代表等待佇列第一個等待結點,lastWaiter代表等待佇列最後一個等待結點,如下

 public class ConditionObject implements Condition, java.io.Serializable {
    //等待佇列第一個等待結點
    private transient Node firstWaiter;
    //等待佇列最後一個等待結點
    private transient Node lastWaiter;
    //省略其他程式碼.......
}

每個Condition都對應著一個等待佇列,也就是說如果一個鎖上建立了多個Condition物件,那麼也就存在多個等待佇列。等待佇列是一個FIFO的佇列,在佇列中每一個節點都包含了一個執行緒的引用,而該執行緒就是Condition物件上等待的執行緒。當一個執行緒呼叫了await()相關的方法,那麼該執行緒將會釋放鎖,並構建一個Node節點封裝當前執行緒的相關資訊加入到等待佇列中進行等待,直到被喚醒、中斷、超時才從佇列中移出。Condition中的等待佇列模型如下

在這裡插入圖片描述 正如圖所示,Node節點的資料結構,在等待佇列中使用的變數與同步佇列是不同的,Condtion中等待佇列的結點只有直接指向的後繼結點並沒有指明前驅結點,而且使用的變數是nextWaiter而不是next,這點我們在前面分析結點Node的資料結構時講過。firstWaiter指向等待佇列的頭結點,lastWaiter指向等待佇列的尾結點,等待佇列中結點的狀態只有兩種即CANCELLEDCONDITION,前者表示執行緒已結束需要從等待佇列中移除,後者表示條件結點等待被喚醒。再次強調每個Codition物件對於一個等待佇列,也就是說AQS中只能存在一個同步佇列,但可擁有多個等待佇列。下面從程式碼層面看看被呼叫await()方法(其他await()實現原理類似)的執行緒是如何加入等待佇列的,而又是如何從等待佇列中被喚醒的

public final void await() throws InterruptedException {
      //判斷執行緒是否被中斷
      if (Thread.interrupted())
          throw new InterruptedException();
      //建立新結點加入等待佇列並返回
      Node node = addConditionWaiter();
      //釋放當前執行緒鎖即釋放同步狀態
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      //判斷結點是否同步佇列(SyncQueue)中,即是否被喚醒
      while (!isOnSyncQueue(node)) {
          //掛起執行緒
          LockSupport.park(this);
          //判斷是否被中斷喚醒,如果是退出迴圈。
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
      }
      //被喚醒後執行自旋操作爭取獲得鎖,同時判斷執行緒是否被中斷
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
          interruptMode = REINTERRUPT;
       // clean up if cancelled
      if (node.nextWaiter != null) 
          //清理等待佇列中不為CONDITION狀態的結點
          unlinkCancelledWaiters();
      if (interruptMode != 0)
          reportInterruptAfterWait(interruptMode);
  }

執行addConditionWaiter()新增到等待佇列。

 private Node addConditionWaiter() {
   Node t = lastWaiter;
     // 判斷是否為結束狀態的結點並移除
     if (t != null && t.waitStatus != Node.CONDITION) {
         unlinkCancelledWaiters();
         t = lastWaiter;
     }
     //建立新結點狀態為CONDITION
     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     //加入等待佇列
     if (t == null)
         firstWaiter = node;
     else
         t.nextWaiter = node;
     lastWaiter = node;
     return node;
}

await()方法主要做了3件事,一是呼叫addConditionWaiter()方法將當前執行緒封裝成node結點加入等待佇列,二是呼叫fullyRelease(node)方法釋放同步狀態並喚醒後繼結點的執行緒。三是呼叫isOnSyncQueue(node)方法判斷結點是否在同步佇列中,注意是個while迴圈,如果同步佇列中沒有該結點就直接掛起該執行緒,需要明白的是如果執行緒被喚醒後就呼叫acquireQueued(node, savedState)執行自旋操作爭取鎖,即當前執行緒結點從等待佇列轉移到同步佇列並開始努力獲取鎖。

接著看看喚醒操作singal()方法

 public final void signal() {
     //判斷是否持有獨佔鎖,如果不是丟擲異常
   if (!isHeldExclusively())
          throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      //喚醒等待佇列第一個結點的執行緒
      if (first != null)
          doSignal(first);
 }

這裡signal()方法做了兩件事,一是判斷當前執行緒是否持有獨佔鎖,沒有就丟擲異常,從這點也可以看出只有獨佔模式先採用等待佇列,而共享模式下是沒有等待佇列的,也就沒法使用Condition。二是喚醒等待佇列的第一個結點,即執行doSignal(first)

private void doSignal(Node first) {
     do {
             //移除條件等待佇列中的第一個結點,
             //如果後繼結點為null,那麼說沒有其他結點將尾結點也設定為null
            if ( (firstWaiter = first.nextWaiter) == null)
                 lastWaiter = null;
             first.nextWaiter = null;
          //如果被通知節點沒有進入到同步佇列並且條件等待佇列還有不為空的節點,則繼續迴圈通知後續結點
         } while (!transferForSignal(first) &&
                  (first = firstWaiter) != null);
        }

//transferForSignal方法
final boolean transferForSignal(Node node) {
    //嘗試設定喚醒結點的waitStatus為0,即初始化狀態
    //如果設定失敗,說明當期結點node的waitStatus已不為
    //CONDITION狀態,那麼只能是結束狀態了,因此返回false
    //返回doSignal()方法中繼續喚醒其他結點的執行緒,注意這裡並
    //不涉及併發問題,所以CAS操作失敗只可能是預期值不為CONDITION,
    //而不是多執行緒設定導致預期值變化,畢竟操作該方法的執行緒是持有鎖的。
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
         return false;

        //加入同步佇列並返回前驅結點p
        Node p = enq(node);
        int ws = p.waitStatus;
        //判斷前驅結點是否為結束結點(CANCELLED=1)或者在設定
        //前驅節點狀態為Node.SIGNAL狀態失敗時,喚醒被通知節點代表的執行緒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            //喚醒node結點的執行緒
            LockSupport.unpark(node.thread);
        return true;
    }

註釋說得很明白了,這裡我們簡單整體說明一下,doSignal(first)方法中做了兩件事,從條件等待佇列移除被喚醒的節點,然後重新維護條件等待佇列的firstWaiterlastWaiter的指向。二是將從等待佇列移除的結點加入同步佇列(在transferForSignal()方法中完成的),如果進入到同步佇列失敗並且條件等待佇列還有不為空的節點,則繼續迴圈喚醒後續其他結點的執行緒。到此整個signal()的喚醒過程就很清晰了,即signal()被呼叫後,先判斷當前執行緒是否持有獨佔鎖,如果有,那麼喚醒當前Condition物件中等待佇列的第一個結點的執行緒,並從等待佇列中移除該結點,移動到同步佇列中,如果加入同步佇列失敗,那麼繼續迴圈喚醒等待佇列中的其他結點的執行緒,如果成功加入同步佇列,那麼如果其前驅結點是否已結束或者設定前驅節點狀態為Node.SIGNAL狀態失敗,則通過LockSupport.unpark()喚醒被通知節點代表的執行緒,到此signal()任務完成,注意被喚醒後的執行緒,將從前面的await()方法中的while迴圈中退出,因為此時該執行緒的結點已在同步佇列中,那麼while (!isOnSyncQueue(node))將不在符合迴圈條件,進而呼叫AQSacquireQueued()方法加入獲取同步狀態的競爭中,這就是等待喚醒機制的整個流程實現原理,流程如下圖所示(注意無論是同步佇列還是等待佇列使用的Node資料結構都是同一個,不過是使用的內部變數不同罷了) 在這裡插入圖片描述