1. 程式人生 > >JUC 可重入 讀寫鎖 ReentrantReadWriteLock

JUC 可重入 讀寫鎖 ReentrantReadWriteLock

本文首先發表在 碼蜂筆記

讀寫鎖 ReadWriteLock

讀寫鎖維護了一對相關的鎖,一個用於只讀操作,一個用於寫入操作。只要沒有writer,讀取鎖可以由多個reader執行緒同時保持。寫入鎖是獨佔的。

互斥鎖一次只允許一個執行緒訪問共享資料,哪怕進行的是隻讀操作;讀寫鎖允許對共享資料進行更高級別的併發訪問:對於寫操作,一次只有一個執行緒(write執行緒)可以修改共享資料,對於讀操作,允許任意數量的執行緒同時進行讀取。

與互斥鎖相比,使用讀寫鎖能否提升效能則取決於讀寫操作期間讀取資料相對於修改資料的頻率,以及資料的爭用——即在同一時間試圖對該資料執行讀取或寫入操作的執行緒數。

讀寫鎖適用於讀多寫少的情況。

可重入讀寫鎖 ReentrantReadWriteLock

屬性

ReentrantReadWriteLock 也是基於 AbstractQueuedSynchronizer 實現的,它具有下面這些屬性(來自Java doc文件):

  • 獲取順序:此類不會將讀取者優先或寫入者優先強加給鎖訪問的排序。
    • 非公平模式(預設):連續競爭的非公平鎖可能無限期地推遲一個或多個reader或writer執行緒,但吞吐量通常要高於公平鎖。
    • 公平模式:執行緒利用一個近似到達順序的策略來爭奪進入。當釋放當前保持的鎖時,可以為等待時間最長的單個writer執行緒分配寫入鎖,如果有一組等待時間大於所有正在等待的writer執行緒的reader,將為該組分配讀者鎖。
    • 試圖獲得公平寫入鎖的非重入的執行緒將會阻塞,除非讀取鎖和寫入鎖都自由(這意味著沒有等待執行緒)。
  • 重入:此鎖允許reader和writer按照 ReentrantLock 的樣式重新獲取讀取鎖或寫入鎖。在寫入執行緒保持的所有寫入鎖都已經釋放後,才允許重入reader使用讀取鎖。
    writer可以獲取讀取鎖,但reader不能獲取寫入鎖。
  • 鎖降級:重入還允許從寫入鎖降級為讀取鎖,實現方式是:先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。
  • 鎖獲取的中斷:讀取鎖和寫入鎖都支援鎖獲取期間的中斷。
  • Condition 支援:寫入鎖提供了一個 Condition
    實現,對於寫入鎖來說,該實現的行為與 ReentrantLock.newCondition() 提供的 Condition 實現對 ReentrantLock 所做的行為相同。當然,此 Condition 只能用於寫入鎖。
    讀取鎖不支援 ConditionreadLock().newCondition() 會丟擲 UnsupportedOperationException
  • 監測:此類支援一些確定是讀取鎖還是寫入鎖的方法。這些方法設計用於監視系統狀態,而不是同步控制。

實現

AQS 回顧

在之前的文章已經提到,AQS以單個 int 型別的原子變數來表示其狀態,定義了4個抽象方法( tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int),前兩個方法用於獨佔/排他模式,後兩個用於共享模式 )留給子類實現,用於自定義同步器的行為以實現特定的功能。

對於 ReentrantLock,它是可重入的獨佔鎖,內部的 Sync 類實現了 tryAcquire(int)、tryRelease(int) 方法,並用狀態的值來表示重入次數,加鎖或重入鎖時狀態加 1,釋放鎖時狀態減 1,狀態值等於 0 表示鎖空閒。

