1. 程式人生 > >多執行緒之美7一ReentrantReadWriteLock原始碼分析

多執行緒之美7一ReentrantReadWriteLock原始碼分析

目錄

前言

在多執行緒環境下,為了保證執行緒安全, 我們通常會對共享資源加鎖操作,我們常用Synchronized關鍵字或者ReentrantLock 來實現,這兩者加鎖方式都是排他鎖,即同一時刻最多允許一個執行緒操作,然而大多數場景中對共享資源讀多於寫,那麼存線上程安全問題的是寫操作(修改,新增,刪除),我們是否應該考慮將讀和寫兩個分開,只要運用合理,併發效能是不是可以提高,吞吐量增大呢? ReentrantReadWriteLock已經為我們實現了這種機制,我們一起來看它是怎樣實現的吧!

1、讀寫鎖的一些概念

在檢視可重入讀寫鎖的原始碼前,有幾個概念需要先知道,對於後面理解原始碼很有幫助。

1、ReentrantReadWriteLock 內部 Sync類依然是繼承AQS實現的,因此同步狀態欄位 state,依然表示對鎖資源的佔用情況。那麼如何實現一個 int型別的state 同時來表示讀寫鎖兩種狀態的佔用情況呢? 這裡實現非常巧妙,將4個位元組的int型別, 32位拆分為2部分,高16位表示讀鎖的佔用情況,低16位表示寫鎖的佔用情況,這樣讀寫鎖互不影響,相互獨立;也因此讀寫鎖的最大值是2^16-1 = 65535,不能超過16位,下面原始碼有體現。

state值表示如圖所示:

2、讀鎖是共享鎖,只要不超過最大值,可多個執行緒同時獲取; 寫鎖是排他鎖,同一時刻最多允許一個執行緒獲取。

寫鎖與其他鎖都互斥,含寫寫互斥,寫讀互斥,讀寫互斥。

3、state可同時表示讀寫鎖的狀態,state的高16位表示獲取讀鎖的執行緒數,讀鎖支援可重入,即一個執行緒也可多次獲取讀鎖,怎麼維護每個讀鎖執行緒的重入次數的? 每個執行緒有一個計數器 HoldCounter,用ThreadLocal來存放每個執行緒的計數器;state的低16位表示寫鎖的同步狀態,因為寫鎖是排他鎖,這裡就不能表示獲取寫鎖的執行緒數了,只能表示寫鎖的重入次數,獲取寫鎖的執行緒可多次重複獲取寫鎖(支援重入)。

讀鎖的計數器的實現原理如下:

可見ThreadLocalHoldCounter繼承 ThreadLocal,每個獲取讀鎖的執行緒是通過其本地變數來儲存自己的計數器,來統計獲取讀鎖的重入次數。ThreadLocal原理解析

    static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
          //重寫了ThreadLocal的initialValue方法
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

4、state的高16位需要記錄獲取讀鎖的執行緒數,每增加一個執行緒獲取讀鎖,在state的高16執行加1操作,即state+2^16,寫鎖增加重入次數,直接 state+1即可。

5、鎖降級:獲取寫鎖的執行緒,可以再次獲取到讀鎖,即寫鎖降級為讀鎖。

​ 讀鎖可以升級為寫鎖嗎? 不可以,因為存線上程安全問題,試想獲取讀鎖的執行緒有多個,其中一個執行緒升級為寫鎖,對臨界區資源進行操作,比如修改了某個值,對其他已經獲取讀鎖的執行緒不可見,出現執行緒安全問題。

程式碼演示:

1、讀寫狀態

AQS(AbstractQueuedSynchronizer的簡稱)中同步狀態欄位 private volatile int state, int型別,4個位元組,32位,拆分為高16位表示讀狀態,低16位表示寫狀態,如下定義了一些常量,實現獲取讀寫鎖的數量。

