1. 程式人生 > >Java併發程式設計之鎖機制之ReentrantReadWriteLock(讀寫鎖)

Java併發程式設計之鎖機制之ReentrantReadWriteLock(讀寫鎖)

前言

在前面的文章中,我們講到了ReentrantLock(重入鎖),接下來我們講ReentrantReadWriteLock(讀寫鎖),該鎖具備重入鎖的可重入性可中斷獲取鎖等特徵,但是與ReentrantLock不一樣的是,在ReentrantReadWriteLock中,維護了一對鎖,一個讀鎖一個寫鎖,而讀寫鎖在同一時刻允許多個執行緒訪問。但是在寫執行緒訪問時,所有的讀執行緒和其他的寫執行緒均被阻塞。在閱讀本片文章之前,希望你已閱讀過以下幾篇文章:

基本結構

在具體瞭解ReentrantReadWriteLock之前,我們先看一下其整體結構,具體結構如下圖所示:

ReentrantReadWriteLock.png

從整體圖上來看,ReentrantReadWriteLock實現了ReadWriteLock介面,其中在ReentrantReadWriteLock中分別聲明瞭以下幾個靜態內部類:

  • WriteLockReadLock(維護的一對讀寫鎖):單從類名我們可以看出這兩個類的作用,就是控制讀寫執行緒的鎖
  • Sync及其子類NofairSyncFairSync:如果你閱讀過 Java併發程式設計之鎖機制之重入鎖中公平鎖與非公平鎖的介紹,那麼我們也可以猜測出ReentrantReadWriteLock(讀寫鎖)是支援公平鎖與非公平鎖的。
  • ThreadLoclHoldCounterHoldCounter:涉及到鎖的重進入,在下文中我們會具體進行描述。

基本使用

在使用某些種類的Collection時,可以使用ReentrantReadWriteLock 來提高併發性。通常,在預期Collection 很大,且讀取執行緒訪問它的次數多於寫入執行緒的情況下,且所承擔的操作開銷高於同步開銷時,這很值得一試。例如,以下是一個使用 TreeMap(我們假設預期它很大,並且能被同時訪問) 的字典類。

class RWDictionary {
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();//獲取讀鎖
    private final Lock w = rwl.writeLock();//獲取寫鎖
    
	//讀取Map中的對應key的資料
    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    //讀取Map中所有的key
    public String[] allKeys() {
        r.lock();
        try { return m.keySet().toArray(); }
        finally { r.unlock(); }
    }
    //往Map中寫資料
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    //清空資料
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
 }
複製程式碼

在上述例子中,我們分別對TreeMap中的讀取操作進行了加鎖的操作。當我們呼叫get(String key)方法,去獲取TreeMap中對應key值的資料時,需要先獲取讀鎖。那麼其他執行緒對於寫鎖的獲取將會被阻塞,而對獲取讀鎖的執行緒不會阻塞。同理,當我們呼叫put(String key, Data value)方法,去更新資料時,我們需要獲取寫鎖。那麼其他執行緒對於寫鎖與讀鎖的獲取都將會被阻塞。只有當獲取寫鎖的執行緒釋放了鎖之後。其他讀寫操作才能進行。

這裡可能會有小夥伴會有疑問,為什麼當獲取寫鎖成功後,會阻塞其他的讀寫操作?,這裡其實是為了保證資料可見性。如果不阻塞其他讀寫操作,假如讀操作優先與寫操作,那麼在資料更新之前,讀操作獲取的資料與寫操作更新後的資料就會產生不一致的情況。

需要注意的是:ReentrantReadWriteLock最多支援 65535 個遞迴寫入鎖和65535個讀取鎖。試圖超出這些限制將導致鎖方法丟擲 Error。具體原因會在下文進行描述。

實現原理

到現在為止,我們已經基本瞭解了ReentrantReadWriteLock的基本結構與基本使用。我相信大家肯定對其內部原理感到好奇,下面我會帶著大家一起去了解其內部實現。這裡我會對整體的一個原理進行分析,內部更深的細節會在下文進行描述。因為我覺得只有理解整體原理後,再去理解其中的細節。那麼對整個ReentrantReadWriteLock(讀寫鎖)的學習來說,要容易一點。

整體原理

在前文中,我們介紹了ReentrantReadWriteLock的基本使用,我們發現整個讀寫鎖對執行緒的控制是交給了WriteLockReadLock。當我們呼叫讀寫鎖的lock()方法去獲取相應的鎖時,我們會執行以下程式碼:

 public void lock() { sync.acquireShared(1);}
複製程式碼

也就是會呼叫sync.acquireShared(1),而sync又是什麼呢?從其建構函式中我們也可以看出:

 public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
