1. 程式人生 > >java鎖的語義及ReentrantLock原始碼剖析

java鎖的語義及ReentrantLock原始碼剖析

鎖是java併發的重要機制,它除了可以讓臨界區互斥執行以外,還可以向其它執行緒傳送訊息。

監視器鎖

class MonitorExample {
    int a = 0;
    public synchronized void write() {     //1
	a++;                                //2
     }                                      //3
    public synchronized void read() {     //4
       int i = a;                           //5
     ...
     }                                      //6
}
下面使用happens-before來分析其執行過程。happens-before規則詳見 http://blog.csdn.net/quanzhongzhao/article/details/45619135 。
1、程式順序規則:1 happens before 2, 2 happens before 3; 4 happens before 5, 5 happens before 6。

2、監視器鎖規則:3 happens before4。

3、happens-before規則的傳遞性:2 happens-before 5。

假如A執行緒先執行write()方法,即先獲取監視器鎖,然後B執行緒使用方法read()方法獲取變數a的值。因為2 happens-before5,所以A執行緒在釋放監視器鎖之前對變數a的操作,在A釋放監視器(按照監視器鎖的語義,釋放監視器 將本地記憶體中共享變數重新整理到主記憶體)。所以隨後B獲取監視器鎖(獲取監視器會清空本地記憶體共享變數資料,並從主記憶體讀取相應資料)後A對a的操作對B都是可見的。

JUC併發包中的鎖

下面以ReenterantLock為例,說一下JUC併發包中的鎖是如何實現的。
class ReentrantLockExample {
  int a = 0;
  ReentrantLock lock = new ReentrantLock();
  public void write() {
    lock.lock(); //獲取鎖
  try {
    a++;
      } finally {
<span style="white-space:pre">	</span>lock.unlock(); //釋放鎖
<span style="white-space:pre">	</span>}
    }
  public void reade () {
    lock.lock(); //獲取鎖
    try {
        int i = a;
        ……
    } finally {
       lock.unlock(); //釋放鎖
    }
  }
}
ReentrantLock是一種互斥鎖,但是是可重入的,即獲得該鎖的執行緒可以多次再獲得該鎖。ReentrantLock的實現依賴於其中的Sync型別變數。

而Sync是同步器AQS的子類,AQS即AbstractQueuedSynchronizer類。
AbstractQueuedSynchronizer使用一個整型volatile變數來維護同步器的狀態,這個volatile的變數state是鎖記憶體語義實現的關鍵。
     /**
     * The synchronization state.
     */
  private volatile int state;
先看看ReentrantLock的建構函式吧
    public ReentrantLock() {  
        sync = new NonfairSync();   //預設使用非公平鎖
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync() ;  //若傳入引數fair = true,則使用公平鎖
    }
公平鎖與非公平鎖的區別主要在獲得鎖時的策略。後面再具體分析。下面分析一下我們使用lock.lock()方法和lock.unlock時候都發生了什麼。
   public void lock() {
        sync.lock();
    }
先來看看公平鎖的lock.lock()方法
     final void lock() {
            acquire(1);   //直接呼叫acquire()方法。 
        }
再來看看非公平鎖的實現,此處(lock方法)為公平鎖與非公平鎖的第一處不同。
     final void lock() {
            if (compareAndSetState(0, 1))                         //首先CAS判斷同步器的狀態state是否為0,為0表示當前鎖可獲得,獲取該鎖。
                setExclusiveOwnerThread(Thread.currentThread());  //並設定當前執行緒為鎖的擁有者。
            else
                acquire(1);                                      //若CAS失敗,則同公平鎖一樣呼叫acquire()方法。
        }
