Java 基礎--重入鎖與讀寫鎖
本篇基於Java%20%E5%9F%BA%E7%A1%80--%E9%98%9F%E5%88%97%E5%90%8C%E6%AD%A5%E5%99%A8(AQS" target="_blank" rel="nofollow,noindex">Java 基礎–佇列同步器(AQS) ),對重入鎖(ReentrantLock)和讀寫鎖(ReentrantReadWriteLock)進行解析。
重入鎖
重入鎖(ReentrantLock)表示支援一個執行緒對資源的重複加鎖,也就是說一個執行緒可以多次獲取到同一個鎖。重入鎖實現了 Lock 介面,內部實現是基於佇列同步器(AbstractQueuedSynchronizer):
public class ReentrantLockimplements Lock,java.io.Serializable{ abstract static class Syncextends AbstractQueuedSynchronizer{...} // 非公平 Sync static final class NonfairSyncextends Sync{...} // 公平 Sync static final class FairSyncextends Sync{...} }
重入鎖還支援獲取鎖時的公平性與非公平性選擇,可通過其構造方法進行設定,預設是非公平的。
// 構造方法 public ReentrantLock(){ sync = new NonfairSync(); } public ReentrantLock(boolean fair){ sync = fair ? new FairSync() : new NonfairSync(); }
關於公平與非公平:如果在絕對時間上,先獲取鎖的請求一定是先被滿足的,那麼這個鎖就是公平的,反之就是非公平的。也就是說如果在同步佇列中排隊時間最長或者排在最前面的節點先獲取到同步狀態,那麼就是公平的。
非公平性獲取鎖
重入鎖預設是非公平性的,所以先看看非公平性的實現:
static final class NonfairSyncextends Sync{ private static final long serialVersionUID = 7316153563782823691L; final void lock(){ // 先判斷是否可以獲取同步狀態(鎖) if (compareAndSetState(0, 1)) // 快取當前執行緒,用來判斷下次獲取鎖的是不是該執行緒 setExclusiveOwnerThread(Thread.currentThread()); else // 內部呼叫 tryAcquire() acquire(1); } protected final boolean tryAcquire(int acquires){ return nonfairTryAcquire(acquires); } }
NonfairSync 只有上面 10 來行程式碼,當獲取同步狀態失敗後就通過acquire(1)
去獲取同步狀態,所以看看nonfairTryAcquire(int acquires)
方法的實現:
final boolean nonfairTryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 這部分和前面的一樣 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 此時表示該鎖已經被當前執行緒獲取過了 // 記錄被獲取的次數 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 修改同步狀態(加 1) setState(nextc); return true; } return false; }
nonfairTryAcquire(int acquires)
方法中也比較簡單,首先判斷同步狀態是否被成功獲取過,如果已經被成功獲取過了就判斷之前獲取成功的執行緒和當前執行緒是否一樣,如果一樣就表示當前執行緒再次獲取成功,並給同步狀態加 1,此時的同步狀態表示被成功獲取的次數。
公平性獲取鎖
非公平性看完了就來看看公平性的實現:
static final class FairSyncextends Sync{ private static final long serialVersionUID = -3000897897090466540L; final void lock(){ // 內部會呼叫 tryAcquire() acquire(1); } protected final boolean tryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // hasQueuedPredecessors() 表示是否還有前驅節點 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 此時表示該鎖已經被當前執行緒獲取過了 // 記錄被獲取的次數 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 修改同步狀態(加 1) setState(nextc); return true; } return false; } }
可以發現公平與非公平獲取鎖的區別主要就是hasQueuedPredecessors()
這句程式碼,意思是當前節點是否還有前驅節點,因為公平性獲取鎖表示排在前面的節點一定是先獲取到鎖的,所以這裡多了一個判斷。
同樣,它們都是通過改變同步狀態(加 1)來表示被獲取的次數,每獲取成功一次就修改一次(加 1)。
釋放鎖
在公平性與非公平性獲取鎖的時候,如果當前執行緒已經獲取過鎖了,那麼就修改同步狀態(加 1),所以當釋放鎖的時候就必然的需要減少同步狀態。重入鎖可以通過unlock()
方法來釋放鎖:
public void unlock(){ // 釋放一次鎖 sync.release(1); } public final boolean release(int arg){ if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 喚醒處於阻塞的執行緒 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases){ // 每釋放一次鎖就減 1 int c = getState() - releases; // 判斷當前執行緒和擁有該鎖的執行緒是否一樣 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 當同步狀態恢復到 0 才表示真正釋放鎖完成 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
通過上面可以看到tryRelease()
方法才是核心,每次呼叫unlock()
都會修改同步狀態(減 1),當同步狀態恢復到 0 時才算是真正的釋放鎖成功。
可以發現,重入鎖是個排他鎖,當前執行緒可以多次獲取,但是同一時刻只有一個執行緒能獲取成功。
讀寫鎖
讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,在同一個時刻可以允許多個讀執行緒訪問,但是在寫執行緒訪問時,除了當前執行緒,其他所有執行緒的讀/寫操作均被阻塞。所以讀鎖是個共享鎖,而寫鎖是個獨佔鎖(排他鎖),但它們都是重入鎖。
讀寫鎖和重入鎖類似,都是用同步狀態來表示獲取鎖的次數,而讀寫鎖的表示更復雜些:用同步狀態的高 16 位表示讀鎖獲取次數(簡稱讀次數),用低 16 位表示寫鎖獲取次數(簡稱寫次數):
static final int SHARED_SHIFT= 16; static final int SHARED_UNIT= (1 << SHARED_SHIFT);// 65536 static final int MAX_COUNT= (1 << SHARED_SHIFT) - 1;// 65535 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 65535 // 讀鎖獲取次數(簡稱讀次數) static int sharedCount(int c){ return c >>> SHARED_SHIFT; } // 寫鎖獲取次數(簡稱寫次數) static int exclusiveCount(int c){ return c & EXCLUSIVE_MASK; }
讀寫鎖實現了 ReadWriteLock 介面,ReadWriteLock 定義:
public interface ReadWriteLock{ LockreadLock(); LockwriteLock(); }
讀寫鎖內部實現是基於佇列同步器(AbstractQueuedSynchronizer),同時也支援獲取鎖時的公平性與非公平性選擇,可通過其構造方法進行設定,預設是非公平的。
public class ReentrantReadWriteLockimplements ReadWriteLock,java.io.Serializable{ // 內部類讀鎖 private final ReentrantReadWriteLock.ReadLock readerLock; // 內部類寫鎖 private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; public ReentrantReadWriteLock(){ this(false); } public ReentrantReadWriteLock(boolean fair){ sync = fair ? new FairSync() : new NonfairSync(); // 建立讀/寫鎖物件 readerLock = new ReadLock(this); writerLock = new WriteLock(this); } // 獲取寫鎖物件 public ReentrantReadWriteLock.WriteLockwriteLock(){ return writerLock; } // 獲取讀鎖物件 public ReentrantReadWriteLock.ReadLockreadLock(){ return readerLock; } abstract static class Syncextends AbstractQueuedSynchronizer{...} // 公平 Sync static final class FairSyncextends Sync{...} // 非公平 Sync static final class NonfairSyncextends Sync{...} }
構造方法中預設建立了讀/寫鎖物件,所以呼叫readLock()
和writeLock()
就可以獲取讀鎖和寫鎖。
公平性與非公平性
讀寫鎖預設也是非公平性的,所以先看看非公平性的實現:
static final class NonfairSyncextends Sync{ private static final long serialVersionUID = -8159625535654395037L; // 是否阻塞寫執行緒 final boolean writerShouldBlock(){ return false; } // 是否阻塞讀執行緒 final boolean readerShouldBlock(){ return apparentlyFirstQueuedIsExclusive(); } } // 判斷佇列的第一個(頭節點的後繼節點)節點是否獨佔式,即是否是寫執行緒 final boolean apparentlyFirstQueuedIsExclusive(){ Node h, s; return (h = head) != null && (s = h.next)!= null && !s.isShared()&& s.thread != null; }
writerShouldBlock()
方法直接返回了 false,而readerShouldBlock()
方法稍微做了下判斷,按照重入鎖的非公平性實現,此時應該直接返回 false,為什麼要有這個判斷呢?因為讀鎖是可以被多個執行緒獲取的,如果某個執行緒又獲取了寫鎖並更新了資料,那麼這個更新對其他獲取讀鎖的執行緒是不可見的。
再看看公平性的實現:
static final class FairSyncextends Sync{ private static final long serialVersionUID = -2274990926593161451L; // 是否阻塞寫執行緒 final boolean writerShouldBlock(){ // 是否還有前驅節點 return hasQueuedPredecessors(); } // 是否阻塞讀執行緒 final boolean readerShouldBlock(){ // 是否還有前驅節點 return hasQueuedPredecessors(); } }
這個就簡單了,直接返回是否還有前驅節點,是我們想要的。
寫鎖
讀寫鎖中用 WriteLock 來表示寫鎖,它是讀寫鎖的內部類,定義如下:
public static class WriteLockimplements Lock,java.io.Serializable{...}
獲取寫鎖
通過寫鎖的lock()
方法可以獲取寫鎖,下面先看看寫鎖的獲取:
public void lock(){ sync.acquire(1); } public final void acquire(int arg){ if (!tryAcquire(arg) && // 構建獨佔式節點加入同步佇列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires){ Thread current = Thread.currentThread(); int c = getState(); // 寫次數 int w = exclusiveCount(c); // 鎖是否被獲取過 if (c != 0) { // 同步狀態不為 0 而寫次數為 0,說明讀鎖被獲取過 // 此時如果獲取寫鎖的執行緒就是當前執行緒,那麼仍然可以獲取寫鎖,而如果是不同執行緒則不能獲取寫鎖 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 說明獲取鎖的次數是有上限的 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 修改同步狀態 setState(c + acquires); return true; } // 是否阻塞寫執行緒需要根據當前的獲取策略:公平與非公平 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 快取當前的寫執行緒 setExclusiveOwnerThread(current); return true; }
在tryAcquire(int acquires)
方法中,writerShouldBlock()
需要分以下情況:
-
公平獲取:
- 有前驅節點,返回 true,即不能獲取寫鎖
- 沒有前驅節點,返回 false
- 非公平獲取:直接返回 false
所以,假如讀/寫鎖被獲取過了(c != 0
),此時如果讀鎖還沒有被獲取過或者被當前執行緒獲取過,那麼此次獲取成功,否則此次獲取失敗;假如讀/寫鎖都還沒有被獲取過(c == 0
),此時如果是公平獲取,當存在前驅節點時獲取失敗,否則獲取成功,如果是非公平獲取,那麼將獲取成功。
st=>start: Start cond1=>condition: 鎖被獲取過 cond2=>condition: 讀鎖被其他執行緒獲取過 cond3=>condition: 公平獲取 cond4=>condition: 存在前驅節點 oper1=>operation: 獲取失敗 oper2=>operation: 獲取成功 oper3=>operation: 獲取失敗 oper4=>operation: 獲取成功 e=>end: End st->cond1 cond1(yes)->cond2 cond1(no)->cond3 cond3(yes)->cond4 cond3(no)->oper4 cond4(yes)->oper3 cond4(no)->oper4 cond2(yes)->oper1 cond2(no)->oper2 oper1->e oper2->e oper3->e oper4->e
釋放寫鎖
呼叫寫鎖的unlock()
方法可以釋放寫鎖:
public void unlock(){ sync.release(1); } public final boolean release(int arg){ if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases){ // 判斷快取執行緒是否是當前執行緒 // 因為獲取寫鎖之後,當前執行緒仍可以獲取讀/寫鎖,而其他執行緒都被阻塞 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 每釋放一次鎖就減 1 int nextc = getState() - releases; // 當寫次數為 0 時才表示真正釋放成功 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
寫鎖的釋放相對簡單,每釋放一次鎖同步狀態就減 1,當寫鎖的獲取次數為 0 時表示真正釋放成功。
讀鎖
讀寫鎖中用 ReadLock 來表示讀鎖,它也是讀寫鎖的內部類,定義如下:
public static class ReadLockimplements Lock,java.io.Serializable{...}
獲取讀鎖
通過讀鎖的lock()
方法可以獲取讀鎖:
public void lock(){ sync.acquireShared(1); } public final void acquireShared(int arg){ if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected final int tryAcquireShared(int unused){ Thread current = Thread.currentThread(); int c = getState(); // 其他執行緒已經獲取了寫鎖,此時不能獲取讀鎖 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 讀次數 int r = sharedCount(c); // 是否阻塞讀執行緒需要分公平與非公平情況 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 第一次獲取讀鎖 // 快取第一個讀執行緒及該執行緒獲取過次數 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 當前執行緒重複獲取讀鎖,說明讀鎖是可重入的 firstReaderHoldCount++; } else { // 其他讀執行緒到來時,說明讀鎖是共享的 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 獲取當前執行緒的 cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
tryAcquireShared(int unused)
方法中,readerShouldBlock()
方法同樣需要分情況:
-
公平獲取:
- 有前驅節點,返回 true
- 沒有前驅節點,返回 false
- 非公平獲取:如果佇列的第一個(頭節點的後繼節點)節點是獨佔式(寫執行緒),返回 true,否則返回 false
這裡出現了兩個物件:cachedHoldCounter、readHolds,看看它們的定義:
static final class HoldCounter{ int count = 0; final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter>{ public HoldCounter initialValue(){ return new HoldCounter(); } } Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds }
根據 ThreadLocal 的特性,所以此處每個執行緒都有一個自己的 HoldCounter,HoldCounter 裡面記錄的是當前執行緒獲取鎖的次數和當前執行緒的 ID。
回到tryAcquireShared(int unused)
方法,什麼時候會進入fullTryAcquireShared(current)
方法呢?
- 假設執行緒 A 獲取了寫鎖,當執行緒 B 來獲取讀鎖的時候將被阻塞,這時如果執行緒 A 來獲取讀鎖就進入該方法;
- 假設執行緒 A 獲取了寫鎖,當執行緒 B、C 來獲取讀鎖的時候將被阻塞,同步佇列為 B->C,當 A 釋放鎖時 B、C 就會進入該方法
final int fullTryAcquireShared(Thread current){ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { // 其他執行緒已經獲取了寫鎖,此時不能獲取讀鎖 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 讀次數是否達到了 65535 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 修改同步狀態 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { // 第一次獲取讀鎖 // 快取第一個讀執行緒及該執行緒獲取過次數 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 當前執行緒重複獲取讀鎖 firstReaderHoldCount++; } else { // 其他讀執行緒到來時 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 獲取當前執行緒的 cachedHoldCounter rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
fullTryAcquireShared(Thread current)
主要處理讀執行緒被阻塞和 CAS 失敗的情況,基本上和tryAcquireShared(int unused)
方法差不多。
所以,整個tryAcquireShared(int unused)
方法可以分為三部分:
exclusiveCount(c) != 0
釋放讀鎖
呼叫讀鎖的unlock()
方法可以釋放讀鎖:
public void unlock(){ sync.releaseShared(1); } public final boolean releaseShared(int arg){ if (tryReleaseShared(arg)) { // 修改等待狀態,喚醒後繼節點 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused){ Thread current = Thread.currentThread(); // 當前執行緒是否是第一個獲取讀鎖的執行緒 if (firstReader == current) { if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 修改其他執行緒的 readHolds HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // 獲取讀鎖成功時 +SHARED_UNIT,所以此處要 -SHARED_UNIT int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 當讀/鎖都釋放完後 nextc 才為 0 return nextc == 0; } }
每次釋放讀鎖都會給相應的狀態減 1,這個和寫鎖的釋放一樣,但是有一點不同的是讀鎖的釋放需要判斷寫鎖是否釋放完成。
每次獲取寫鎖的時候修改同步狀態都是 +1,釋放寫鎖的時候都是 -1;而獲取讀鎖時是 +SHARED_UNIT,釋放讀鎖是 -SHARED_UNIT。當一個執行緒獲取了寫鎖後再獲取了讀鎖,在釋放鎖時如果沒有先釋放寫鎖,那麼nextc == 0
將永遠不會成立。
所以如果當前執行緒獲取了寫鎖和讀鎖,在釋放鎖的時候必須先釋放寫鎖再釋放讀鎖。
鎖降級
讀寫鎖中還有個鎖降級的概念,意思就是把寫鎖降級成為讀鎖。如果一個執行緒獲取了寫鎖並釋放後再去獲取讀鎖,這個過程不能稱為鎖降級。鎖降級是指在獲取了寫鎖後,在釋放寫鎖之前去獲取讀鎖,然後才釋放寫鎖。看個示例:
class CachedData{ final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); Object data; volatile boolean cacheValid; void processCachedData(){ rwl.readLock().lock(); if (!cacheValid) { // 必須先釋放讀鎖 rwl.readLock().unlock(); // 鎖降級從獲取寫鎖開始 rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true; } // 釋放寫鎖之前獲取讀鎖 rwl.readLock().lock(); } finally { // 釋放寫鎖 rwl.writeLock().unlock(); } } try { use(data); } finally { // 釋放讀鎖 rwl.readLock().unlock(); } } }
上述示例中,在釋放寫鎖之前去獲取讀鎖,然後才釋放寫鎖。這個過程其實就是:一個執行緒獲取寫鎖後仍然可以獲取讀鎖,而其他執行緒的讀/寫都不能獲取。
讀寫鎖不支援鎖升級,也就是讀鎖升級成為寫鎖。因為讀鎖是共享的,可以被多個執行緒獲取,如果多其中某個執行緒又獲取了寫鎖並更新了資料,這個更新對其他獲取讀鎖的執行緒是不可知的。其實從上面的分析獲取寫鎖的過程也能發現:如果其他執行緒獲取了讀鎖,那麼寫鎖將獲取失敗。
總結
重入鎖和讀寫鎖內部的功能實現都是基於佇列同步器(AbstractQueuedSynchronizer),它們都是可重入的,即同一執行緒可以多次獲取。但重入鎖是排他鎖,即同一時刻只允許一個執行緒獲取成功;在讀寫鎖中,讀鎖是共享鎖,允許多個執行緒獲取,而寫鎖是排他鎖,同一時刻只允許一個執行緒獲取成功。此外,當一個執行緒獲取了寫鎖後還可以繼續獲取讀/寫鎖,但是其他執行緒獲取讀/寫鎖將被阻塞。