深入理解 ReentrantReadWriteLock
ReentrantLock
是排它鎖,它在同一時刻只允許一個執行緒進行訪問。在很多場景中,讀服務遠多於寫服務,而讀服務之間不存在資料競爭問題,在一個執行緒讀資料時禁止其他讀執行緒訪問,會導致效能降低。
所以就有了讀寫鎖,它在同一時刻可以允許多個讀執行緒訪問,但在寫執行緒訪問時,則所有的讀執行緒和其他寫執行緒都會被阻塞。讀寫鎖內部維護了一個讀鎖和一個寫鎖,如此將讀寫鎖分離,可以很大地提升併發性和吞吐量。
ReadWriteLock
ReadWriteLock 介面定義了讀鎖和寫鎖的兩個方法:
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); } 複製程式碼
其中readLock()
方法用於返回讀操作的鎖,writeLock()
用於返回寫操作的鎖。
實現類
ReentrantReadWriteLock
實現了ReadWriteLock
介面,它的幾個重要屬性如下:
// 內部類 ReadLock,讀鎖 private final ReentrantReadWriteLock.ReadLock readerLock; // 內部類 WriteLock 寫鎖 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步器,讀寫和寫鎖依賴於它 final Sync sync; 複製程式碼
其中有兩個構造方法,主要如下:
public ReentrantReadWriteLock() { this(false); } // 指定公平性 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } 複製程式碼
public static class ReadLock implements Lock, java.io.Serializable { private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } ··· } public static class WriteLock implements Lock, java.io.Serializable { private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } ··· } 複製程式碼
可以看到,ReentrantReadWriteLock
鎖的主體依然是Sync
,讀鎖和寫鎖都依賴與Sync
來實現,它們使用的是同一個鎖,只是在獲取鎖和釋放鎖的方式不同。
讀寫狀態
在ReentrantLock
中使用一個int
型變數state
來表示同步狀態,該值表示鎖被一個執行緒重複獲取的次數,而讀寫鎖中需要一個int
型變數上維護多個讀執行緒和一個寫執行緒的狀態。
所以它將該變數分為兩部分,高16
位表示讀,低16
位表示寫。分割之後通過位運算來計算讀鎖和寫鎖的狀態。
static final int SHARED_SHIFT= 16; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 讀鎖狀態 static int sharedCount(int c){ return c >>> SHARED_SHIFT; } // 寫鎖狀態 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } 複製程式碼
內部工作狀態
另外,ReentrantReadWriteLock
還提供了返回內部工作狀態的方法。
方法名 | 描述 |
---|---|
getReadLockCount | 返回讀鎖被獲取的次數(鎖重入次數也會加 1) |
isWriteLocked | 返回寫鎖是否被獲取 |
getWriteHoldCount | 返回當前執行緒獲取寫鎖的次數 |
getReadHoldCount | 返回當前執行緒獲取讀鎖的次數 |
前面三個方法都比較簡單:
final int getReadLockCount() { return sharedCount(getState()); // c >>> SHARED_SHIFT } final boolean isWriteLocked() { return exclusiveCount(getState()) != 0; } // 由於寫鎖只會被一個執行緒獲取 // 所以,如果是當前執行緒,則通過 c & EXCLUSIVE_MASK 直接計算即可 final int getWriteHoldCount() { return isHeldExclusively() ? exclusiveCount(getState()) : 0; } 複製程式碼
最後一個方法,首先來看一下Sync
類的幾個屬性:
// 當前執行緒持有的讀鎖數量 private transient ThreadLocalHoldCounter readHolds; // HoldCounter 的一個快取,減少 ThreadLocal.get 的次數 private transient HoldCounter cachedHoldCounter; // 第一個獲取到讀鎖的讀執行緒 private transient Thread firstReader = null; // 第一個讀執行緒持有的讀鎖數量 private transient int firstReaderHoldCount; // 上面三個都是為了提高效率,如果讀鎖僅有一個或有快取了,就不用去 ThreadLocalHoldCounter 獲取 // 讀執行緒持有鎖的計數器,需要與執行緒繫結 static final class HoldCounter { int count = 0; // 持有執行緒 id,在釋放鎖時,判斷 cacheHoldCounter 快取的是否是當前執行緒的讀鎖數量 final long tid = getThreadId(Thread.currentThread()); } // 通過 ThreadLocal 將 HoldCounter 繫結到執行緒上 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } 複製程式碼
getReadHoldCount()
方法用於獲取當前執行緒獲取讀鎖的次數。
final int getReadHoldCount() { // 如果讀鎖被獲取的次數為 0,那麼當前執行緒獲取讀鎖的次數肯定也為 0 if (getReadLockCount() == 0) return 0; Thread current = Thread.currentThread(); // 如果當前執行緒是第一個獲取讀鎖的執行緒,則直接返回 firstReaderHoldCount if (firstReader == current) return firstReaderHoldCount; // 快取的 HoldCounter 繫結的執行緒是否是當前執行緒,如果是則直接返回讀鎖數量 HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == getThreadId(current)) return rh.count; // 否則從 ThreadLocalHoldCounter 中獲取 HoldCounter,再獲取讀鎖數量 int count = readHolds.get().count; if (count == 0) readHolds.remove(); // 防止記憶體洩露 return count; } 複製程式碼
寫鎖
寫鎖是一個支援可重入的排它鎖。
寫鎖的獲取
WriteLock
的lock()
方法如下,可以看到,這裡呼叫的是AQS
的獨佔式獲取鎖方法。
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 複製程式碼
在獲取寫鎖時,呼叫AQS
的acquire
方法,其中又呼叫了Sync
自定義元件實現的tryAcquire
方法:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); // 寫鎖個數 if (c != 0) { // c != 0 && w == 0 表示有執行緒獲取了讀鎖 // 或者當前執行緒不是持有鎖的執行緒,則失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 如果寫鎖會超過範圍,丟擲異常 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 當前執行緒獲取寫鎖,可重入 setState(c + acquires); return true; } // 如果沒有任何執行緒獲取讀鎖和寫鎖,當前執行緒嘗試獲取寫鎖 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } 複製程式碼
這裡如果有執行緒獲取了讀鎖,則當前執行緒不能再獲取寫鎖。因為讀寫鎖需要確保獲取寫鎖的執行緒的操作對於讀鎖的執行緒是可見的,如果存在讀鎖時再允許獲取寫鎖,則獲取讀鎖的執行緒可能無法得知當前獲取寫鎖的執行緒的操作。
判斷獲取寫鎖的執行緒是否應該被阻塞,公平鎖和非公平中實現不同。
static final class NonfairSync extends Sync { // 對於非公平鎖,直接返回 false final boolean writerShouldBlock() { return false; } } static final class FairSync extends Sync { // 對於公平鎖,則需要判斷是否有前驅節點 final boolean writerShouldBlock() { return hasQueuedPredecessors(); } } public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 複製程式碼
寫鎖的釋放
unlock()
方法如下,其中呼叫了AQS
的release
方法:
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 喚醒後繼節點 unparkSuccessor(h); return true; } return false; } 複製程式碼
release()
方法首先呼叫Sync
中的tryRelease()
方法,然後喚醒後繼節點:
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } 複製程式碼
該方法首先減少寫狀態值,如果寫狀態為0
,則表示寫鎖已經被釋放,將持有鎖的執行緒設定為null
,並更改同步狀態值。
讀鎖
讀鎖是一個支援可重入的共享鎖,它能被多個執行緒同時獲取。
讀鎖的獲取
ReadLock
的lock()
方法如下,其中呼叫了AQS
的共享式獲取鎖方法:
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } 複製程式碼
在acquireShared
方法中,又呼叫了Sync
的tryAcquireShared
方法:
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 存在寫鎖,並且寫鎖被其他執行緒持有,則失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // 獲取讀鎖的執行緒是否需要阻塞 // 讀鎖小於 MAX_COUNT(1 << 16) // 使用 CAS 更新狀態為 c + 1 << 16 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 沒有讀鎖 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 讀鎖僅有一個 firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //更新快取 cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 將 HoldCounter 設定到 ThreadLocal 中 readHolds.set(rh); // 讀鎖數量加 1 rh.count++; } return 1; } return fullTryAcquireShared(current); } 複製程式碼
該方法中,如果滿足上述三個條件,則獲取讀鎖成功,會對firstReaderHoldCount
等值進行設定,稍後詳細介紹。如果不滿足時,會呼叫fullTryAcquireShared
方法:
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 如果寫鎖不為 0 if (exclusiveCount(c) != 0) { // 當前執行緒不是持有寫鎖的執行緒,返回 if (getExclusiveOwnerThread() != current) return -1; // 讀鎖是否需要被阻塞 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 讀鎖超出最大範圍 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 使用 CAS 更新狀態值,嘗試獲取鎖 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } 複製程式碼
判斷讀鎖是否應該被阻塞,公平鎖和非公平鎖實現不同,
static final class NonfairSync extends Sync { // 對於非公平鎖,需要判斷同步佇列中第一個結點是否是獨佔式(寫鎖) final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next)!= null && !s.isShared()&& s.thread != null; } static final class FairSync extends Sync { // 對於公平鎖,需要判斷是否有前驅節點 final boolean readerShouldBlock() { return hasQueuedPredecessors(); } } 複製程式碼
讀鎖的釋放
ReadLock
的unlock
方法如下,其中呼叫的是AQS
的共享式釋放鎖方法:
public void unlock() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 複製程式碼
releaseShared
方法中又呼叫了Sync
的tryReleaseShared
方法:
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 如果當前執行緒是第一個獲取讀鎖的執行緒 if (firstReader == current) { if (firstReaderHoldCount == 1) // 僅獲取了一次,將 firstReader 置為 null firstReader = null; else // 否則將 firstReadHoldCount 減 1 firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 快取如果有效,直接使用;否則重新獲取 rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 迴圈使用 CAS 更新狀態值 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } 複製程式碼
鎖降級
ReentrantReadWriteLock
允許鎖降級,也就是寫鎖降級為讀鎖。它是指先獲取寫鎖,再獲取到讀鎖,最後釋放寫鎖的過程。但鎖升級是不允許的,也就是先獲取讀鎖,再獲取寫鎖,最後釋放讀鎖的過程。
在獲取讀鎖的tryAcquireShared
方法中:
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 存在寫鎖,並且寫鎖被其他執行緒持有,則失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); ··· } 複製程式碼
如果存在寫鎖,並且寫鎖被其他執行緒持有時,才會失敗。說明如果當前執行緒持有了寫鎖,也可以再獲取讀鎖。最後釋放寫鎖,這稱為鎖降級。
為何要這樣做呢?試想如果一個執行緒獲取了寫鎖,這個時候其他任何執行緒都是無法再獲取讀鎖或寫鎖的,然後該執行緒再去獲取讀鎖,也就不會產生任何的競爭。通過這種鎖降級機制,就不會有釋放寫鎖後,再去競爭獲取讀鎖的情況,避免了鎖的競爭和執行緒的上下文切換,也就提高了效率。