多執行緒學習筆記五之讀寫鎖實現分析
目錄
簡介
在前一篇部落格多執行緒學習筆記三之ReentrantLock與AQS實現分析
ReentrantReadWriteLock是基於AQS實現的讀寫鎖,內部維護了一個讀鎖(共享鎖)和寫鎖(獨佔鎖)。如果我們要在程式中提供共享的快取資料結構,快取肯定是讀操作(資料查詢)多而寫操作(資料更新)少,只要保證寫操作對後續的讀操作是可見的就行了,這種情況下使用獨佔鎖就不如讀寫鎖的吞吐量大,讀寫鎖中的讀鎖允許多個執行緒獲得讀鎖對資源進行讀操作,寫鎖是傳統的獨佔鎖,只允許單個執行緒獲得寫鎖對資源進行更新。以下是JDK提供基於ReentrantReadWriteLock簡單實現快取結構的Demo:
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 必須先釋放讀鎖再獲取寫鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try { //再次檢查cacheValid防止其他執行緒獲得寫鎖改變cacheValid值 if (!cacheValid) { data = ... cacheValid = true; } // 寫鎖降級為讀鎖 rwl.readLock().lock(); } finally { //釋放寫鎖 rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
ReentranReadWriteLock的關係圖:
ReentrantReadWriteLock沒有實現Lock介面,實現了ReadWriteLock介面。內部類ReadLock和WriteLock實現Lock介面,ReadLock和WriteLock包含了繼承了AQS的Sync物件,從而提供了共享鎖和獨佔鎖特性的實現。讀寫鎖ReentrantReadWriteLock具有以下特性:
- 可重入,不管是讀鎖還是寫鎖,都是可重入鎖
- 公平鎖和非公平鎖,支援以公平方式或非公平方式(預設方式)獲取讀鎖和寫鎖。
- 支援鎖降級,執行緒獲得寫鎖之後可以降級為讀鎖,具體是先獲取寫鎖,再獲得讀鎖,再釋放寫鎖。但讀鎖不可升級為寫鎖。
讀寫狀態
在實現ReentrantLock時,當一個執行緒去嘗試獲取鎖時,執行緒會去檢查同步器AQS中維護的int型變數state是否為0,同步狀態加一表示當前執行緒成功獲取鎖。而讀寫鎖ReentrantReadWriteLock維護了讀鎖和寫鎖,那麼一個執行緒獲得了鎖,怎麼通過state表明到底是讀鎖還是寫鎖呢?答案是把int型變數切位兩部分,高16位表示讀狀態,低16位表示寫狀態。ReentrantReadWriteLock在內部類Sync定義了以下常量用以區分讀寫狀態:
//偏移量
static final int SHARED_SHIFT = 16;
//執行緒獲得讀鎖,state加SHARED_UNIT,state高16位SHARED_UNIT個數代表了有多少個共享鎖
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//讀寫鎖重入最多不超過65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
通過把32位int型變數state按位切割成兩部分維護讀寫兩種狀態,具體劃分如圖:
從圖中可以看到,當前執行緒獲取了寫鎖,重進入了3次,連續獲得了兩次讀鎖,每次獲得寫鎖,就把state加1,而低16位總共最大是65535,就是MAX_COUNT的值。每獲得一次讀鎖,就把state加SHARED_COUNT。那麼如何獲取讀寫狀態呢?只要通過位運算取出高16位或低16位就行了,對於讀狀態,state>>>SHARED_SHIFT(無符號補0右移16位)就可以得到加了多少次SHARED_UNIT從而獲得讀狀態;對於寫狀態,state & EXCLUSIVE_MASK(0X0000FFFF,高16位都變為0,低16位不變)就可以獲得寫狀態。
讀鎖計數器
由於ReentrantReadWriteLock支援讀寫鎖的重入,而寫鎖是獨佔鎖,只要取出同步狀態state低16位對應的數值就是獲得寫鎖的重入次數;而讀鎖是共享鎖,每個執行緒獲得讀鎖就會把state加上SHARED_UNIT(包括讀鎖重入),取出state高16位的對應的數值表示是所有執行緒獲得讀鎖的次數,但是如何獲得單個執行緒獲得共享鎖的次數呢?內部類Sync為同步器維護了一個讀鎖計數器,專門統計每個執行緒獲得讀鎖的次數。Sync內部有兩個內部類分別為HoldCounter和ThreadLocalHoldCounter:
abstract static class Sync extends AbstractQueuedSynchronizer {
static final class HoldCounter {
//計數器,用於統計執行緒重入讀鎖次數
int count = 0;
// Use id, not reference, to avoid garbage retention
//執行緒TID,區分執行緒,可以唯一標識一個執行緒
final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
//重寫初始化方法,在沒有進行set的情況下,獲取的都是該HoldCounter值
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
//本地執行緒讀鎖計數器
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
}
- firstReader和firstReaderHoldCount
如果只有一個執行緒獲取了讀鎖,就不需要使用本地執行緒變數readHolds,當前執行緒就是第一個獲得讀鎖的執行緒firstReader,使用firstReaderHoldCount儲存執行緒重入次數。 - readHolds
第一個獲得讀鎖的執行緒使用firstReaderHoldCount儲存讀鎖重入次數,後面的執行緒就要使用ThreadLocal型別變數readHolds了,每個執行緒擁有自己的副本,用來儲存自己的重入數。 - cachedHoldCounter
快取計數器,是最後一個獲取到讀鎖的執行緒計數器,每當有新的執行緒獲取到讀鎖,這個變數都會更新。如果當前執行緒不是第一個獲得讀鎖的執行緒,先到快取計數器cachedHoldCounter檢視快取計數器是否指向當前執行緒,不是再去readHolds查詢,通過快取提高效率。
共享鎖的獲取
獲取讀鎖,由內部類ReadLock提供lock方法,呼叫了Sync父類AQS的方法:
//獲取讀鎖
public void lock() {
sync.acquireShared(1);
}
//獲取共享鎖
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared(int unused)
嘗試獲取共享鎖:
protected final int tryAcquireShared(int unused) {
//當前執行緒
Thread current = Thread.currentThread();
//同步狀態state
int c = getState();
//檢查獨佔鎖是否被佔據,如果被佔據,是否是當前執行緒獲取了獨佔鎖
//如果是當前執行緒獲取了寫鎖,可以繼續獲取讀鎖,如果都不是返回-1表示獲取失敗
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//讀鎖數量
int r = sharedCount(c);
//!readerShouldBlock() 根據公平與否策略和佇列是否含有等待節點決定當前執行緒是否繼續獲取鎖
//不能大於65535且CAS修改成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//如果沒有執行緒獲取過讀鎖
if (r == 0) {
//將當前執行緒設定為第一個讀鎖執行緒
firstReader = current;
// 計數器為一
firstReaderHoldCount = 1;
//讀鎖重入
} else if (firstReader == current) {
//計數器加一
firstReaderHoldCount++;
} else {
// 如果不是第一個執行緒,獲取鎖成功
// cachedHoldCounter 代表的是最後一個獲取讀鎖的執行緒的計數器
HoldCounter rh = cachedHoldCounter;
// 如果計數器是 null 或者不指向當前執行緒,那麼就新建一個 HoldCounter 物件
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
//計數器為0,儲存到readHolds中
else if (rh.count == 0)
readHolds.set(rh);
//計數器加一
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
- fullTryAcquireShared(Thread current)
當已有執行緒佔據獨佔鎖、讀鎖數量超過MAX_COUNT、不滿足公平策略或者CAS設定state失敗,就會呼叫這個方法。與tryAcquireShared方法邏輯大體相似。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
//死迴圈
for (;;) {
//同步狀態
int c = getState();
//檢查寫鎖獲取情況
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
//進入到這裡,說明沒有其他執行緒獲取寫鎖
//公平鎖策略檢查
} else if (readerShouldBlock()) {
//readerShouldBlock()返回true,應該堵塞,檢查是否獲取過讀鎖
// 第一個獲取讀鎖執行緒是當前執行緒,重入
if (firstReader == current) {
} else {
//迴圈中,若計數器為null
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
//需要阻塞且是非重入(還未獲取讀鎖的),獲取失敗。
if (rh.count == 0)
return -1;
}
}
//檢查讀鎖總數量是否超過最大值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//CAS設定同步狀態state
if (compareAndSetState(c, c + SHARED_UNIT)) {
//當前執行緒獲得第一個讀鎖
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
//讀鎖重入
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//從快取讀入計數器,提高效率
if (rh == null)
rh = cachedHoldCounter;
//計數器為空或不是指向當前執行緒
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
doAcquireShared(int arg)
當tryAcquireShared嘗試獲取共享鎖失敗,返回-1,進入AQS同步佇列等待獲取共享鎖
private void doAcquireShared(int arg) {
//將當前節點以共享型型別加入同步佇列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//前驅節點獲取到鎖,可能佔據鎖,也可能已經釋放鎖,呼叫tryAcquireShared嘗試獲取鎖
if (p == head) {
int r = tryAcquireShared(arg);
//獲取成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//與獨佔鎖ReentrantLock堵塞邏輯一致
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//因中斷/超時,取消獲取鎖
if (failed)
cancelAcquire(node);
}
}
共享鎖的釋放
釋放讀鎖,由內部類ReadLock提供unlock方法,呼叫了Sync父類AQS的方法:
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(int unused)
tryReleaseShared返回true,即同步狀態為0,不存線上程佔據讀鎖或寫鎖。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//當前執行緒是第一個獲得讀鎖的執行緒
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
//不是firstReader,更新計數器
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
//完全釋放鎖
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
//重入鎖退出
--rh.count;
}
//CAS更新同步狀態,
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
doReleaseShared()
tryReleaseShared方法成功釋放鎖,呼叫doReleaseShared喚醒後繼節點。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果節點狀態為 Node.SIGNAL,將狀態設定為0,設定成功,喚醒執行緒。
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//如果本身頭結點的waitStatus是出於重置狀態(waitStatus==0)的,
//將其設定為“傳播”狀態。意味著需要將狀態向後一個節點傳播。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
寫鎖獲取
獲取寫鎖,由內部類WriteLock提供lock方法,呼叫了Sync父類AQS的方法,重點解析一下tryAcquire實現:
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(int acquires)
內部類Sync重寫的tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//同步狀態不為0
if (c != 0) {
//其他執行緒獲得寫鎖,獲取失敗;w為0而同步狀態不為0,沒有執行緒佔據寫鎖,有執行緒佔據讀鎖
//注意:不存在讀鎖與寫鎖同時被多個執行緒獲取的情況。
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//當前執行緒已經獲得寫鎖,重入次數超過MAX_COUNT,失敗
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 鎖重入
setState(c + acquires);
return true;
}
//公平策略檢查
//CAS設定同步狀態成功則獲得寫鎖
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
寫鎖釋放
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(int releases)
當同步狀態state為0時,tryRelease方法返回true。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//獨佔鎖,只有當前執行緒釋放同步狀態,不需要考慮併發
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
鎖降級
讀寫鎖ReentrantReadWriteLock支援寫鎖降級,從下面可以看到執行緒獲得寫鎖後,在沒有釋放寫鎖的情況下獲得了讀鎖(鎖降級),然後在手動釋放寫鎖。這更像是一種特殊的鎖重入,由於獲得寫鎖有繼續獲得讀鎖的需要,相對於釋放寫鎖再獲取讀鎖,直接去獲取讀鎖沒有其他執行緒競爭,免去了由於其他執行緒獲得寫鎖進入等待狀態的可能,效率更高。注意:鎖降級後需要手動釋放寫鎖,否則執行緒會一直持有獨佔鎖
讀寫鎖ReentrantReadWriteLock是不支援鎖升級的,如果一個獲得了讀鎖的執行緒在持有讀鎖的情況下嘗試獲取寫鎖,是不可能成功獲得讀鎖的,因為獲得寫鎖會判斷當前有沒有執行緒持有讀鎖,而嘗試鎖升級的執行緒本身讀鎖沒有釋放,所以會進入同步佇列等待同步狀態為0獲取寫鎖,由於讀鎖一直不釋放會導致其他執行緒無法獲取寫鎖(獲取寫鎖條件不能有其他執行緒佔據讀鎖或寫鎖),只能獲取共享鎖讀鎖。因此ReentrantReadWriteLock是不支援讀寫鎖的。
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 必須先釋放讀鎖再獲取寫鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 寫鎖未釋放獲得讀鎖
rwl.readLock().lock();
} finally {
//釋放寫鎖,降級為讀鎖
rwl.writeLock().unlock();
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
總結
讀寫鎖內部維護了共享鎖讀鎖和獨佔鎖寫鎖,讀鎖和寫鎖都支援重進入,當讀鎖已經被獲取(state高16位不為0)或寫鎖已被其他執行緒獲取,獲取寫鎖的執行緒進入等待狀態;當寫鎖已經被其他執行緒獲取,獲取讀鎖的執行緒進入等待狀態。讀寫鎖支援由獨佔鎖(寫鎖)降級到(讀鎖),但不支援讀鎖升級到寫鎖,在使用時要考慮手動釋放好讀鎖與寫鎖的釋放,否則程式可能會出現意想不到的問題。