1. 介紹

本文我們繼續探究使用AQS的子類ReentrantReadWriteLock(讀寫鎖)。老規矩,先貼一下類圖



ReentrantReadWriteLock這個類包含讀鎖和寫鎖,這兩種鎖都存在是否公平的概念,這個後面會細講。

此類跟ReentrantLock類似,有以下幾種性質:

  • 可選的公平性政策
  • 重入,讀鎖和寫鎖同一個執行緒可以重複獲取寫鎖可以獲取讀鎖,反之不能
  • 鎖的降級,重入還可以通過獲取寫鎖,然後獲取到讀鎖,通過釋放寫鎖的方式,從而寫鎖降級為讀鎖。 然而,從讀鎖升級到寫鎖是不可能的。
  • 獲取讀寫鎖期間,支援不可中斷

2. 原始碼剖析

先講幾個必要的知識點,然後我們再對寫鎖的獲取與釋放讀鎖的獲取與釋放進行講解,中間穿插著講公平與非公平的實現。

知識點一:

內部類Sync中,將AQS中的state(private volatile int state;長度是32位)邏輯分成了兩份,高16位代表讀鎖持有的count,低16位代表寫鎖持有的count

Sync

    ...
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
...

This lock supports a maximum of 65535 recursive write locks and 65535 read locks. Attempts to exceed these limits result in Error throws from locking methods.(讀鎖與寫鎖都最大支援65535個)

知識點二

HoldCounter的作用,一個計數器記錄每個執行緒持有的讀鎖count。使用ThreadLocal維護。快取在cachedHoldCounter

        static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

使用ThreadLocal維護

        static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

維護最後一個使用HoldCounter的執行緒。簡言之就是,假如A執行緒持有讀鎖,A執行緒重入獲取讀鎖,在它之後沒有其他執行緒獲取讀鎖,那麼當獲取HoldCounter時,可以直接將cachedHoldCounter賦值給該執行緒,就不用從ThreadLocal中去查詢了(ThreadLocal內部維持一個 Map ,想獲取當前執行緒的值就需要去遍歷查詢),這樣做可以節約時間。

        private transient HoldCounter cachedHoldCounter;

當前執行緒持有的重入讀鎖count,當某個執行緒持有的count降至0,將被刪除。

        private transient ThreadLocalHoldCounter readHolds;

初始化在建構函式或readObject

        Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
        private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}

知識點三:

是否互斥 :

讀操作 寫操作
讀操作
寫操作

只有讀讀不互斥,其餘都互斥

獲取到了寫鎖,也有資格獲取讀鎖,反之不行.

知識點四

ReentranReadWriteLock,實現了ReadWriteLock介面。

public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock(); /**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}

ReadWriteLock維護著寫鎖讀鎖寫鎖是排他的,而讀鎖可以同時由多個執行緒持有。與互斥鎖相比,讀寫鎖的粒度更細

有了上面的知識,等會理解下面的原始碼就更容易了,故事從下面幾個變數開始~

    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();

ReentrantReadWriteLock的構造器,預設是非公平模式


/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
} /**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

2.1 寫鎖

寫鎖,排他鎖;一個執行緒獲取了寫鎖,其他執行緒只能等待

2.1.1 寫鎖的獲取

    writeLock.lock();

java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock

public void lock() {
sync.acquire(1);
}

又來到了AQSacquire方法

    public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

我們看ReentrantReadWriteLock是如何實現tryAcquire

 protected final boolean tryAcquire(int acquires) {

            /*
* 對下面的條件做一個總述:
* 1. 當讀鎖或者寫鎖的count不為零時,同時擁有者不是當前執行緒,返回false
* 2. 當count達到飽和(超過最大的限制65535),返回false
* 3. 如果上面的情況都不是,該執行緒有資格去獲取鎖,如果它是可重入獲取鎖或佇列策略允許它。那麼就更新state並且設定鎖的擁有者
*/
// 結合總述看下面的程式碼,很容易就看懂了 Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 存在讀鎖或寫鎖
if (c != 0) {
// 情況分析: 1. w = 0, 表示已經獲取了讀鎖(不管是自己還是其他執行緒),直接返回false
// 2. 寫鎖不為零,且當前鎖的擁有者不是當前執行緒,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
} // 不存在讀鎖或寫鎖被佔用的情況
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