複製程式碼

其中關於FairSyncNonfairSync的宣告如下所示:

//同步佇列
abstract static class Sync extends AbstractQueuedSynchronizer {省略部分程式碼...}
//非公平鎖
static final class NonfairSync extends Sync{省略部分程式碼...}
//公平鎖
static final class FairSync extends Sync {省略部分程式碼...}
複製程式碼

這裡我們又看到了我們熟悉的AQS,也就是說WriteLockReadLock這兩個鎖,其實是通過AQS中的同步佇列來對執行緒的進行控制的。那麼結合我們之前的AQS的知識,我們可以得到下圖:

(如果你對AQS不熟,那麼你可以閱讀該篇文章---->Java併發程式設計之鎖機制之AQS(AbstractQueuedSynchronizer)

讀寫鎖狀態關係圖.png
這裡我省略了 為什麼維護的是同一個同步佇列的原因,這個問題留給大家。

讀寫狀態設計

雖然現在我們已經知道了,WriteLockReadLock這兩個鎖維護了同一個同步佇列,但是我相信大家都會有個疑問,同步佇列中只有一個int型別的state變數來表示當前的同步狀態。那麼其內部是怎麼將兩個讀寫狀態分開,並且達到控制執行緒的目的的呢?

ReentrantReadWriteLock中的同步佇列,其實是將同步狀態分為了兩個部分,其中高16位表示讀狀態低16位表示寫狀態,具體情況如下圖所示:

讀寫鎖狀態劃分.png

在上圖中,我們能得知,讀寫狀態能表示的最大值為65535(排除負數),也就是說允許鎖重進入的次數為65535次。

接下來 我們單看高16位,這裡表示當前執行緒已經獲取了寫鎖,且重進入了七次。同樣的這裡如果我們也只但看低16位,那麼就表示當前執行緒獲取了讀鎖,且重進入了七次。這裡大家需要注意的是,在實際的情況中,讀狀態與寫狀態是不能被不同執行緒同時賦值的。因為根據ReentrantReadWriteLock的設計來說,讀寫操作執行緒是互斥的。上圖中這樣表示,只是為了幫助大家理解同步狀態的劃分

到現在為止我們已經知道同步狀態的劃分,那接下來又有新的問題了。如何快速的區分及獲取讀寫狀態呢?其實也非常簡單。

  • 讀狀態:想要獲取讀狀態,只需要將當前同步變數無符號右移16位
  • 寫狀態:我們只需要將當前同步狀態(這裡用S表示)進行這樣的操作S&0x0000FFFF),也就是S&(1<<16-1)

也就是如下圖所示(可能圖片不是很清楚,建議在pc端上觀看):

讀寫鎖狀態原理.png

細節分析

在瞭解了ReentrantReadWriteLock的整體原理及讀寫狀態的劃分後,我們再來理解其內部的讀寫執行緒控制就容易的多了,下面的文章中,我會對讀鎖與寫鎖的獲取分別進行討論。

讀鎖的獲取

因為當呼叫ReentrantReadWriteLock中的ReadLock的lock()方法時,最終會走Sync中的tryAcquireShared(int unused)方法,來判斷能否獲取寫鎖。那現在我們就來看看該方法的具體實現。具體程式碼如下所示:

   protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            //(1)判斷當前是否有寫鎖,有直接返回
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
               
            int r = sharedCount(c);
             //(2)獲取當前讀鎖的狀態,判斷是否小於最大值,
             //同時根據公平鎖,還是非公平鎖的模式,判斷當前執行緒是否需要阻塞,
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
	                compareAndSetState(c, c + SHARED_UNIT)) {
                //(3)如果是不要阻塞,且寫狀態小於最大值,則設定當前執行緒重進入的次數
                if (r == 0) {
			        //如果當前讀狀態為0,則設定當前讀執行緒為,當前執行緒為第一個讀執行緒。
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
		            //計算第一個讀執行緒,重進入的次數
                    firstReaderHoldCount++;
                } else {
	                //通過ThreadLocl獲取讀執行緒中進入的鎖
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;//獲取共享同步狀態成功
            }
            //(4)當獲取讀狀態失敗後,繼續嘗試獲取讀鎖,
            return fullTryAcquireShared(current);
        }