ReentrantReadWriteLock部分程式碼如下:

   //分隔位數,16位 
     static final int SHARED_SHIFT   = 16;
   //讀鎖加1的數量,1左位移16位, (16)0x10000  = (2)1000000000000000= (10) 65536
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
  //讀寫鎖的最大數量, (16)0xFFFFFFFF =(2)1111111111111111 =(10)65535 
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
  //寫鎖的掩碼,用於計算寫鎖重入次數時,將state的高16全部置為0, 等於(2)1111111111111111
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
   //獲取讀鎖數,表示當前有多少個執行緒獲取到讀鎖
   static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
  //獲取寫鎖重入次數(不等於0表示有執行緒持有獨佔鎖,大於1,表示寫鎖有重入)
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

分別看一下獲取讀寫鎖數量的方法。

獲取佔用讀鎖的執行緒數,程式碼如下:

 static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

傳入的c為 state,state 無符號右移16位,抹去低16位值,左邊補0

示例圖如下:

獲取寫鎖的值的方法

  static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

與運算,將高16全部置為0,低16值代表寫鎖的值,&運算,相同為1,不同為0,得到低16位寫鎖值。

示例圖如下:

2、三個鎖概念

  1. int c =getState() ,獲取state的值,代表同步鎖狀態,該值包含讀寫兩個鎖的同步狀態
  2. int w = exclusiveCount(c); w代表寫鎖的同步狀態,通過c獲取到寫鎖的狀態值
  3. int r = sharedCount(c); r 代表讀鎖的同步狀態,通過c獲取到讀鎖的狀態值

以下分析三種情況下state,r, w 的值及代表的含義:

  • 1、一個執行緒獲取到寫鎖:

state =1, w =1, r =0

獲取寫鎖加1操作就比較簡單了,因為寫鎖是獨佔鎖,與正常的ReentrantLock獲取鎖實現一樣,佔用state的低16位表示,不用看state的高16,左邊補16位0。獲取寫鎖一次,直接 c+1;

  • 2、一個執行緒獲取到讀鎖:

state =65536, w= 0, r=1

c初始為0 ,獲取讀鎖,則讀鎖數量+1,執行 c + SHARED_UNIT, SHARED_UNIT = (2)1000000000000000 = (10)65536,括號內表示進位制,SHARED_UNIT是每次讀鎖加1的數值。

如下圖所示: 在獲取讀鎖數量 r時,將state的低16位抹去,r=1,而state此時的值= 2^16 =65536,state的實際值可能會很大,但其實分別拆分讀寫鎖的值不一定大,只是讀鎖值表示在高位,會造成state值很大。

  • 3、一個執行緒獲取到寫鎖,又獲取到讀鎖情況(鎖降級):

state = 65537,w=1, r=1

state二進位制表示: 00000000 00000001 00000000 00000001

鎖降級程式碼演示如下:

package readwritelock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * @author zdd
 * 2019/12/30  上午
 * Description: 鎖降級測試
 */
public class ReadWriteLockTest {
    static Integer shareVar = 0;
    public static void main(String[] args) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        //1,首先獲取寫鎖
        rw.writeLock().lock();
        //2.修改共享變數值
        shareVar = 10 ;
        //3.再獲取讀鎖
        rw.readLock().lock();
        System.out.println("讀取變數值 shareVar:"+ shareVar);
        //4.釋放寫鎖
        rw.writeLock().unlock();
        //5.釋放讀鎖
        rw.readLock().unlock();
    }
}

2、類結構和構造方法

ReentrantReadWriteLock 類中有ReadLock和WriteLock,分別對應讀鎖和寫鎖,而讀寫鎖又分為公平方式和非公平方式獲取鎖。

簡略類圖結構如下:

構造方法如下:根據傳入引數設定公平或者非公平獲取鎖方式,預設是非公平方式

  public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

3、寫鎖

由於寫鎖是獨佔鎖,由於寫鎖是獨佔鎖,獲取寫鎖的方式在AQS中已經說過了,詳見AQS原始碼分析, 只是每個子類的嘗試獲取鎖方式不同,所以ReentrantReadWriteLock類獲取寫鎖過程就看一下嘗試獲取鎖方法的原始碼。