acquire()方法由AbstractQueuedSynchronizer定義
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
巨集觀上解釋一下acquire()方法 1、使用tryAcquire()方法嘗試獲取鎖,獲取成功則該方法直接返回true 表示獲取成功 。該方法NonfairSync和FairSync都有實現。 2、嘗試獲取失敗,則新生成一個節點(Node),其型別為互斥鎖(Exclusive),存放當前執行緒。當前執行緒獲取鎖失敗,則使用acquireQueued方法讓等待佇列中執行緒獲取鎖, 該方法是自旋的,即獲取成功才能返回,保證了等待佇列的非阻塞。且該方法是非中斷模式的,即不響應中斷,但會記錄中斷狀態。 3、acquireQueued方法呼叫過程中可能會有中斷產生,有的話,執行完畢後在此處引發一箇中斷。 當多個執行緒嘗試獲取ReentrantLock鎖時,只有一個執行緒能夠獨佔該鎖,其它執行緒則放入一個由Node組成等待佇列中。其中每個Node持有一個執行緒,並維護節點的狀態值。
static final class Node {
        
        static final Node SHARED = new Node(); //表示節點是共享節點
        static final Node EXCLUSIVE = null;    //獨佔節點
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
 	volatile int waitStatus; //節點狀態值。為SIGNAL/CANCELLED/CONDITION/PROPAGATE。    
        volatile Node prev;      //等待佇列下一節點  
	volatile Node next;
	volatile Thread thread; //節點持有的執行緒。一個節點代表一個等待的執行緒。 
 	Node nextWaiter;       //用於waitStatus=CONDITION的節點。
        Node() {}    // Used to establish initial head or SHARED marker
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
       }
      Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
由上可知,節點的狀態值也是一個volatile變數。 當waitStatus=SIGNAL時,表示該節點的後繼節點正在阻塞狀態,當前執行緒釋放鎖時或者當前執行緒狀態變為CANCELLED,即取消獲取鎖時需要喚醒(unpark)後繼節點。為了避免競爭,acquire方法首先指示我們需要一個Signal
  acquireQueued(addWaiter(Node.EXCLUSIVE), arg) //即生成一個Node,其型別為EXCLUSIVE
當waitStatus=CANCELLED時,表示由於超時或者中斷導致該等待執行緒被取消,即執行緒不再獲取鎖。 當waitStatus=CONDITION時,表示節點在條件佇列中。(目前不太理解這一狀態) 當waitStatus=PROPAGATE時,用於共享鎖。releaseShared應該傳遞給佇列中的其它節點。用於doReleaseShared()方法中。

節點先說到這裡,下面詳細介紹acquire()方法。 其中tryAcquire()方法在AQS類中定義,NonfairSync類和FairSync類中分別覆寫該方法。此處(tryAcquire方法)為公平與非公平鎖的第二處不同
  protected boolean tryAcquire(int arg) {            
        throw new UnsupportedOperationException();        //AQS中tryAcquire()方法並未實現,也未使用抽象方法,考慮到AQS的兩種功能(共享鎖與互斥鎖)     
    }                                                     //避免子類實現抽象方法時需要實現兩種功能。

//非公平鎖版本
  protected final boolean tryAcquire(int acquires) {     
            return nonfairTryAcquire(acquires);
        }

  final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread(); //獲取當前執行緒
            int c = getState();           //獲取同步器狀態state
            if (c == 0) {                                    
                if (compareAndSetState(0, acquires)) {  //state=0,表示鎖空閒,可以獲取。使用CAS比較並置state的狀態為1。
                    setExclusiveOwnerThread(current);   //設定當前執行緒為鎖的持有者。
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {   // state!=0,則表示鎖已被某執行緒持有,則判斷鎖的持有執行緒 是否是 當前執行緒
                int nextc = c + acquires;    //鎖的持有者為當前執行緒,則更新state的值。前面說過ReentrantLock可重入。
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);     //設定state的值為新值。      
                return true;
            }
            return false; //前面兩種條件都不滿足,則返回false,表示當前執行緒獲取鎖失敗   
        }  
   //公平鎖版本    
  protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();  //獲取當前執行緒
            int c = getState();                 //獲取同步器狀態
            if (c == 0) {                       //當前同步器state=0,鎖空閒,可獲取
                if (!hasQueuedPredecessors() &&    //判斷是否有等待佇列中是否有其它執行緒  
                    compareAndSetState(0, acquires)) {   //沒有的話,則嘗試獲取鎖
                    setExclusiveOwnerThread(current);  //獲取成功,設定當前執行緒為鎖的持有者。
                    return true;                               
                }
            }
            else if (current == getExclusiveOwnerThread()) {  //若state!=0,判斷鎖的持有執行緒是否是當前執行緒 
                int nextc = c + acquires;       //是,則重入,並更新鎖的狀態值state
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;       //表示獲取鎖成功  
            }
            return false;          //獲取失敗
        }
