ReentrantReadWriteLock源碼分析(一)
此處源碼分析,主要是基於讀鎖,非公平機制,JDK1.8。
問題:
1、ReentrantReadWriteLock是如何創建讀鎖與寫鎖?
2、讀鎖與寫鎖的區別是什麽?
3、鎖的重入次數與獲取鎖的線程數分別是用哪種方式記錄的?
4、當隊列中出現多個共享模式的線程節點連續排列時,那麽當第一個共享模式的線程拿到鎖之後,後面的共享線程節點怎麽獲取鎖?
一、創建ReadLock。
ReentrantReadWriteLock rrw = new ReentrantReadWriteLock();
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
rrw.readLock().lock();
1、當fair的值為false時,非公平的方式創建鎖,當fair的值為true時,公平的方式創建鎖。
2、初始化readerLock與writerLock,這兩個變量是ReentrantReadWriteLock的內部變量。
3、sync執行非公平的鎖。
二、lock()源碼分析
2.1、sync.acquireShared(1)
public void lock() { sync.acquireShared(1); }
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
(1)tryAcquireShared的作用是當前線程獲取讀鎖,當返回1時,表示獲取成功,-1表示獲取失敗。
(2)doAcquireShared,表示獲取失敗的時候調用。將獲取失敗的線程加入到等待隊列中,並調用LockSupport.park方法阻塞住,等待線程釋放permit。
2.2、tryAcquireShared(arg)
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); // 獲取到占有鎖的線程數 int c = getState(); // 如果寫鎖被占領了且不是當前線程占領,那麽直接返回 -1 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // 占有共享鎖的線程數 if (!readerShouldBlock() && // 如果隊列的頭節點的next節點是獨享模式的線程節點即獲取寫鎖的線程節點,返回true r < MAX_COUNT && // 共享的數據不能超過65535 compareAndSetState(c, c + SHARED_UNIT)) { // cas設置state if (r == 0) { // 線程來拿讀鎖,讀鎖和寫鎖沒有被任何線程擁有,那麽r==0 firstReader = current; // firstReaderHoldCount = 1; } else if (firstReader == current) { // 如果線程重復獲取讀鎖,那麽從這裏開始重入 firstReaderHoldCount++; } else { // 如果讀鎖被線程x占領,線程y也要來申請讀鎖,那麽分支就走到這裏了 // HoldCounter類中存儲了兩個屬性,一個是count,用於記錄線程的重入次數,一個是tid,記錄當前線程的id HoldCounter rh = cachedHoldCounter; // 線程x擁有讀鎖之後,線程y第一次申請的時候會走到這裏 //cachedHoldCounter 是一個緩存,保存當前操作線程的上一個線程的操作結果。線程y操作完之後,就會保存線程y的信息 // 如果另外一個線程z來獲取到讀鎖的時候,雖然rh!=null,但是rh.tid != getThreadId(current), //那麽會創建一個默認的HoldCounter,並保存到cachedHoldCounter,並且默認的count=0 if (rh == null || rh.tid != getThreadId(current)) // readHolds.get(),查看源碼可以知道,在這個方法中包含了數據初始化的過程,會調用ReentrantReadWriteLock.java // 下面的方法 /** * public HoldCounter initialValue() { * return new HoldCounter(); * } */ cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 這個分支也會來到,當線程釋放鎖,但是沒有關閉,當再次調用線程時,readHolds中會存在HoldCounter,count=0 readHolds.set(rh); rh.count++; // 計算重入的次數 } return 1; } return fullTryAcquireShared(current); }
請註意:
(1)ReentrantReadWriteLock中維持了一個類ThreadLocalHoldCounter,這個類會生成一個map,key是線程的id,value是HoldCounter對象,HoldCounter對象如下:
static final class HoldCounter { int count = 0; // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); }
其中count就是線程的重入次數,tid就是當前線程的id。這個是與ReentrantLock區別的地方。
(2)ReentrantReadWriteLock使用32位int類型來表示占有鎖的線程數,其中高16位是獲取到讀鎖的線程數,低16位是獲取到寫鎖的線程數,提供了計算線程數的方法。
static final int SHARED_SHIFT = 16;(1) static final int SHARED_UNIT = (1 << SHARED_SHIFT);(2) static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;(3) static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;(4) /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; }(5) /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }(6) c + SHARED_UNIT
其中(1)是共享移動常量;(2)是共享添加的常量;(3)是最大線程數65535(也就是11111111 11111111);(4)跟(3)一樣;(5)計算共享線程數,把c的值向右移16為,並且高位補0; >>> 無符號右移,高位補0;(6)計算獨享的線程數,把c的值與11111111 11111111 按位與,這樣其實就是取到了寫鎖的線程數;(7)是共享線程+1。
源碼分析:
2.2.1、readerShouldBlock()
這個方法的作用是把判斷當前獲取讀鎖的線程是否需要阻塞,條件是:在等待隊列中頭節點的下一個節點是獨享模式的線程。
// 讀鎖應該被阻塞 final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } /** * Returns {@code true} if the apparent first queued thread, if one * exists, is waiting in exclusive mode. If this method returns * {@code true}, and the current thread is attempting to acquire in * shared mode (that is, this method is invoked from {@link * #tryAcquireShared}) then it is guaranteed that the current thread * is not the first queued thread. Used only as a heuristic in * ReentrantReadWriteLock. *如果第一個入隊列的線程節點存在,並且工作在獨享模式下,那麽返回true; *如果這個方法返回true,並且當前線程以共享的模式獲取鎖,這個方法保證了它不是第一個入隊列的 *(讀鎖與讀鎖都是共存的,所以不會入隊,只有當隊列中有獨享模式的線程節點的時候,獲取共享模式的線程才會加入到隊列中。) */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; // 頭節點存在,並且存在下一個節點,下一個節點是獨享模式,下一個節點的thread不是空,則返回true return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
2.2.2、fullTryAcquireShared(current)
這個方法的作用與tryAcquireShared的作用很類似。
// 進入這個方法的條件, /**條件1:!readerShouldBlock() && // 如果第一個入隊列的線程節點存在,並且工作在獨享模式下,那麽返回true; * 條件2:r < MAX_COUNT && // 共享的數據不能超過65535,讀鎖的線程數已經超過了65535 * 條件3:compareAndSetState(c, c + SHARED_UNIT) // 兩個競爭讀鎖的線程都運行到這裏,第一個競爭成功,那麽第二個就會競爭失敗,返回false * 其實這個方法分別對這三種狀態進行處理 */ /** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */ final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); // 如果排他鎖被別的線程拿了,直接返回-1 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 這裏是對條件1的處理 // 如果隊列的頭的下一個節點是請求的排他鎖的線程在等待,那麽就返回true // Make sure we‘re not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); // 如果當前線程的count==0,也就是說當前線程才進來,沒有獲取到鎖,那麽直接把它從readHolds中移除 if (rh.count == 0) // 移除當前線程的HoldCounter readHolds.remove(); } } // 移除之後,返回-1 if (rh.count == 0) return -1; } } // 這裏是對條件2的處理,直接拋出錯誤! if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 這裏是對條件3的處理,競爭設置state,如果競爭還是失敗,那麽就要再循環一次,直到死循環能夠跳出去 if (compareAndSetState(c, c + SHARED_UNIT)) { // 如果共享鎖的數量為0 if (sharedCount(c) == 0) { // 設置第一個線程為當前的線程 firstReader = current; // 設置HoldCount =1 firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
ReentrantReadWriteLock源碼分析(一)