3.1、嘗試獲取鎖

tryAcquire(int acquires),獲取鎖失敗則加入同步佇列中等待獲取鎖,原始碼如下:

 protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
           //1,獲取同步狀態state的值,注意該值可表示讀寫鎖的同步狀態
            int c = getState();
          //2,獲取寫鎖狀態,低16位的值
            int w = exclusiveCount(c);
          //3,如果同步鎖狀態不為0,有執行緒已經獲取到了鎖 
            if (c != 0) {
        //4,w==0則表示寫鎖為0,那麼一定有執行緒獲取了讀鎖,需要等待,讀寫互斥
 //current != getExclusiveOwnerThread() 當前執行緒不等於已經獲取到寫鎖的執行緒,則也需等待其釋放,寫寫互斥
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
       //5,此時再次獲取鎖,判斷鎖重入次數是否超過最大限定次數
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //更新寫鎖重入次數
                setState(c + acquires);
                return true;
            }
      //6,程式碼執行這,一定是c==0,同步鎖空閒情況
     //writerShouldBlock該方法是基於公平鎖和非公平鎖2種方式的體現
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
       //獲取到鎖,設定獨佔鎖為當前寫鎖執行緒
            setExclusiveOwnerThread(current);
            return true;
        }

寫鎖是否應該阻塞等待

  • 1、 非公平鎖方式
  final boolean writerShouldBlock() {
          //直接返回false
            return false; // writers can always barge
        }
  • 2、公平鎖方式