當tryAcquire方法失敗時,繼而呼叫addWaiter()方法,新建節點持有當前執行緒,然後將其加入等待佇列連結串列尾。
 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); //持有當前執行緒,接收引數mode=EXCLUSIVE 生成新節點,
    
        Node pred = tail;     
        if (pred != null) {                 //判斷等待佇列是否為空 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {  //佇列非空,嘗試CAS更新隊尾。
                pred.next = node;
                return node;                    //CAS成功,則返回該節點。
            }
        }
        enq(node);   //佇列為空或者CAS失敗 則以自旋的方式 加入佇列。             
        return node;
    }
  private Node enq(final Node node) {
        for (;;) {       //自旋,直到節點成功加入等待佇列。
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))  //佇列為空,建立新的頭結點。   
                    tail = head;
            } else {
                node.prev = t;         //非空,則CAS更改佇列尾部
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;     //表示是否成功獲取鎖
        try {
            boolean interrupted = false;
            for (;;) {      //自旋,直到 等待佇列中某一執行緒 成功獲取鎖
                final Node p = node.predecessor();   //獲得當前節點前驅
                if (p == head && tryAcquire(arg)) {  //其前驅節點為佇列頭節點,則嘗試獲取鎖。
                    setHead(node);                  //獲取鎖成功,設定當前節點為頭結點,
                    p.next = null; // help GC    //此時原來的頭結點 p 已經無效,設定為null有助於垃圾回收。  
                    failed = false;
                    return interrupted;  //此過程中未產生中斷,interrupted=false;所以acquire()方法不需要呼叫 selfInterrupt()方法。
                }
                if (shouldParkAfterFailedAcquire(p, node) &&  //當前節點 前驅節點不是頭節點,或者 是頭結點但獲取鎖失 敗呼叫該函式。
                    parkAndCheckInterrupt())                  
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
繼續展開 shouldParkAfterFailedAcquire
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 
        int ws = pred.waitStatus;   //獲取前驅結點p的節點狀態waitStatus
        if (ws == Node.SIGNAL)      //前驅結點 waitStatus=SIGNAL,表示需要喚醒(unpark)其後繼節點
            return true;   //返回,進而呼叫 parkAndCheckInterrupt()
        if (ws > 0) {   //節點狀態不是SIGNAL,大於0 ,只可能是CANCELLED ,表示前驅結點被取消,不再獲取鎖。
            do {
                node.prev = pred = pred.prev;  
            } while (pred.waitStatus > 0);  //只要節點狀態為CANCELLED,則繼續向前搜尋前驅節點,即為當前節點找到一個狀態不為CANCELLED的前驅結點。
            pred.next = node;   //找到,將其設為當前節點的前驅節點。
        } else {
 
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //waitStatus must be 0 or PROPAGATE。 Indicate that we need a signal, but don't park yet
        }                 //Caller will need to retry to make sure it cannot acquire before parking 
        return false;
    }
private final boolean parkAndCheckInterrupt() {     //阻塞當前執行緒,並檢查acquireQueued函式執行過程是是否發生中斷
        LockSupport.park(this);                
        return Thread.interrupted();
    }
至此,lock.lock()方法執行完畢。 再看一下lock.unlock()方法。
 public void unlock() {
        sync.release(1);
    }
release()方法是在AQS中定義並實現的,子類並未實現。
   public final boolean release(int arg) {
        if (tryRelease(arg)) {  //嘗試釋放鎖         
            Node h = head;      
            if (h != null && h.waitStatus != 0) //釋放成功判斷頭結點是否為空,並且判斷其waitStatus狀態
                unparkSuccessor(h); //喚醒後繼節點。
            return true;
        }
        return false; //釋放失敗
    }
  protected boolean tryRelease(int arg) {  //AQS中tryRelease()方法並未實現,在子類Sync中實現。
        throw new UnsupportedOperationException();
    }
tryRelease()方法在Sync中實現,並無公平鎖與非公平鎖之分。
   protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);  //釋放鎖的最後,寫volatile變數state
            return free;
        }
