(2.1.27.11)Java併發程式設計:Lock之ReentrantReadWriteLock 讀寫分離獨享式重入鎖
我們在介紹AbstractQueuedSynchronizer的時候介紹過,AQS支援獨佔式同步狀態獲取/釋放、共享式同步狀態獲取/釋放兩種模式,對應的典型應用分別是ReentrantLock和Semaphore
AQS還可以混合兩種模式使用,讀寫鎖ReentrantReadWriteLock就是如此。
設想以下情景:我們在系統中有一個多執行緒訪問的快取,多個執行緒都可以對快取進行讀或寫操作,但是讀操作遠遠多於寫操作,要求寫操作要執行緒安全,且寫操作執行完成要求對當前的所有讀操作馬上可見。
分析上面的需求:
- 因為有多個執行緒可能會執行寫操作,因此多個執行緒的寫操作必須同步序列執行;
- 而寫操作執行完成要求對當前的所有讀操作馬上可見,這就意味著當有執行緒正在讀的時候,要阻塞寫操作,當正在執行寫操作時,要阻塞讀操作。
一個簡單的實現就是將資料直接加上互斥鎖,同一時刻不管是讀還是寫執行緒,都只能有一個執行緒操作資料。但是這樣的問題就是如果當前只有N個讀執行緒,沒有寫執行緒,這N個讀執行緒也要傻呵呵的排隊讀,儘管其實是可以安全併發提高效率的。
因此理想的實現是:
- 當有寫執行緒時,則寫執行緒獨佔同步狀態。
- 當沒有寫執行緒時只有讀執行緒時,則多個讀執行緒可以共享同步狀態。
讀寫鎖就是為了實現這種效果而生。
一、使用示例
我們先來看一下讀寫鎖怎麼使用,這裡我們基於hashmap(本身執行緒不安全)做一個多執行緒併發安全的快取:
public class ReadWriteCache { private static Map<String, Object> data = new HashMap<>(); private static ReadWriteLock lock = new ReentrantReadWriteLock(false); private static Lock rlock = lock.readLock(); private static Lock wlock = lock.writeLock(); public static Object get(String key) { rlock.lock(); try { return data.get(key); } finally { rlock.unlock(); } } public static Object put(String key, Object value) { wlock.lock(); try { return data.put(key, value); } finally { wlock.unlock(); } } }
限於篇幅我們只實現2個方法,get和put。從程式碼可以看出:
- 我們先建立一個 ReentrantReadWriteLock 物件,建構函式 false 代表是非公平的(非公平的含義和ReentrantLock相同)。
- 然後通過readLock、writeLock方法分別獲取讀鎖和寫鎖。
- 在做讀操作的時候,也就是get方法,我們要先獲取讀鎖;
- 在做寫操作的時候,即put方法,我們要先獲取寫鎖。
通過以上程式碼,我們就構造了一個執行緒安全的快取,達到我們之前說的:寫執行緒獨佔同步狀態,多個讀執行緒可以共享同步狀態。
二、原始碼分析
2.1 ReentrantReadWriteLock整體結構
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {}
}
可以看到,在公平鎖與非公平鎖的實現上,與ReentrantLock一樣,也是有一個繼承AQS的內部類Sync,然後NonfairSync和FairSync都繼承Sync,通過建構函式傳入的布林值決定要構造哪一種Sync例項。
讀寫鎖比ReentrantLock多出了兩個內部類:ReadLock和WriteLock, 用來定義讀鎖和寫鎖,然後在建構函式中,會構造一個讀鎖和一個寫鎖例項儲存到成員變數 readerLock 和 writerLock。
需要注意的是:讀鎖和寫鎖例項是共享一個AQS,也就是說,共享一個FIFO佇列。 這也是實現讀寫之間相互影響的關鍵
2.2 讀寫鎖
我們還是先回顧下lock()
的內容:
方法名稱 | 描述 |
---|---|
void lock() | 獲取鎖. 成功則向下執行,失敗則阻塞 |
void lockInterruptibly() throws InterruptedException | 可中斷地獲取鎖,在當前執行緒獲取鎖的過程中可以響應中斷訊號 |
boolean tryLock() | 嘗試非阻塞獲取鎖,呼叫方法後立即返回,成功返回true,失敗返回false |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 在超時時間內獲取鎖,到達超時時間將返回false,也可以響應中斷 |
void unlock(); | 釋放鎖 |
Condition newCondition(); | 獲取等待通知元件實現訊號控制,等待通知元件實現類似於Object.wait()方法的功能 |
- 共享式讀鎖
- 內部呼叫共享式AQS操作,因此真實的實現就是Sync的 acquireShared 和 releaseShared
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();//非阻塞式嘗試獲取讀鎖
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
- 獨享式寫鎖
- 內部呼叫獨享式AQS操作,因此真實的實現就是Sync的 acquire 和 release
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.tryWriteLock();//非阻塞式嘗試獲取寫鎖
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
我們可以看到基本核心的程式碼還是在AQS中實現
2.3 AQS的實現
在上篇我們已經講到了,AQS需要重寫的鉤子方法:
方法名稱 | 描述 |
---|---|
boolean tryAcquire(int arg) | 獨佔式嘗試獲取同步狀態(通過CAS操作設定同步狀態),如果成功返回true,反之返回false |
boolean tryRelease(int arg) | 獨佔式釋放同步狀態,成功返回true,失敗返回false。 |
int tryAcquireShared(int arg) | 共享式的獲取同步狀態,返回大於等於0的值,表示獲取成功,反之失敗。 |
boolean tryReleaseShared(int arg) | 共享式釋放同步狀態,成功返回true,失敗返回false。 |
boolean isHeldExclusively() | 判斷同步器是否在獨佔模式下被佔用,一般用來表示同步器是否被當前執行緒佔用 |
2.3.1 state的改變
之前在ReentrantLock中,我們知道鎖的狀態是儲存在Sync例項的state欄位中的(繼承自父類AQS): 0代表無鎖狀態; >0時代表有鎖,具體值為重入次數
現在有了讀寫兩把鎖,然而可以看到還是隻有一個Sync例項,那麼一個Sync例項的state是如何同時儲存兩把鎖的狀態的呢?
答案就是用了位分隔:
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT); //每次要讓共享鎖+1,就應該讓state加 1<<16
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //每種鎖的最大重入數量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 要獲取共享鎖當前的重入數量 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 獲取獨佔鎖當前的重入數量 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
private transient Thread firstReader;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
}
state欄位是32位的int,讀寫鎖用state的低16位儲存寫鎖(獨佔鎖)的狀態;高16位儲存讀鎖(共享鎖)的狀態。
因此:
- 要獲取獨佔鎖當前的重入數量,就是 state & ((1 << 16) -1) (即 exclusiveCount 方法)
- 要獲取共享鎖當前的重入數量,就是 state >>> 16 (即 sharedCount 方法)
2.3.2 獨享式寫鎖對應的AQS
獨享式寫鎖內部呼叫獨享式AQS操作,因此真實的實現就是Sync的 acquire 和 release。 對應的式AQS的鉤子函式 tryAcquire
和 tryRelease
2.3.2.1 獨享式寫鎖的獲取
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();//獲取鎖狀態
int w = exclusiveCount(c); //獲取獨佔鎖的重入數
// 【實現重入】當前state不為0(已被讀或寫佔據當前鎖)
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())//如果寫鎖重入為0說明讀鎖此時被佔用, 或者不被當前寫執行緒佔據,則返回false;
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded"); //寫鎖重入數溢位
// Reentrant acquire
setState(c + acquires); //走到這裡標識,被寫鎖佔據且時當前執行緒佔據,重入數增加
return true;
}
//【實現搶佔】到這裡了說明state為0(沒有被佔用)。
if (writerShouldBlock() || //writerShouldBlock是為了實現公平或非公平策略的
!compareAndSetState(c, c + acquires))//則嘗試通過CAS來搶佔,搶佔成功,直接返回true
return false;
setExclusiveOwnerThread(current);
return true;
}
}
}
static final class NonfairSync extends Sync {
//不公平鎖一直返回 fasle, 從而直接誘發CAS搶佔
final boolean writerShouldBlock() {
return false; // writers can always barge
}
}
static final class NonfairSync extends Sync {
//公平鎖則判斷是否由前置阻塞的節點, 從而判斷是否誘發CAS搶佔。
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
}
writerShouldBlock
實現公平或非公平策略的- 不公平鎖一直返回 fasle, 從而直接誘發CAS搶佔
- 公平鎖則判斷是否由前置阻塞的節點, 從而判斷是否誘發CAS搶佔。
- true標識有前置阻塞節點,則
tryAcquire
直接返回false - false標識沒有前置阻塞節點,誘發CAS搶佔。
- true標識有前置阻塞節點,則
2.3.2.2 獨享式寫鎖的釋放
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException(); //非當前執行緒佔據則直接拋異常
int nextc = getState() - releases;//【計算 鎖狀態】
boolean free = exclusiveCount(nextc) == 0;//作為可重入數,state在大於0時標識重入次數,必須退出對應次數時,才算真正的退出
if (free)
setExclusiveOwnerThread(null); //如果獨佔模式重入數為0了,說明獨佔模式被釋放
setState(nextc); //更新鎖狀態
return free;
}
}
2.3.3 共享式讀鎖對應的AQS
類似於寫鎖,讀鎖的lock和unlock的實際實現對應Sync的 tryAcquireShared 和 tryReleaseShared方法。
2.3.3.1 共享式讀鎖的獲取
abstract static class Sync extends AbstractQueuedSynchronizer {
private transient Thread firstReader;//首個獲取讀鎖的執行緒
private transient int firstReaderHoldCount;//首個獲取讀鎖的執行緒的重入數
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();//【1】獲取鎖狀態
//【2】如果獨佔模式被佔且不是當前執行緒持有,則獲取失敗
//表示:如果存在寫鎖,則當前讀鎖獲取失敗
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//【3】獲取 共享鎖重入數
int r = sharedCount(c);
//【4】如果公平策略沒有要求阻塞且重入數沒有到達最大值,則直接嘗試CAS更新state
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//【5】成功獲取讀鎖後,內部變數的更新操作
if (r == 0) {
//如果r=0, 表示,當前執行緒為第一個獲取讀鎖的執行緒。
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果第一個獲取讀鎖的物件為當前物件,將firstReaderHoldCount 加一。
firstReaderHoldCount++;
} else {
//當前執行緒功獲取鎖後,如果不是第一個獲取多鎖的執行緒
// 將該執行緒持有鎖的次數資訊,放入執行緒本地變數中,方便計算當前執行緒的重入次數
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current); //用來處理CAS沒成功的情況,邏輯和上面的邏輯是類似的,就是加了無限迴圈
}
}
static final class NonfairSync extends Sync {
final boolean readerShouldBlock() {
//不公平鎖一直返回 fasle, 從而直接誘發CAS搶佔
return apparentlyFirstQueuedIsExclusive();
}
//這個方法判斷佇列的head.next是否正在等待獨佔鎖(寫鎖)。這個方法的意思是:讀鎖不應該讓寫鎖始終等待。
//該方法如果頭節點不為空,並頭節點的下一個節點不為空,並且不是共享模式【獨佔模式,寫鎖】、並且執行緒不為空,則返回true。
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
}
static final class FairSync extends Sync {
//公平鎖則判斷是否由前置阻塞的節點, 從而判斷是否誘發CAS搶佔。
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
我們主要看下CAS更新成功後(獲取讀鎖成功)的操作:
在firstReaderHoldCount中或readHolds(ThreadLocal型別的)的本執行緒副本中記錄當前執行緒重入數(淺藍色程式碼),這是為了實現jdk1.6中加入的getReadHoldCount()
方法的
這個方法能獲取當前執行緒重入共享鎖的次數(state中記錄的是多個執行緒的總重入次數)
加入了這個方法讓程式碼複雜了不少,但是其原理還是很簡單的:如果當前只有一個執行緒的話,還不需要動用ThreadLocal,直接往firstReaderHoldCount這個成員變數裡存重入數,當有第二個執行緒來的時候,就要動用ThreadLocal變數readHolds了,每個執行緒擁有自己的副本,用來儲存自己的重入數。
fullTryAcquireShared
如果CAS失敗或readerShouldBlock方法返回true,我們呼叫fullTryAcquireShared方法繼續試圖獲取讀鎖。fullTryAcquireShared方法是tryAcquireShared方法的完整版,或者叫升級版,它處理了CAS失敗的情況和readerShouldBlock返回true的情況。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
//【1】獲取鎖狀態
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
//【2】如果獨佔模式被佔且不是當前執行緒持有,則獲取失敗
//表示:如果存在寫鎖,則當前讀鎖獲取失敗
return -1;
} else if (readerShouldBlock()) {
//【3】如果沒有執行緒正在持有寫鎖,則呼叫readerShouldBlock檢測根據公平原則,當前執行緒是否應該進入等待佇列。
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
//共享鎖溢位
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//嘗試CAS更新state
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
2.3.3.2 共享式讀鎖的釋放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//下邊程式碼也是為了實現jdk1.6中加入的getReadHoldCount()方法,在更新當前執行緒的重入數。
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//這裡是真正的釋放同步狀態的邏輯,就是直接同步狀態-SHARED_UNIT,然後CAS更新,沒啥好說的
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
- 共享式鎖的【更新鎖狀態】不能通過setState,而是通過CAS操作。 這是由於獨享式鎖釋放時肯定是單執行緒模型的,而共享式的則可能是多執行緒模型
- 不需要呼叫setExclusiveOwnerThread 設定獨佔狀態
三、補充內容
通過上面的原始碼分析,我們可以發現一個現象:
- 線上程持有讀鎖的情況下,該執行緒不能取得寫鎖(因為獲取寫鎖的時候,如果發現當前的讀鎖被佔用,就馬上獲取失敗,不管讀鎖是不是被當前執行緒持有)
- 線上程持有寫鎖的情況下,該執行緒可以繼續獲取讀鎖(獲取讀鎖時如果發現寫鎖被佔用,只有寫鎖沒有被當前執行緒佔用的情況才會獲取失敗)
細想想,這個設計是合理的:因為
- 當執行緒獲取讀鎖的時候,可能有其他執行緒同時也在持有讀鎖,因此不能把獲取讀鎖的執行緒“升級”為寫鎖;
- 而對於獲得寫鎖的執行緒,它一定獨佔了讀寫鎖,因此可以繼續讓它獲取讀鎖
- 當它同時獲取了寫鎖和讀鎖後,還可以先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就“降級”為了讀鎖。
綜上:
- 一個執行緒要想同時持有寫鎖和讀鎖,必須先獲取寫鎖再獲取讀鎖;
- 寫鎖可以“降級”為讀鎖;
- 讀鎖不能“升級”為寫鎖。