需要判斷同步佇列中是否還有其他執行緒在掛起等待,如存在應該按照入隊順序獲取鎖

  final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
 public final boolean hasQueuedPredecessors() {
   //1.獲取同步佇列頭,尾節點
        Node t = tail; 
        Node h = head;
        Node s;
  // h !=t 同步佇列不為空
  // 佇列中還有其他執行緒在等待鎖,則返回true
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

3.2、釋放寫鎖

unlock方法釋放鎖

public void unlock() {
    sync.release(1);
}

可見,呼叫內部類Sync的release方法,Sync繼承AQS

public final boolean release(int arg) {
    if (tryRelease(arg)) {
       //1,釋放鎖成功
        Node h = head;
        if (h != null && h.waitStatus != 0)
        //2.喚醒同步佇列中等待執行緒
            unparkSuccessor(h);
        return true;
    }
    return false;
}

核心在嘗試釋放鎖方法上,看看寫鎖的釋放鎖方法tryRelease

   protected final boolean tryRelease(int releases) {
           //1,判斷當前執行緒是否持有當前鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //2,同步狀態 - 需要釋放的寫鎖同步值
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
              //3,free ==true,完全釋放寫鎖,將當前獲取獨佔鎖執行緒置空
                setExclusiveOwnerThread(null);
           //4,更新state值
            setState(nextc);
            return free;
        }

注: 在釋放寫鎖佔用次數時, state的高16的讀鎖有值也不影響,減去releases,首先減去的state低位的數,而且在釋放寫鎖時,state的低16位的值一定>=1,不存在減少讀鎖的值情況。

   int nextc = getState() - releases;
   boolean free = exclusiveCount(nextc) == 0;

也可改寫為如下面程式碼

//1,獲取state值
int c = getState();
//2,獲取寫鎖的值
int w= exclusiveCount(c);
int remain = w- releases;
boolean free = remain== 0;

4、讀鎖

4.1、獲取讀鎖

讀鎖呼叫lock方法加鎖,實際呼叫Sync的acquireShared方法

  public void lock() {
            sync.acquireShared(1);
        }

走進acquireShared,獲取共享鎖方法

 public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

嘗試獲取鎖tryAcquireShared,如果返回值<0, 表示獲取讀鎖失敗

主要執行步驟:

1、首先判斷是否存在其他執行緒在佔用寫鎖,有需要掛起等待;

2、在不用阻塞等待,且讀鎖值沒有超過最大值,cas更新成功了state的值,可以獲取到讀鎖,還會做以下事:

​ a. 第一個獲取讀鎖的,直接記錄執行緒物件和其重入獲取讀鎖的次數

​ b. 非第一個獲取讀鎖的,則獲取快取計數器(cachedHoldCounter),其記錄上一次獲取讀鎖的執行緒,如果是同一個執行緒,則直接更新其計數器的重入次數,如果快取計數器為空或快取計數器的執行緒不是當前獲取讀鎖的執行緒,則從當前執行緒本地變數中獲取自己的計數器,更新計數器的值

  protected final int tryAcquireShared(int unused) {
          //1,獲取當前執行緒物件
            Thread current = Thread.currentThread();
          //2,獲取同步鎖的值
            int c = getState();
         /*3,exclusiveCount(c) != 0 計算寫鎖的同步狀態,不等於0,說明有寫鎖已經獲取到同步鎖,
          *需要判斷當前執行緒是否等於獲取寫鎖執行緒,
          *是,可以允許再次獲取讀鎖,這裡涉及到鎖降級問題,寫鎖可以降為讀鎖
          *否則不讓獲取,寫讀互斥
         */
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
       //4,獲取讀鎖同步狀態
            int r = sharedCount(c);
    /**
      *此處3個判斷條件
      * 1.是否應該阻塞等待,這裡也是基於公平鎖和非公平獲取鎖實現 
      * 2.讀鎖同步狀態值是超過最大值,即限制獲取讀鎖的最大執行緒數
      * 3.cas更新讀鎖同步狀態是否成功
      */
      if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
        //可以獲取到讀鎖
         //r==0表示是第一個獲取讀鎖的執行緒
                if (r == 0) {
                    firstReader = current;
                   //記錄第一個執行緒讀鎖的重入次數
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                  //是第一個獲取讀鎖執行緒,鎖重入,鎖重入次數+1
                    firstReaderHoldCount++;
                } else {
               // 已有其他執行緒獲取到讀鎖 
        /*
         *1,獲取快取記錄的計數器,計數器是用來統計每一個獲取讀鎖執行緒的重入次數的,
         *由每個執行緒的ThreadLocal,即執行緒內的副本儲存,相互獨立;
         *此處也不是放入快取,在有多個執行緒同時獲取讀鎖情況,
         *用一個變數記錄上一個獲取讀鎖的執行緒的計數器,可能考慮多次獲取讀鎖執行緒大概率是同一個執行緒情況,
         *這樣做是可提高執行效率
          */
                    HoldCounter rh = cachedHoldCounter;
          // rh==null,第一個獲取讀鎖,rh沒有值
   // 或者計數器儲存的上一次執行緒的id與當前執行緒不等, 即不是相同一個執行緒,
  //那麼就獲取當前執行緒內部的計數器,並賦值給cachedHoldCounter變數,這樣可以讓下一次獲取讀鎖執行緒獲取比較了
       if (rh == null || rh.tid != getThreadId(current))
              cachedHoldCounter = rh = readHolds.get();
         else if (rh.count == 0)
  /*進入該條件,我理解是線上程獲取讀鎖再釋放後,同一執行緒再次獲取讀鎖情況,
   * 快取計數器會記錄上一個執行緒計數器,因為執行緒釋放讀鎖後,count=0,
   * 這裡重新將計數器放入執行緒內部中,
   * 因為執行緒在使用完執行緒內部變數後會防止記憶體洩漏,會執行remove,釋放本地儲存的計數器。
   */
                readHolds.set(rh);
        //計數器+1 
         rh.count++;
              }
                return 1;
            }
       //上面3個條件沒有同時滿足,沒有成功獲取到讀鎖,開始無限迴圈嘗試去獲取讀鎖
            return fullTryAcquireShared(current);
        }

無限迴圈嘗試獲取共享鎖 fullTryAcquireShared方法

主要執行步驟:

1、 如果有其他執行緒獲取到了寫鎖,寫讀互斥,應該去掛起等待;