釋放鎖的最後寫volatile變數state;在獲取鎖時首先讀這個volatile變數。根據volatile的happens-before規則,釋放鎖的執行緒對volatile變數的寫會重新整理到主記憶體,獲取鎖的執行緒讀會強制從主記憶體來讀取該共享變數。即volatile變數保證了共享變數線上程之間的可見性。

最後說一下CAS方法,該方法時原子的,判斷state值是否為expect,是則CAS更改state=update;
 protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
CAS操作具有volatile 讀和寫的記憶體語義。編譯器不會對volatile讀與volatile讀後面的任意記憶體操作重排序;編譯器不會對volatile寫與volatile寫前面的任意記憶體操作重排序。組合這兩個條件,意味著為了同時實現volatile讀和volatile寫的記憶體語義,編譯器不能對CAS與CAS前面和後面的任意記憶體操作重排序。而具體實現的時候,CAS是依靠處理器指令級別的控制來實現原子操作。
現在對公平鎖和非公平鎖的記憶體語義做個總結:
1、公平鎖和非公平鎖釋放時,最後都要寫一個volatile變數state。
2、 公平鎖獲取時,首先會去讀這個volatile變數。
3、非公平鎖獲取時,首先會用CAS更新這個volatile變數,這個操作同時具有volatile讀和volatile寫的記憶體語義。

JUC鎖記憶體語義實現的總結

從本文對ReentrantLock的分析可以看出,鎖釋放-獲取的記憶體語義的實現至少有下面兩種方式:
1. 利用volatile變數的寫-讀所具有的記憶體語義。
2. 利用CAS所附帶的volatile讀和volatile寫的記憶體語義。

由於java的CAS同時具有volatile 讀和volatile寫的記憶體語義,因此Java執行緒之間的通訊現在有了下面四種方式:
1. A執行緒寫volatile變數,隨後B執行緒讀這個volatile變數。
2. A執行緒寫volatile變數,隨後B執行緒用CAS更新這個volatile變數。
3. A執行緒用CAS更新一個volatile變數,隨後B執行緒用CAS更新這個volatile變數。
4. A執行緒用CAS更新一個volatile變數,隨後B執行緒讀這個volatile變數。
Java的CAS會使用現代處理器上提供的高效機器級別原子指令,這些原子指令以原子方式對記憶體執行讀-改-寫操作,這是在多處理器中實現同步的關鍵(從本質上來說,能夠支援原子性讀-改-寫指令的計算機器,是順序計算圖靈機的非同步等價機器,因此任何現代的多處理器都會去支援某種能對記憶體執行原子性讀-改-寫操作的原子指令)。同時,volatile變數的讀/寫和CAS可以實現執行緒之間的通訊。把這些特性整合在一起,就形成了整個concurrent包得以實現的基石。如果我們仔細分析concurrent包的原始碼實現,會發現一個通用化的實現模式:
1. 首先,宣告共享變數為volatile;
2. 然後,使用CAS的原子條件更新來實現執行緒之間的同步;
3. 同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的記憶體語義來實現執行緒之間的通訊。
AQS,非阻塞資料結構和原子變數類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實現的,而concurrent包中的高層類又是依賴於這些基礎類來實現的。

本文鎖的語義實現部分主要參考了程曉明<深入理解java記憶體模型>第六章鎖的實現,其中最後仿宋字型部分為直接摘抄,其中AQS繼承體系圖來自 Java多執行緒系列--“JUC鎖”03之 公平鎖(一)  。如有錯漏,多多指出啊。