對於 CountDownLatch,它是一個關卡,在條件滿足前阻塞所有等待執行緒,條件滿足後允許所有執行緒通過。內部類 Sync 把狀態初始化為大於 0 的某個值,當狀態大於 0 時所有wait執行緒阻塞,每呼叫一次 countDown 方法就把狀態值減 1,減為 0 時允許所有執行緒通過。利用了AQS的共享模式。

現在,要用AQS來實現 ReentrantReadWriteLock

一點思考問題

  • AQS只有一個狀態,那麼如何表示 多個讀鎖 與 單個寫鎖 呢?
  • ReentrantLock 裡,狀態值表示重入計數,現在如何在AQS裡表示每個讀鎖、寫鎖的重入次數呢?
  • 如何實現讀鎖、寫鎖的公平性呢?

一點提示

  • 一個狀態是沒法既表示讀鎖,又表示寫鎖的,不夠用啊,那就辦成兩份用了,客家話說一個飯粒咬成兩半吃,狀態的高位部分表示讀鎖,低位表示寫鎖,由於寫鎖只有一個,所以寫鎖的重入計數也解決了,這也會導致寫鎖可重入的次數減小。
  • 由於讀鎖可以同時有多個,肯定不能再用辦成兩份用的方法來處理了,但我們有 ThreadLocal,可以把執行緒重入讀鎖的次數作為值存在 ThreadLocal 裡。
  • 對於公平性的實現,可以通過AQS的等待佇列和它的抽象方法來控制,在狀態值的另一半里儲存當前持有讀鎖的執行緒數。如果讀執行緒申請讀鎖,當前寫鎖重入次數不為 0 時,則等待,否則可以馬上分配;如果是寫執行緒申請寫鎖,當前狀態為 0 則可以馬上分配,否則等待。

原始碼分析

現在來看看具體的實現原始碼。

辦成兩份

AQS 的狀態是32位(int 型別)的,辦成兩份,讀鎖用高16位,表示持有讀鎖的執行緒數(sharedCount),寫鎖低16位,表示寫鎖的重入次數 (exclusiveCount)。狀態值為 0 表示鎖空閒,sharedCount不為 0 表示分配了讀鎖,exclusiveCount 不為 0 表示分配了寫鎖,sharedCount和exclusiveCount 肯定不會同時不為 0。

abstract static class Sync extends AbstractQueuedSynchronizer {

    // 
     // 
       static final int SHARED_SHIFT   = 16;

       // 由於讀鎖用高位部分,所以讀鎖個數加1,其實是狀態值加 2^16
       static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

       // 寫鎖的可重入的最大次數、讀鎖允許的最大數量
       static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

       // 寫鎖的掩碼,用於狀態的低16位有效值
       static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

       // 讀鎖計數,當前持有讀鎖的執行緒數
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    // 寫鎖的計數,也就是它的重入次數
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
讀鎖重入計數
abstract static class Sync extends AbstractQueuedSynchronizer {
    /**
     * 每個執行緒特定的 read 持有計數。存放在ThreadLocal,不需要是執行緒安全的。
     */
    static final class HoldCounter {
        int count = 0;

        // 使用id而不是引用是為了避免保留垃圾。注意這是個常量。
        final long tid = Thread.currentThread().getId();
    }

    /**
     * 採用繼承是為了重寫 initialValue 方法,這樣就不用進行這樣的處理:
     * 如果ThreadLocal沒有當前執行緒的計數,則new一個,再放進ThreadLocal裡。
     * 可以直接呼叫 get。
     * */
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    /**
     * 儲存當前執行緒重入讀鎖的次數的容器。在讀鎖重入次數為 0 時移除。
     */
    private transient ThreadLocalHoldCounter readHolds;

    /**
     * 最近一個成功獲取讀鎖的執行緒的計數。這省卻了ThreadLocal查詢,
     * 通常情況下,下一個釋放執行緒是最後一個獲取執行緒。這不是 volatile 的,
     * 因為它僅用於試探的,執行緒進行快取也是可以的
     * (因為判斷是否是當前執行緒是通過執行緒id來比較的)。
     */
    private transient HoldCounter cachedHoldCounter;

