1. 程式人生 > >讀寫鎖 ReentrantReadWriteLock

讀寫鎖 ReentrantReadWriteLock

readers bee 初始 unit 隊列 .net tps countdown volatile

一、讀寫鎖 ReadWriteLock概念特點
讀寫鎖維護了一對相關的鎖,一個用於只讀操作,一個用於寫入操作。只要沒有writer,讀取鎖可以由多個reader線程同時保持。寫入鎖是獨占的。

互斥鎖【ReetrantLock】一次只允許一個線程訪問共享數據,哪怕進行的是只讀操作;讀寫鎖【ReadWriteLock】允許對共享數據進行更高級別的並發訪問:對於寫操作,一次只有一個線程(write線程)可以修改共享數據,對於讀操作,允許任意數量的線程同時進行讀取。writer可以獲取讀取鎖,但reader不能獲取寫入鎖。寫入鎖降級為讀取鎖,實現方式是:先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。


讀寫鎖的讀取鎖和寫入鎖都支持鎖獲取期間的中斷。並且寫入鎖提供了一個 Condition 實現,對於寫入鎖來說,該實現的行為與 ReentrantLock.newCondition() 提供的 Condition 實現對 ReentrantLock 所做的行為相同。當然,此 Condition 只能用於寫入鎖。讀取鎖不支持 Condition,readLock().newCondition() 會拋出 UnsupportedOperationException。

二、實現原理及核心代碼

讀寫鎖也是基於AQS實現。

AQS以單個 int 類型的原子變量來表示其狀態,定義了4個抽象方法( tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int),前兩個方法用於獨占/排他模式,後兩個用於共享模式 )留給子類實現,用於自定義同步器的行為以實現特定的功能。

對於 ReentrantLock,它是可重入的獨占鎖,內部的 Sync 類實現了 tryAcquire(int)、tryRelease(int) 方法,並用狀態的值來表示重入次數,加鎖或重入鎖時狀態加 1,釋放鎖時狀態減 1,狀態值等於 0 表示鎖空閑。

對於 CountDownLatch,它是一個關卡,在條件滿足前阻塞所有等待線程,條件滿足後允許所有線程通過。內部類 Sync 把狀態初始化為大於 0 的某個值,當狀態大於 0 時所有wait線程阻塞,每調用一次 countDown 方法就把狀態值減 1,減為 0 時允許所有線程通過。利用了AQS的共享模式。


【AQS一個狀態如何表示多個讀鎖與單個寫鎖呢,】一個狀態是沒法既表示讀鎖,又表示寫鎖的,不夠用啊,那就辦成兩份用了,客家話說一個飯粒咬成兩半吃,狀態的高位部分表示讀鎖,低位表示寫鎖,由於寫鎖只有一個,所以寫鎖的重入計數也解決了,這也會導致寫鎖可重入的次數減小。

【如何表示每個讀鎖、寫鎖的重入次數呢】由於讀鎖可以同時有多個,肯定不能再用辦成兩份用的方法來處理了,但我們有 ThreadLocal,可以把線程重入讀鎖的次數作為值存在 ThreadLocal 裏。AQS 的狀態是32位(int 類型)的,辦成兩份,讀鎖用高16位,表示持有讀鎖的線程數(sharedCount),寫鎖低16位,表示寫鎖的重入次數(exclusiveCount)。狀態值為 0 表示鎖空閑,sharedCount不為 0 表示分配了讀鎖,exclusiveCount 不為 0 表示分配了寫鎖,sharedCount和exclusiveCount 肯定不會同時不為 0。
【讀、寫鎖的公平性如何實現】對於公平性的實現,可以通過AQS的等待隊列和它的抽象方法來控制,在狀態值的另一半裏存儲當前持有讀鎖的線程數。如果讀線程申請讀鎖,當前寫鎖重入次數不為 0 時,則等待,否則可以馬上分配;如果是寫線程申請寫鎖,當前狀態為 0 則可以馬上分配,否則等待。

abstract static class Sync extends AbstractQueuedSynchronizer {

    // 
     // 
       static final int SHARED_SHIFT   = 16;

       // 由於讀鎖用高位部分,所以讀鎖個數加1,其實是狀態值加 2^16
       static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

       // 寫鎖的可重入的最大次數、讀鎖允許的最大數量
       static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

       // 寫鎖的掩碼,用於狀態的低16位有效值
       static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

       // 讀鎖計數,當前持有讀鎖的線程數
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    // 寫鎖的計數,也就是它的重入次數
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/**
     * 每個線程特定的 read 持有計數。存放在ThreadLocal,不需要是線程安全的。
     */
    static final class HoldCounter {
        int count = 0;

        // 使用id而不是引用是為了避免保留垃圾。註意這是個常量。
        final long tid = Thread.currentThread().getId();
    }

    /**
     * 采用繼承是為了重寫 initialValue 方法,這樣就不用進行這樣的處理:
     * 如果ThreadLocal沒有當前線程的計數,則new一個,再放進ThreadLocal裏。
     * 可以直接調用 get。
     * */
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    /**
     * 保存當前線程重入讀鎖的次數的容器。在讀鎖重入次數為 0 時移除。
     */
    private transient ThreadLocalHoldCounter readHolds;

