1. 程式人生 > >ReentrantReadWriteLock源碼分析(一)

ReentrantReadWriteLock源碼分析(一)

rac tel 機制 進行 current exce 記錄 分別是 imu

此處源碼分析,主要是基於讀鎖,非公平機制,JDK1.8。

問題:

1、ReentrantReadWriteLock是如何創建讀鎖與寫鎖?

2、讀鎖與寫鎖的區別是什麽?

3、鎖的重入次數與獲取鎖的線程數分別是用哪種方式記錄的?

4、當隊列中出現多個共享模式的線程節點連續排列時,那麽當第一個共享模式的線程拿到鎖之後,後面的共享線程節點怎麽獲取鎖?

一、創建ReadLock。

ReentrantReadWriteLock rrw = new ReentrantReadWriteLock();
public ReentrantReadWriteLock(boolean fair) {
	sync = fair ? new FairSync() : new NonfairSync();
	readerLock = new ReadLock(this);
	writerLock = new WriteLock(this);
}
rrw.readLock().lock();

1、當fair的值為false時,非公平的方式創建鎖,當fair的值為true時,公平的方式創建鎖。

2、初始化readerLock與writerLock,這兩個變量是ReentrantReadWriteLock的內部變量。

3、sync執行非公平的鎖。

二、lock()源碼分析

2.1、sync.acquireShared(1)

public void lock() {
	sync.acquireShared(1);
}
public final void acquireShared(int arg) {
	if (tryAcquireShared(arg) < 0)
		doAcquireShared(arg);
}

(1)tryAcquireShared的作用是當前線程獲取讀鎖,當返回1時,表示獲取成功,-1表示獲取失敗。

(2)doAcquireShared,表示獲取失敗的時候調用。將獲取失敗的線程加入到等待隊列中,並調用LockSupport.park方法阻塞住,等待線程釋放permit。

2.2、tryAcquireShared(arg)

protected final int tryAcquireShared(int unused) {

	Thread current = Thread.currentThread();
	// 獲取到占有鎖的線程數
	int c = getState();
	// 如果寫鎖被占領了且不是當前線程占領,那麽直接返回 -1
	if (exclusiveCount(c) != 0 &&
		getExclusiveOwnerThread() != current)
		return -1;
	int r = sharedCount(c); // 占有共享鎖的線程數
	if (!readerShouldBlock() && // 如果隊列的頭節點的next節點是獨享模式的線程節點即獲取寫鎖的線程節點,返回true
		r < MAX_COUNT && 	// 共享的數據不能超過65535
		compareAndSetState(c, c + SHARED_UNIT)) {  // cas設置state
		if (r == 0) {   // 線程來拿讀鎖,讀鎖和寫鎖沒有被任何線程擁有,那麽r==0
			firstReader = current; // 
			firstReaderHoldCount = 1;
		} else if (firstReader == current) { // 如果線程重復獲取讀鎖,那麽從這裏開始重入
			firstReaderHoldCount++;
		} else { // 如果讀鎖被線程x占領,線程y也要來申請讀鎖,那麽分支就走到這裏了

		// HoldCounter類中存儲了兩個屬性,一個是count,用於記錄線程的重入次數,一個是tid,記錄當前線程的id
			HoldCounter rh = cachedHoldCounter;
			
			// 線程x擁有讀鎖之後,線程y第一次申請的時候會走到這裏
			//cachedHoldCounter 是一個緩存,保存當前操作線程的上一個線程的操作結果。線程y操作完之後,就會保存線程y的信息
			// 如果另外一個線程z來獲取到讀鎖的時候,雖然rh!=null,但是rh.tid != getThreadId(current),
			//那麽會創建一個默認的HoldCounter,並保存到cachedHoldCounter,並且默認的count=0
			if (rh == null || rh.tid != getThreadId(current))
			// readHolds.get(),查看源碼可以知道,在這個方法中包含了數據初始化的過程,會調用ReentrantReadWriteLock.java
			// 下面的方法
			/**
			* public HoldCounter initialValue() {
               *             return new HoldCounter();
                 *       }
                 */
				cachedHoldCounter = rh = readHolds.get(); 
			else if (rh.count == 0) // 這個分支也會來到,當線程釋放鎖,但是沒有關閉,當再次調用線程時,readHolds中會存在HoldCounter,count=0
				readHolds.set(rh);
			rh.count++; // 計算重入的次數
		}
		return 1;
	}
	return fullTryAcquireShared(current);
}