複製程式碼
  • (1)根據當前的同步狀態,判斷是否存在寫鎖,且當前擁有寫鎖的執行緒不是當前執行緒,那麼直接返回-1,需要注意的是如果該方法返回值為負數,那麼會將該請求執行緒加入到同步佇列中的等待佇列中。(對該方法不是很熟的小夥伴,建議檢視 Java併發程式設計之鎖機制之AQS(AbstractQueuedSynchronizer)
  • (2)獲取當前讀鎖的狀態,判斷是否小於最大值,同時根據公平鎖,還是非公平鎖的模式,判斷當前執行緒是否需要阻塞
  • (3)如果條件(2)滿足,則設定分別第一個讀取執行緒重進入的次數後續執行緒重進入的次數
  • (4)如果條件(2)不滿足,在再次嘗試獲取讀鎖。

在讀鎖的獲取中,涉及到的方法較為複雜,所以下面會對每個步驟中涉及到的方法,進行介紹。

步驟(1)中如何判斷是否有寫鎖?

在讀鎖的獲取中的步驟(1)中,程式碼中會呼叫exclusiveCount(int c)方法來判當前是否存在寫鎖。而該方法是屬於Sync中的方法,具體程式碼如下所示:

    abstract static class Sync extends AbstractQueuedSynchronizer {

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;//最大狀態數為2的16次方-1
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /*返回當前的讀狀態*/
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /*返回當前的寫狀態 */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
       }
複製程式碼

從程式碼中我們可以看出,只是簡單的執行了c & EXCLUSIVE_MASK,也就是S&0x0000FFFF,結合我們上文中我們所講的讀寫狀態的區分,我相信exclusiveCount(int c)sharedCount(int c)方法是不難理解的。

步驟(2)中如何判斷是公平鎖與非公平鎖。

在步驟(2)中,我們發現呼叫了readerShouldBlock()方法,而該方法是Sync類中的抽象方法。在ReentrantReadWriteLock類中,公平鎖與非公平鎖進行了相應的實現,具體程式碼如下圖所示:

	//公平鎖
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock(){return hasQueuedPredecessors();}
        final boolean readerShouldBlock(){return hasQueuedPredecessors();
        }
    }
    //非公平鎖
    static final class NonfairSync extends Sync {
        final boolean writerShouldBlock() { return false;}
        final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}
    }
複製程式碼

這裡就不再對公平鎖與非公平鎖進行分析了。在文章 Java併發程式設計之鎖機制之重入鎖中已經對這個知識點進行了分析。有興趣的小夥伴可以參考該文章。

步驟(3)中為毛要記錄第一個獲取寫鎖的執行緒?執行緒的重進入是如何實現的?

在ReentrantReadWriteLock類中分別定義了Thread firstReaderint firstReaderHoldCount變數來記錄當前第一個獲取寫鎖的執行緒以及其重進入的次數。官方的給的解釋是便於跟蹤與記錄執行緒且這種記錄是非常廉價的。也就是說,之所以單獨定義一個變數來記錄第一個獲取獲取寫鎖的執行緒,是為了在眾多的讀執行緒中區分執行緒,也是為了以後的除錯與跟蹤。

當我們解決了第一個問題後,現在我們來解決第二個問題。這裡我就不在對第一個執行緒如何記錄重進入次數進行分析了。我們直接看其他讀執行緒的重進入次數設定。這裡因為篇幅的限制,我就直接講原理,其他執行緒的重進入的次數判斷是通過ThreadLocal來實現的。通過在每個執行緒中的記憶體空間儲存HodlerCount類(用於記錄當前執行緒獲取鎖的次數),來獲取相應的次數。具體程式碼如下所示:

   static final class HoldCounter {
            int count;//記錄當前執行緒進入的次數
            final long tid = getThreadId(Thread.currentThread());
        }
    
   static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
     
   private transient ThreadLocalHoldCounter readHolds;
複製程式碼

如果有小夥伴不熟悉ThreadLocal,可以參看該篇文章《Android Handler機制之ThreadLocal》

步驟(4)中繼續嘗試獲取讀鎖?

當第一次獲取讀鎖失敗的時候,會呼叫fullTryAcquireShared(Thread current)方法會繼續嘗試獲取鎖。該函式返回的三個條件為:

  • 當前已經存在寫鎖了。直接加入同步佇列中的等待佇列中。
  • 當前寫鎖的次數超過最大值,直接丟擲異常
  • 獲取讀鎖成功。直接返回