    /**
     * firstReader是這樣一個特殊執行緒:它是最後一個把 共享計數 從 0 改為 1 的
     * (在鎖空閒的時候),而且從那之後還沒有釋放讀鎖的。如果不存在則為null。
     * firstReaderHoldCount 是 firstReader 的重入計數。
     *
     * firstReader 不能導致保留垃圾,因此在 tryReleaseShared 裡設定為null,
     * 除非執行緒異常終止,沒有釋放讀鎖。
     *
     * 作用是在跟蹤無競爭的讀鎖計數時非常便宜。
     *
     * firstReader及其計數firstReaderHoldCount是不會放入 readHolds 的。
     */
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // 確保 readHolds 的記憶體可見性,利用 volatile 寫的記憶體語義。
    }
}

寫鎖獲取與釋放

寫鎖的獲取與釋放通過 tryAcquiretryRelease 方法實現,原始碼檔案裡有這麼一段說明:tryReleasetryAcquire 可能被 Conditions 呼叫。因此可能出現引數裡包含在條件等待和用 tryAcquire 重新獲取到鎖的期間內已經釋放的 讀和寫 計數。

這說明看起來像是在 tryAcquire 裡設定狀態時要考慮方法引數(acquires)的高位部分,其實是不需要的。由於寫鎖是獨佔的,acquires 表示的只能是寫鎖的計數,如果當前執行緒成功獲取寫鎖,只需要簡單地把當前狀態加上 acquires 的值即可,tryRelease 裡直接減去其引數值即可。

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) { // 狀態不為0,表示鎖被分配出去了。

        // (Note: if c != 0 and w == 0 then shared count != 0)
      // c != 0 and w == 0 表示分配了讀鎖
      // w != 0 && current != getExclusiveOwnerThread() 表示其他執行緒獲取了寫鎖。
        if (w == 0 || current != getExclusiveOwnerThread())
            return false ;

        // 寫鎖重入
        // 檢測是否超過最大重入次數。
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");

        // 更新寫鎖重入次數,寫鎖在低位,直接加上 acquire 即可。
        // Reentrant acquire
        setState(c + acquires);
        return true ;
    }

    // writerShouldBlock 留給子類實現,用於實現公平性策略。
    // 如果允許獲取寫鎖,則用 CAS 更新狀態。
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false ; // 不允許獲取鎖 或 CAS 失敗。

    // 獲取寫鎖超過,設定獨佔執行緒。
    setExclusiveOwnerThread(current);
    return true;
}

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively()) // 是否是當前執行緒持有寫鎖
        throw new IllegalMonitorStateException();

    // 這裡不考慮高16位是因為高16位肯定是 0。
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread( null); // 寫鎖完全釋放,設定獨佔執行緒為null。
    setState(nextc);
    return free;
}

讀鎖獲取與釋放