上面程式碼中的writerShouldBlock方法就是tryAcquire控制公平與否的關鍵,我們分別看看公平與非公平是如何實現的

2.1.1.1 寫鎖的獲取(非公平)

預設情況下是非公平的

 static final class NonfairSync extends Sync {
...
// 直接返回false
final boolean writerShouldBlock() {
return false;
}
...

2.1.1.2 寫鎖的獲取(公平)

    static final class FairSync extends Sync {
...
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
...

判斷是否該執行緒前面還有其他執行緒的結點,上一節有講到過。

這裡還貼一下,整個acquire的流程圖

2.1.2 寫鎖的釋放

下面的這段程式碼,記得一定放在finally中

    writeLock.unlock();
        public void unlock() {
sync.release(1);
}
    public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

又看到了熟悉的面孔,但我們主要看的還是tryRelease,

        protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

很簡單,就貼一下release的流程圖

2.2 讀鎖

讀鎖與讀鎖並不互斥,可以存在多個持有讀鎖的執行緒

2.2.1 讀鎖的獲取

        readLock.lock();
        public void lock() {
sync.acquireShared(1);
}
    public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

之前的文章還沒有講解過tryAcquireShared在子類如何實現的。看看如何實現的

  protected final int tryAcquireShared(int unused) {

            /*
* 對下面的條件做一個總述:
* 1. 如果寫鎖被其他執行緒持有,失敗
* 2. 否則,如果此執行緒有資格去獲取寫鎖,首先查詢是否需要阻塞(readerShouldBlock).如果不需要,就通過CAS更新state。
*
* 3. 如果上面兩步都失敗了,可能是當前執行緒沒有資格(需要被阻塞),或CAS失敗,或者count數量飽和了,然後執行fullTryAcquireShared進行重試
*/
Thread current = Thread.currentThread();
int c = getState();
// 這裡可以看出,若該執行緒持有寫鎖,同樣也可以去獲取讀鎖
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c); // readerShouldBlock表示此執行緒是否被阻塞(後面細談)
// 多個執行緒執行compareAndSetState(c, c + SHARED_UNIT), 只有一個執行緒可以執行成功,其餘執行緒去執行fullTryAcquireShared
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// firstReader是第一個獲取到讀鎖的執行緒
// firstReaderHoldCount是firstReader持有的count
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 重入
firstReaderHoldCount++;
} else {
// sync queue還存在其他執行緒持有讀鎖,且該執行緒不是第一個持有讀鎖的執行緒
// 結合上面的知識點二,理解下面的邏輯
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 到達這裡的執行緒,1. 沒有資格獲取(需要阻塞) 2. CAS失敗 3. 讀鎖count飽和
return fullTryAcquireShared(current);
}

Full version of acquire for reads, that handles CAS misses and reentrant reads not dealt with in tryAcquireShared.


final int fullTryAcquireShared(Thread current) { // 主要邏輯跟上面的tryAcquireShared類似。tryAcquireShared的邏輯只有一個執行緒會成功CAS,
// 其餘的執行緒都進入fullTryAcquireShared,進行重試(程式碼不那麼複雜)
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { // 需要被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 此時, 如果該執行緒前面已經有執行緒獲取了讀鎖,且當前執行緒持有的讀鎖count為0,從readHolds除去。
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 這裡表示該執行緒還是一個new reader, 還沒有持有讀鎖
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 這裡只有一個執行緒會成功,若CAS失敗,一直重試直到成功,或者讀鎖count飽和,或者需要被阻塞為止
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

我們上面看到了readerShouldBlock,這個方法是控制讀鎖獲取公平與否,下面我們分別看看在非公平與公平模式下的實現

2.2.1.1 讀鎖的獲取(非公平)

    static final class NonfairSync extends Sync {
...
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
...
}

判斷sync queue的head的後繼結點是否是寫鎖(獨佔模式)

    final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

上面的方法是,獲取讀鎖時,避免導致寫鎖飢餓(indefinite writer starvation)的一個措施,下面我們對它進行詳細的解釋



結合上面的圖片,我們設想有一個這樣的情況,寫鎖沒有被獲取,執行緒A獲取到了讀鎖,此時另一個執行緒X想要獲取寫鎖,但是寫鎖與讀鎖互斥,所以此時將執行緒X代表的node新增到sync queue中,等待讀鎖被釋放,才有資格去獲取寫鎖

上面的情況 + 不存在判斷sync queue的head的後繼結點是否是寫鎖(apparentlyFirstQueuedIsExclusive)的方法時,我們看看會出什麼問題

時刻一: 執行緒B執行緒C執行緒D是新建執行緒想要去獲取讀鎖(new reader),此時因為不存在寫鎖被獲取,所以執行緒B執行緒C執行緒D都會在fullTryAcquireShared中不斷重試,最終都獲得讀鎖

時刻二: 執行緒A釋放,會執行unparkSuccessor,此時執行緒X被喚醒,但是執行到tryAcquire,又檢測到讀鎖被持有(不管是自己還是是其他執行緒)執行緒X又被阻塞。執行緒B釋放,還是會出現這種情況,只有等到最後一個讀鎖被釋放執行緒X才能獲取到寫鎖。但是想想如果後面一連串的讀鎖執行緒X不是早就被'餓死了'

apparentlyFirstQueuedIsExclusive,就可以防止這種'寫鎖飢餓'的情況發生。執行緒B執行緒C執行緒D只有被阻塞,等待執行緒X獲取到寫鎖,才有機會獲取讀鎖

2.2.1.2 讀鎖的獲取(公平)


/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
... final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
...
}

這裡關於讀鎖的獲取(公平與非公平)分析完了,貼一張整個acquireShared的流程圖

2.2.2 讀鎖的釋放

        readLock.unlock();
        public void unlock() {
sync.releaseShared(1);
}

AQS

    public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 喚醒head後驅結點
doReleaseShared();
return true;
}
return false;
}
 // unused = 1
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// 獲取讀鎖時,會執行 compareAndSetState(c, c + SHARED_UNIT),這裡就是減SHARED_UNIT
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 釋放讀鎖,對需要讀鎖的執行緒沒有影響(讀讀不互斥)
// 但是,這裡如果返回true,可以喚醒被阻塞的想要持有寫鎖的執行緒
return nextc == 0;
}
}

3. 總結

  1. ReentrantReadWriteLock中的防止'寫鎖飢餓'的操作,值得一看
  2. AQS中的state(private volatile int state;),邏輯分為高16位(代表讀鎖的state),低16位(代表寫鎖的state),是一個值得學習的辦法
  3. 使用ThreadLocal維護每一個現成的讀鎖的重入數,使用cachedHoldCounter維護最後一個使用HoldCounter的讀鎖執行緒,節省在ThreadLocal中的查詢

使用讀寫鎖的情況,應該取決於與修改相比,讀取資料的頻率,讀取和寫入操作持續的時間。

例如:

  • 某個集合不經常修改,但是對其元素搜尋很頻繁,使用讀寫鎖就是最佳選擇。(簡言之,就是讀多寫少)
  • 現在也是讀多寫少,但是讀操作時間很短,只有一小段程式碼,而讀寫鎖比互斥鎖更加複雜,開銷可能大於互斥鎖,這種情況使用讀寫鎖可能不合適。此時就要通過效能分析,判斷使用讀寫鎖在系統中是否合適。

4. 參考