請註意:

(1)ReentrantReadWriteLock中維持了一個類ThreadLocalHoldCounter,這個類會生成一個map,key是線程的id,value是HoldCounter對象,HoldCounter對象如下:

   static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

其中count就是線程的重入次數,tid就是當前線程的id。這個是與ReentrantLock區別的地方。

(2)ReentrantReadWriteLock使用32位int類型來表示占有鎖的線程數,其中高16位是獲取到讀鎖的線程數,低16位是獲取到寫鎖的線程數,提供了計算線程數的方法。

static final int SHARED_SHIFT   = 16;(1)
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);(2)
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;(3)
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;(4)

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }(5)
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }(6)
c + SHARED_UNIT

其中(1)是共享移動常量;(2)是共享添加的常量;(3)是最大線程數65535(也就是11111111 11111111);(4)跟(3)一樣;(5)計算共享線程數,把c的值向右移16為,並且高位補0; >>> 無符號右移,高位補0;(6)計算獨享的線程數,把c的值與11111111 11111111 按位與,這樣其實就是取到了寫鎖的線程數;(7)是共享線程+1。

源碼分析:

2.2.1、readerShouldBlock()

這個方法的作用是把判斷當前獲取讀鎖的線程是否需要阻塞,條件是:在等待隊列中頭節點的下一個節點是獨享模式的線程。

// 讀鎖應該被阻塞
final boolean readerShouldBlock() {
	return apparentlyFirstQueuedIsExclusive();
}

/**
 * Returns {@code true} if the apparent first queued thread, if one
 * exists, is waiting in exclusive mode.  If this method returns
 * {@code true}, and the current thread is attempting to acquire in
 * shared mode (that is, this method is invoked from {@link
 * #tryAcquireShared}) then it is guaranteed that the current thread
 * is not the first queued thread.  Used only as a heuristic in
 * ReentrantReadWriteLock.
 *如果第一個入隊列的線程節點存在,並且工作在獨享模式下,那麽返回true;
 *如果這個方法返回true,並且當前線程以共享的模式獲取鎖,這個方法保證了它不是第一個入隊列的
 *(讀鎖與讀鎖都是共存的,所以不會入隊,只有當隊列中有獨享模式的線程節點的時候,獲取共享模式的線程才會加入到隊列中。)
 */
final boolean apparentlyFirstQueuedIsExclusive() {
	Node h, s;
	// 頭節點存在,並且存在下一個節點,下一個節點是獨享模式,下一個節點的thread不是空,則返回true
	return (h = head) != null &&
		(s = h.next)  != null &&
		!s.isShared()         &&
		s.thread != null;
}

2.2.2、fullTryAcquireShared(current)

這個方法的作用與tryAcquireShared的作用很類似。

// 進入這個方法的條件,
/**條件1:!readerShouldBlock() && // 如果第一個入隊列的線程節點存在,並且工作在獨享模式下,那麽返回true;
	*	條件2:r < MAX_COUNT && 	// 共享的數據不能超過65535,讀鎖的線程數已經超過了65535
	*	條件3:compareAndSetState(c, c + SHARED_UNIT) // 兩個競爭讀鎖的線程都運行到這裏,第一個競爭成功,那麽第二個就會競爭失敗,返回false
    *  其實這個方法分別對這三種狀態進行處理
   */
   /**
         * Full version of acquire for reads, that handles CAS misses
         * and reentrant reads not dealt with in tryAcquireShared.
         */
        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                // 如果排他鎖被別的線程拿了,直接返回-1
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {  // 這裏是對條件1的處理
                // 如果隊列的頭的下一個節點是請求的排他鎖的線程在等待,那麽就返回true
                    // Make sure we‘re not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                // 如果當前線程的count==0,也就是說當前線程才進來,沒有獲取到鎖,那麽直接把它從readHolds中移除
                                if (rh.count == 0)
                                // 移除當前線程的HoldCounter
                                    readHolds.remove();
                            }
                        }
                        // 移除之後,返回-1
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 這裏是對條件2的處理,直接拋出錯誤!
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                    // 這裏是對條件3的處理,競爭設置state,如果競爭還是失敗,那麽就要再循環一次,直到死循環能夠跳出去
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 如果共享鎖的數量為0
                    if (sharedCount(c) == 0) {
                    // 設置第一個線程為當前的線程
                        firstReader = current;
                        // 設置HoldCount =1
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

  

 

  

ReentrantReadWriteLock源碼分析(一)