// 引數變為 unused 是因為讀鎖的重入計數是內部維護的。
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();

    // 這個if語句是說:持有寫鎖的執行緒可以獲取讀鎖。
    if (exclusiveCount(c) != 0 && // 已分配了寫鎖
        getExclusiveOwnerThread() != current) // 且當前執行緒不是持有寫鎖的執行緒
        return -1;

    int r = sharedCount(c); // 取讀鎖計數
    if (!readerShouldBlock() && // 由子類根據其公平策略決定是否允許獲取讀鎖
        r < MAX_COUNT &&           // 讀鎖數量還沒達到最大值

        // 嘗試獲取讀鎖。注意讀執行緒計數的單位是  2^16
        compareAndSetState(c, c + SHARED_UNIT)) {
         // 成功獲取讀鎖

     // 注意下面對firstReader的處理:firstReader是不會放到readHolds裡的
     // 這樣,在讀鎖只有一個的情況下,就避免了查詢readHolds。
        if (r == 0) { // 是 firstReader,計數不會放入  readHolds。
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { // firstReader 重入
            firstReaderHoldCount++;
        } else {
             // 非 firstReader 讀鎖重入計數更新
            HoldCounter rh = cachedHoldCounter; // 首先訪問快取
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 獲取讀鎖失敗,放到迴圈裡重試。
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
           // 寫鎖被分配,非寫鎖執行緒獲取讀鎖,失敗
                return -1;
            // 否則,當前執行緒持有寫鎖,在這裡阻塞將導致死鎖。

        } else if (readerShouldBlock()) {
            // 寫鎖空閒  且  公平策略決定 執行緒應當被阻塞
            // 下面的處理是說,如果是已獲取讀鎖的執行緒重入讀鎖時,
            // 即使公平策略指示應當阻塞也不會阻塞。
            // 否則,這也會導致死鎖的。
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId()) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 需要阻塞且是非重入(還未獲取讀鎖的),獲取失敗。
                if (rh.count == 0)
                    return -1;
            }
        }

        // 寫鎖空閒  且  公平策略決定執行緒可以獲取讀鎖
        if (sharedCount(c) == MAX_COUNT) // 讀鎖數量達到最多
            throw new Error( "Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 申請讀鎖成功,下面的處理跟tryAcquireShared是類似的。

            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
           // 設定最後一次獲取讀鎖的快取
                if (rh == null)
                    rh = cachedHoldCounter;

                if (rh == null || rh.tid != current.getId())
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;

                cachedHoldCounter = rh; // 快取起來用於釋放
            }
            return 1;
        }
    }
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 清理firstReader快取 或 readHolds裡的重入計數
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            // 完全釋放讀鎖
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count; // 主要用於重入退出
    }
    // 迴圈在CAS更新狀態值,主要是把讀鎖數量減 1
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 釋放讀鎖對其他讀執行緒沒有任何影響,
            // 但可以允許等待的寫執行緒繼續,如果讀鎖、寫鎖都空閒。
            return nextc == 0;
    }
}

公平性策略

公平與非公平策略是由 Sync 的子類 FairSyncNonfairSync 實現的。

/**
 * 這個非公平策略的同步器是寫鎖優先的,申請寫鎖時總是不阻塞。
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; // 寫執行緒總是可以突入
    }
    final boolean readerShouldBlock() {
        /* 作為一個啟發用於避免寫執行緒飢餓,如果執行緒臨時出現在等待佇列的頭部則阻塞,
         * 如果存在這樣的,則是寫執行緒。
         */
        return apparentlyFirstQueuedIsExclusive();
    }
}

/**
 * 公平的 Sync,它的策略是:如果執行緒準備獲取鎖時,
 * 同步佇列裡有等待執行緒,則阻塞獲取鎖,不管是否是重入
 * 這也就需要tryAcqire、tryAcquireShared方法進行處理。
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

現在用奇數表示申請讀鎖的讀執行緒,偶數表示申請寫鎖的寫執行緒,每個數都表示一個不同的執行緒,存在下面這樣的申請佇列,假設開始時鎖空閒:

1  3  5  0  7  9  2  4

讀執行緒1申請讀鎖時,鎖是空閒的,馬上分配,讀執行緒3、5申請時,由於已分配讀鎖,它們也可以馬上獲取讀鎖。
假設此時有執行緒11申請讀鎖,由於它不是讀鎖重入,只能等待。而執行緒1再次申請讀鎖是可以的,因為它的重入。
寫執行緒0申請寫鎖時,由於分配了讀鎖,只能等待,當讀執行緒1、3、5都釋放讀鎖後,執行緒0可以獲取寫鎖。
執行緒0釋放後,執行緒7、9獲取讀鎖,它們釋放後,執行緒2獲取寫鎖,此時執行緒4必須等待執行緒2釋放。
執行緒4線上程2釋放寫鎖後獲取寫鎖,它釋放寫鎖後,鎖恢復空閒。