2、如果可以獲取讀鎖,判斷是否應該阻塞等待,在公平獲取鎖方式中,同步佇列中有其他執行緒在等待,則應該去排隊按照FIFO順序獲取鎖,非公平獲取鎖方式,可以直接去競爭獲取鎖。

3、可以獲取鎖,則嘗試cas更新state的值,更新成功,獲取到鎖。

  final int fullTryAcquireShared(Thread current){
            HoldCounter rh = null;
        //無限迴圈
            for (;;) {
              //獲取同步鎖狀態
                int c = getState();
               //判斷寫鎖值不為0,且不是當前執行緒,不可獲取讀鎖
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) {
               //沒有執行緒獲取到寫鎖情況,公平獲取鎖情況,
               //同步佇列中有其他執行緒等待鎖,該方法主要是在需要排隊等待,計數器重入次數==0情況,清除計數器
                    if (firstReader == current) {
               //此處firstReader !=null, 則第1個獲取讀鎖的執行緒還沒釋放鎖,可允許該執行緒繼續重入獲取鎖
               //計數器count一定>0
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                  //清除計數器
                                    readHolds.remove();
                            }
                        }
              // 為什麼rh.count == 0就不讓執行緒獲取到鎖了,基於公平獲取鎖方式,去同步佇列中等待
                        if (rh.count == 0)
                            return -1;
                    }
                }
                //獲取讀鎖執行緒超過最大限制值 65535
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
               // cas執行讀鎖值+1
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                      //1,第一個獲取讀鎖
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                      //2,第一個獲取讀鎖重入
                        firstReaderHoldCount++;
                    } else {
                      //3,非第一個執行緒獲取讀鎖,存在多個執行緒獲取讀鎖
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                      //快取計數器變數記錄此次獲取讀鎖執行緒的計數器
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

tryAcquireShared 返回< 0, 獲取鎖失敗,執行 doAcquireShared

在獲取讀鎖失敗後,執行以下步驟:

1、將節點加入同步佇列中

2、如果前置節點是頭節點,將再次嘗試獲取鎖,如果成功,設定當前節點為head節點,並根據tryAcquireShared方法的返回值r判斷是否需要繼續喚醒後繼節點,如果 r大於0,需要繼續喚醒後繼節點,r=0不需要喚醒後繼節點。

3、如果前置節點不是頭節點,則在佇列中找到安全位置,設定前置節點 ws=SIGNAL, 掛起等待。

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                   //如果前繼節點是頭節點,再次嘗試獲取共享鎖
                    int r = tryAcquireShared(arg);
                   //r>=0,表示獲取到鎖, 
                   //r=0,表示不需要喚醒後繼節點
                   //r>0,需要繼續喚醒後繼節點
                    if (r >= 0) {
                       //該方法實現2個步驟
                       //1,設定當前節點為頭節點
                       //2,r>0情況會繼續喚醒後繼節點
                        setHeadAndPropagate(node, r);
                      //舊的頭節點移出佇列
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate 該方法是與獨佔鎖獲取鎖的區別之處,獲取到鎖後,設定為頭結點還需要繼續傳播下去。

private void setHeadAndPropagate(Node node, int propagate) {
   //記錄是的舊的頭節點
   Node h = head; // Record old head for check 
   //設定當前獲取到鎖節點為頭節點
    setHead(node);
   //propagate >0,表示還需要繼續喚醒後繼節點
   //舊的頭節點和新頭節點為空,或者ws<0,滿足條件之一,嘗試去喚醒後繼節點
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
       //後繼節點為空或者是共享節點(獲取讀鎖的執行緒)
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

doReleaseShared 方法較難理解,在釋放鎖中也有呼叫,留著後面一起分析。

4.2、釋放讀鎖

public void unlock() {
    sync.releaseShared(1);
}

AQS中釋放共鎖方法releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

看一下讀寫鎖具體實現tryReleaseShared 的方法

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
  //1,更新或者移出執行緒內部計數器的值
    if (firstReader == current) {
        //當前執行緒是第一個獲取讀鎖的執行緒
        if (firstReaderHoldCount == 1)
          //直接置空
            firstReader = null;
        else
          //該執行緒獲取讀鎖重入多次,計數器-1
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
          //非第一個獲取讀鎖執行緒,避免ThreadLocal記憶體洩漏,移出計數器
            readHolds.remove();
            if (count <= 0)
             //此處是呼叫釋放鎖次數比獲取鎖次數還多情況,直接拋異常
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
   //2,迴圈cas更新同步鎖的值
    for (;;) {
        int c = getState();
        //讀鎖同步狀態-1
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
          //返回完全釋放讀鎖,讀鎖值是否==0,完全釋放,等待寫鎖執行緒可獲取
            return nextc == 0;
    }
}

tryReleaseShared 返回true情況,表示完全釋放讀鎖,執行doReleaseShared,那就需要喚醒同步佇列中等待的其他執行緒

在讀寫鎖中存在幾種情況

情況一、如果當前獲取鎖的執行緒佔用的是寫鎖,則後來無論是獲取讀鎖還寫鎖的執行緒都會被阻塞在同步佇列中,

同步佇列是FIFO佇列,在佔用寫鎖的釋放後,node1獲取讀鎖,因讀鎖是共享的,繼續喚醒後一個共享節點。

如上圖,在node1獲取到讀鎖時,會呼叫doReleaseShared方法,繼續喚醒下一個共享節點node2,可以持續將喚醒動作傳遞下去,如果node2後面還存在幾個等待獲取讀鎖的執行緒,這些執行緒是由誰喚醒的?是其前置節點,還是第一個獲取讀鎖的節點? 應該是第1個獲取鎖的節點,這裡即node1, 由下程式碼可見,在無限迴圈中,只有頭節點沒有變化時,即再沒其他節點獲取到鎖後,才會跳出迴圈。

private void doReleaseShared() {
    for (;;) {
      //獲取同步佇列中頭節點
        Node h = head;
      //同步佇列中節點不為空,且節點數至少2個
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //1,表示後繼節點需要被喚醒
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
               //喚醒後繼節點 
                unparkSuccessor(h);
            }
           //2,後繼節點暫時不需要喚醒,設定節點 ws = -3, 確保後面可以繼續傳遞下去
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //如果頭節點發生變化,表示已經有其他執行緒獲取到鎖了,需要重新迴圈,確保可以將喚醒動作傳遞下去。
        if (h == head)                   // loop if head changed
            break;
    }
}

5、思考

1、在非公平獲取鎖方式下,是否存在等待獲取寫鎖的執行緒始終獲取不到鎖,每次都被後來獲取讀鎖的執行緒搶先,造成飢餓現象?

存在這種情況,從獲取讀鎖原始碼中看出,如果第一個執行緒獲取到讀鎖正在執行情況下,第二個等待獲取寫鎖的執行緒在同步佇列中掛起等待,在第一個執行緒沒有釋放讀鎖情況下,又陸續來了執行緒獲取讀鎖,因為讀鎖是共享的,執行緒都可以獲取到讀鎖,始終是在讀鎖沒有釋放完畢加入獲取讀鎖的執行緒,那麼等待獲取寫鎖的執行緒是始終拿不到寫鎖,導致飢餓。為什麼預設還是非公平模式?因為減少執行緒的上下文切換,保證更大的吞吐量。

6、總結

1、讀寫鎖可支援公平和非公平兩種方式獲取鎖。

2、支援鎖降級,寫鎖可降級為讀鎖,但讀鎖不可升級為寫鎖。

3、大多數場景是讀多於寫的,所以ReentrantReadWriteLock 比 ReentrantLock(排他鎖)有更好的併發效能和吞吐量。

4、讀寫鎖中讀鎖和寫鎖都支援鎖重入。

5、在獲取Condition物件實現阻塞喚醒機制,ReentrantReadWriteLock.WriteLock 重寫了 newCondition方法,ReadLock不支援,即讀鎖不支援與Condition配合使用,使用阻塞喚醒機制