具體程式碼如下所示:

    final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {//注意這裡的for迴圈
                int c = getState();
                if (exclusiveCount(c) != 0) {//(1)存在寫鎖直接返回
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)//(2)鎖迭代次數超過最大值。丟擲異常
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {//(3)獲取鎖成功,記錄次數
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
複製程式碼

因為該方法和上文提到的tryAcquireShared(int unused)方法較為類似。所以這裡就不再對其中的邏輯再次講解。大家需要注意的是該方法會自旋式的獲取鎖

寫鎖的獲取

瞭解了讀鎖的獲取,再來了解寫鎖的獲取就非常簡單了。寫鎖的獲取最終會走Sync中的tryAcquire(int acquires)方法。具體程式碼如下所示:

   protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            //(1)獲取同步狀態 = 寫狀態+讀狀態,單獨獲取寫狀態
            int c = getState();
            int w = exclusiveCount(c);
            //(2)如果c!=0則表示有執行緒操作
            if (c != 0) {
                // (2.1)沒有寫鎖執行緒,則表示有讀執行緒,則直接獲取失敗,並返回
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                    
                 //(2.2)如果w>0則,表示當前執行緒為寫執行緒,則計算當前重進入的次數,如果已經飽和,則丟擲異常
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                    
                // (2.3)獲取成功,直接記錄當前寫狀態
                setState(c + acquires);
                return true;
            }
            //(3)沒有執行緒獲取讀寫鎖,根據當前鎖的模式與設定寫狀態是否成功,判斷是否需要阻塞執行緒
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //(4)第一次進入,獲取成功   
            setExclusiveOwnerThread(current);
            return true;
        }
複製程式碼

為了幫助大家理解,我這裡將該方法分為了一下幾個步驟:

  • (1)獲取同步狀態 c(寫狀態+讀狀態),並單獨獲取寫狀態w
  • (2)如果c!=0則表示有執行緒操作。
  • (2.1)沒有寫鎖執行緒,則表示有讀執行緒,則直接獲取失敗,並返回。
  • (2.2)如果w>0則,表示當前執行緒為寫執行緒,則計算當前重進入的次數,如果已經飽和,則丟擲異常
  • (2.3)獲取成功,直接記錄當前寫狀態。
  • (3)在(2)條件不滿足的條件下,沒有執行緒獲取讀寫鎖,根據當前鎖的模式與設定寫狀態是否成功,判斷是否需要阻塞執行緒
  • (4)在(2)(3)條件都不滿足的情況下,則為第一次進入,那麼就獲取成功 。

相信結合以上步驟。再來理解程式碼就非常容易了。

鎖降級

讀寫鎖除了保證寫操作對讀操作的可見性以及併發性的提升之外,讀寫鎖也能簡化讀寫互動的程式設計方式,試想一種情況,在程式中我們需要定義一個共享的用作快取資料結構,並且其大部分時間提供讀服務(例如查詢和搜尋),而寫操作佔有的時間很少,但是我們又希望寫操作完成之後的更新需要對後續的讀操作可見。那麼該怎麼實現呢?參看如下例子:

public class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    
    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            //如果快取過期,釋放讀鎖,並獲取寫鎖
            rwl.readLock().unlock();
            rwl.writeLock().lock();(1)
            try {
                //重新檢查快取是否過期,因為有可能在當前執行緒操作之前,其他寫執行緒有可能改變快取狀態
                if (!cacheValid) {
                    data = ...//重新寫入資料
                    cacheValid = true;
                }
                // 獲取讀鎖
                rwl.readLock().lock();(2)
            } finally {
	            //釋放寫鎖
                rwl.writeLock().unlock(); (3)
            }
        }

        try {
            use(data);//操作使用資料
        } finally {
            rwl.readLock().unlock();//最後釋放讀鎖
        }
    }
}
複製程式碼

在上述例子中,如果資料快取過期,也就是cacheValid變數(volatile 修飾的布林型別)被設定為false,那麼所有呼叫processCachedData()方法的執行緒都能感知到變化,但是隻有一個執行緒能過獲取到寫鎖。其他執行緒會被阻塞在讀鎖和寫鎖的lock()方法上。當前執行緒獲取寫鎖完成資料準備之後,再獲取讀鎖,隨後釋放寫鎖(上述程式碼的(1)(2)(3)三個步驟),這種在擁有寫鎖的情況下,在獲取讀鎖。隨後釋放寫鎖的過程,稱之為鎖降級(在讀寫鎖內部實現中,是支援鎖鎖降級的)

那接下來,我個問題想問大家,為什麼當執行緒獲取寫鎖,修改資料完成後,要先獲取讀鎖呢,而不直接釋放寫鎖呢?,其實原因很簡單,如果當前執行緒直接釋放寫鎖,那麼這個時候如果有其他執行緒獲取了寫鎖,並修改了資料。那麼對於當前釋放寫鎖的執行緒是無法感知資料變化的。先獲取讀鎖的目的,就是保證沒有其他執行緒來修改資料啦。

總結

  • ReentrantReadWriteLock最多支援 65535 個遞迴寫入鎖和65535個讀取鎖。
  • ReentrantReadWriteLock中用同一int變數的高16位表示讀狀態低16位表示寫狀態
  • ReentrantReadWriteLock支援公平鎖與非公平鎖模式。
  • ReentrantReadWriteLock支援鎖的降級。