    /**
     * 最近一個成功獲取讀鎖的線程的計數。這省卻了ThreadLocal查找,
     * 通常情況下,下一個釋放線程是最後一個獲取線程。這不是 volatile 的,
     * 因為它僅用於試探的,線程進行緩存也是可以的
     * (因為判斷是否是當前線程是通過線程id來比較的)。
     */
    private transient HoldCounter cachedHoldCounter;

    /**
     * firstReader是這樣一個特殊線程:它是最後一個把 共享計數 從 0 改為 1 的
     * (在鎖空閑的時候),而且從那之後還沒有釋放讀鎖的。如果不存在則為null。
     * firstReaderHoldCount 是 firstReader 的重入計數。
     *
     * firstReader 不能導致保留垃圾,因此在 tryReleaseShared 裏設置為null,
     * 除非線程異常終止,沒有釋放讀鎖。
     *
     * 作用是在跟蹤無競爭的讀鎖計數時非常便宜。
     *
     * firstReader及其計數firstReaderHoldCount是不會放入 readHolds 的。
     */
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // 確保 readHolds 的內存可見性,利用 volatile 寫的內存語義。
    }
}

//寫鎖的獲取與釋放
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) { // 狀態不為0,表示鎖被分配出去了。

        // (Note: if c != 0 and w == 0 then shared count != 0)
      // c != 0 and w == 0 表示分配了讀鎖
      // w != 0 && current != getExclusiveOwnerThread() 表示其他線程獲取了寫鎖。
        if (w == 0 || current != getExclusiveOwnerThread())
            return false ;

        // 寫鎖重入
        // 檢測是否超過最大重入次數。
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");

        // 更新寫鎖重入次數,寫鎖在低位,直接加上 acquire 即可。
        // Reentrant acquire
        setState(c + acquires);
        return true ;
    }

    // writerShouldBlock 留給子類實現,用於實現公平性策略。
    // 如果允許獲取寫鎖,則用 CAS 更新狀態。
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false ; // 不允許獲取鎖 或 CAS 失敗。

    // 獲取寫鎖超過,設置獨占線程。
    setExclusiveOwnerThread(current);
    return true;
}

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively()) // 是否是當前線程持有寫鎖
        throw new IllegalMonitorStateException();

    // 這裏不考慮高16位是因為高16位肯定是 0。
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread( null); // 寫鎖完全釋放,設置獨占線程為null。
    setState(nextc);
    return free;
}

//讀鎖的獲取與釋放
// 參數變為 unused 是因為讀鎖的重入計數是內部維護的。
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();

    // 這個if語句是說:持有寫鎖的線程可以獲取讀鎖。
    if (exclusiveCount(c) != 0 && // 已分配了寫鎖
        getExclusiveOwnerThread() != current) // 且當前線程不是持有寫鎖的線程
        return -1;

    int r = sharedCount(c); // 取讀鎖計數
    if (!readerShouldBlock() && // 由子類根據其公平策略決定是否允許獲取讀鎖
        r < MAX_COUNT &&           // 讀鎖數量還沒達到最大值

        // 嘗試獲取讀鎖。註意讀線程計數的單位是  2^16
        compareAndSetState(c, c + SHARED_UNIT)) {
         // 成功獲取讀鎖

     // 註意下面對firstReader的處理:firstReader是不會放到readHolds裏的
     // 這樣,在讀鎖只有一個的情況下,就避免了查找readHolds。
        if (r == 0) { // 是 firstReader,計數不會放入  readHolds。
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { // firstReader 重入
            firstReaderHoldCount++;
        } else {
             // 非 firstReader 讀鎖重入計數更新
            HoldCounter rh = cachedHoldCounter; // 首先訪問緩存
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 獲取讀鎖失敗,放到循環裏重試。
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
           // 寫鎖被分配,非寫鎖線程獲取讀鎖,失敗
                return -1;
            // 否則,當前線程持有寫鎖,在這裏阻塞將導致死鎖。

        } else if (readerShouldBlock()) {
            // 寫鎖空閑  且  公平策略決定 線程應當被阻塞
            // 下面的處理是說,如果是已獲取讀鎖的線程重入讀鎖時,
            // 即使公平策略指示應當阻塞也不會阻塞。
            // 否則,這也會導致死鎖的。
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId()) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 需要阻塞且是非重入(還未獲取讀鎖的),獲取失敗。
                if (rh.count == 0)
                    return -1;
            }
        }

        // 寫鎖空閑  且  公平策略決定線程可以獲取讀鎖
        if (sharedCount(c) == MAX_COUNT) // 讀鎖數量達到最多
            throw new Error( "Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 申請讀鎖成功,下面的處理跟tryAcquireShared是類似的。

            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
           // 設定最後一次獲取讀鎖的緩存
                if (rh == null)
                    rh = cachedHoldCounter;

                if (rh == null || rh.tid != current.getId())
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;

                cachedHoldCounter = rh; // 緩存起來用於釋放
            }
            return 1;
        }
    }
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 清理firstReader緩存 或 readHolds裏的重入計數
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            // 完全釋放讀鎖
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count; // 主要用於重入退出
    }
    // 循環在CAS更新狀態值,主要是把讀鎖數量減 1
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 釋放讀鎖對其他讀線程沒有任何影響,
            // 但可以允許等待的寫線程繼續,如果讀鎖、寫鎖都空閑。
            return nextc == 0;
    }
}

https://coderbee.net/index.php/concurrent/20140214/792/comment-page-1#comment-5314

Java 讀寫鎖 ReentrantReadWriteLock 源碼分析

讀寫鎖 ReentrantReadWriteLock