1. 程式人生 > >(2.1.27.11)Java併發程式設計:Lock之ReentrantReadWriteLock 讀寫分離獨享式重入鎖

(2.1.27.11)Java併發程式設計:Lock之ReentrantReadWriteLock 讀寫分離獨享式重入鎖

我們在介紹AbstractQueuedSynchronizer的時候介紹過,AQS支援獨佔式同步狀態獲取/釋放、共享式同步狀態獲取/釋放兩種模式,對應的典型應用分別是ReentrantLock和Semaphore

AQS還可以混合兩種模式使用,讀寫鎖ReentrantReadWriteLock就是如此。

設想以下情景:我們在系統中有一個多執行緒訪問的快取,多個執行緒都可以對快取進行讀或寫操作,但是讀操作遠遠多於寫操作,要求寫操作要執行緒安全,且寫操作執行完成要求對當前的所有讀操作馬上可見。

分析上面的需求:

  • 因為有多個執行緒可能會執行寫操作,因此多個執行緒的寫操作必須同步序列執行;
  • 而寫操作執行完成要求對當前的所有讀操作馬上可見,這就意味著當有執行緒正在讀的時候,要阻塞寫操作,當正在執行寫操作時,要阻塞讀操作。

一個簡單的實現就是將資料直接加上互斥鎖,同一時刻不管是讀還是寫執行緒,都只能有一個執行緒操作資料。但是這樣的問題就是如果當前只有N個讀執行緒,沒有寫執行緒,這N個讀執行緒也要傻呵呵的排隊讀,儘管其實是可以安全併發提高效率的。

因此理想的實現是:

  1. 當有寫執行緒時,則寫執行緒獨佔同步狀態。
  2. 當沒有寫執行緒時只有讀執行緒時,則多個讀執行緒可以共享同步狀態。

讀寫鎖就是為了實現這種效果而生。

一、使用示例

我們先來看一下讀寫鎖怎麼使用,這裡我們基於hashmap(本身執行緒不安全)做一個多執行緒併發安全的快取:

public class ReadWriteCache {
    private static Map<String, Object> data = new HashMap<>();
    private static ReadWriteLock lock = new ReentrantReadWriteLock(false);
    private static Lock rlock = lock.readLock();
    private static Lock wlock = lock.writeLock();

    public static Object get(String key) {
        rlock.lock();
        try {
            return data.get(key);
        } finally {
            rlock.unlock();
        }
    }

    public static Object put(String key, Object value) {
        wlock.lock();
        try {
            return data.put(key, value);
        } finally {
            wlock.unlock();
        }
    }

}

限於篇幅我們只實現2個方法,get和put。從程式碼可以看出:

  1. 我們先建立一個 ReentrantReadWriteLock 物件,建構函式 false 代表是非公平的(非公平的含義和ReentrantLock相同)。
  2. 然後通過readLock、writeLock方法分別獲取讀鎖和寫鎖。
    • 在做讀操作的時候,也就是get方法,我們要先獲取讀鎖;
    • 在做寫操作的時候,即put方法,我們要先獲取寫鎖。

通過以上程式碼,我們就構造了一個執行緒安全的快取,達到我們之前說的:寫執行緒獨佔同步狀態,多個讀執行緒可以共享同步狀態。

二、原始碼分析

2.1 ReentrantReadWriteLock整體結構

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {

    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
	
    final Sync sync;

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }


    abstract static class Sync extends AbstractQueuedSynchronizer {}

    static final class NonfairSync extends Sync {}

    static final class FairSync extends Sync {}

    public static class ReadLock implements Lock, java.io.Serializable {}

    public static class WriteLock implements Lock, java.io.Serializable {}
}

可以看到,在公平鎖與非公平鎖的實現上,與ReentrantLock一樣,也是有一個繼承AQS的內部類Sync,然後NonfairSync和FairSync都繼承Sync,通過建構函式傳入的布林值決定要構造哪一種Sync例項。

讀寫鎖比ReentrantLock多出了兩個內部類:ReadLock和WriteLock, 用來定義讀鎖和寫鎖,然後在建構函式中,會構造一個讀鎖和一個寫鎖例項儲存到成員變數 readerLock 和 writerLock。

需要注意的是:讀鎖和寫鎖例項是共享一個AQS,也就是說,共享一個FIFO佇列。 這也是實現讀寫之間相互影響的關鍵

2.2 讀寫鎖

我們還是先回顧下lock()的內容:

方法名稱 描述
void lock() 獲取鎖. 成功則向下執行,失敗則阻塞
void lockInterruptibly() throws InterruptedException 可中斷地獲取鎖,在當前執行緒獲取鎖的過程中可以響應中斷訊號
boolean tryLock() 嘗試非阻塞獲取鎖,呼叫方法後立即返回,成功返回true,失敗返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException 在超時時間內獲取鎖,到達超時時間將返回false,也可以響應中斷
void unlock(); 釋放鎖
Condition newCondition(); 獲取等待通知元件實現訊號控制,等待通知元件實現類似於Object.wait()方法的功能
  • 共享式讀鎖
    • 內部呼叫共享式AQS操作,因此真實的實現就是Sync的 acquireShared 和 releaseShared
	public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;
        private final Sync sync;
		
		 protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
		
		public void lock() {
            sync.acquireShared(1);
        }
		public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
		public boolean tryLock() {
            return sync.tryReadLock();//非阻塞式嘗試獲取讀鎖
        }
		public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
		public void unlock() {
            sync.releaseShared(1);
        }
		public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
	}
  • 獨享式寫鎖
    • 內部呼叫獨享式AQS操作,因此真實的實現就是Sync的 acquire 和 release
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;
        private final Sync sync;
		
		protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
		
		public void lock() {
            sync.acquire(1);
        }
		public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
		public boolean tryLock() {
            return sync.tryWriteLock();//非阻塞式嘗試獲取寫鎖
        }
		public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
		public void unlock() {
            sync.release(1);
        }
		public Condition newCondition() {
            return sync.newCondition();
        }
		
		
		public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }
		public int getHoldCount() {
            return sync.getWriteHoldCount();
        }
	}

我們可以看到基本核心的程式碼還是在AQS中實現

2.3 AQS的實現

在上篇我們已經講到了,AQS需要重寫的鉤子方法:

方法名稱 描述
boolean tryAcquire(int arg) 獨佔式嘗試獲取同步狀態(通過CAS操作設定同步狀態),如果成功返回true,反之返回false
boolean tryRelease(int arg) 獨佔式釋放同步狀態,成功返回true,失敗返回false。
int tryAcquireShared(int arg) 共享式的獲取同步狀態,返回大於等於0的值,表示獲取成功,反之失敗。
boolean tryReleaseShared(int arg) 共享式釋放同步狀態,成功返回true,失敗返回false。
boolean isHeldExclusively() 判斷同步器是否在獨佔模式下被佔用,一般用來表示同步器是否被當前執行緒佔用

2.3.1 state的改變

之前在ReentrantLock中,我們知道鎖的狀態是儲存在Sync例項的state欄位中的(繼承自父類AQS): 0代表無鎖狀態; >0時代表有鎖,具體值為重入次數

現在有了讀寫兩把鎖,然而可以看到還是隻有一個Sync例項,那麼一個Sync例項的state是如何同時儲存兩把鎖的狀態的呢?

答案就是用了位分隔:

abstract static class Sync extends AbstractQueuedSynchronizer {
		static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT); //每次要讓共享鎖+1,就應該讓state加 1<<16
		
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  //每種鎖的最大重入數量
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** 要獲取共享鎖當前的重入數量 */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** 獲取獨佔鎖當前的重入數量  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
		
		
		private transient Thread firstReader;
        private transient int firstReaderHoldCount;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }

}

state欄位是32位的int,讀寫鎖用state的低16位儲存寫鎖(獨佔鎖)的狀態;高16位儲存讀鎖(共享鎖)的狀態

因此:

  1. 要獲取獨佔鎖當前的重入數量,就是 state & ((1 << 16) -1) (即 exclusiveCount 方法)
  2. 要獲取共享鎖當前的重入數量,就是 state >>> 16 (即 sharedCount 方法)

2.3.2 獨享式寫鎖對應的AQS

獨享式寫鎖內部呼叫獨享式AQS操作,因此真實的實現就是Sync的 acquire 和 release。 對應的式AQS的鉤子函式 tryAcquiretryRelease

2.3.2.1 獨享式寫鎖的獲取

abstract static class Sync extends AbstractQueuedSynchronizer {
	protected final boolean tryAcquire(int acquires) {
				Thread current = Thread.currentThread();
				
				int c = getState();//獲取鎖狀態
				int w = exclusiveCount(c); //獲取獨佔鎖的重入數
				
				// 【實現重入】當前state不為0(已被讀或寫佔據當前鎖)
				if (c != 0) { 
					if (w == 0 || current != getExclusiveOwnerThread())//如果寫鎖重入為0說明讀鎖此時被佔用, 或者不被當前寫執行緒佔據,則返回false;
						return false;
					if (w + exclusiveCount(acquires) > MAX_COUNT)
						throw new Error("Maximum lock count exceeded"); //寫鎖重入數溢位
					// Reentrant acquire
					setState(c + acquires); //走到這裡標識,被寫鎖佔據且時當前執行緒佔據,重入數增加
					return true;
				}
				
				//【實現搶佔】到這裡了說明state為0(沒有被佔用)。
				if (writerShouldBlock() ||  //writerShouldBlock是為了實現公平或非公平策略的
					!compareAndSetState(c, c + acquires))//則嘗試通過CAS來搶佔,搶佔成功,直接返回true
					return false;
					
				setExclusiveOwnerThread(current);
				return true;
			}
			
	}
}

static final class NonfairSync extends Sync {

	//不公平鎖一直返回 fasle, 從而直接誘發CAS搶佔
	final boolean writerShouldBlock() {
            return false; // writers can always barge
    }
}		

static final class NonfairSync extends Sync {

	//公平鎖則判斷是否由前置阻塞的節點, 從而判斷是否誘發CAS搶佔。 
	final boolean writerShouldBlock() {
			return hasQueuedPredecessors();
	}
}	
  • writerShouldBlock 實現公平或非公平策略的
    • 不公平鎖一直返回 fasle, 從而直接誘發CAS搶佔
    • 公平鎖則判斷是否由前置阻塞的節點, 從而判斷是否誘發CAS搶佔。
      • true標識有前置阻塞節點,則tryAcquire直接返回false
      • false標識沒有前置阻塞節點,誘發CAS搶佔。

2.3.2.2 獨享式寫鎖的釋放

abstract static class Sync extends AbstractQueuedSynchronizer {
	protected final boolean tryRelease(int releases) {
				if (!isHeldExclusively())
					throw new IllegalMonitorStateException();  //非當前執行緒佔據則直接拋異常
					
				int nextc = getState() - releases;//【計算 鎖狀態】
				
				boolean free = exclusiveCount(nextc) == 0;//作為可重入數,state在大於0時標識重入次數,必須退出對應次數時,才算真正的退出
				if (free) 
					setExclusiveOwnerThread(null); //如果獨佔模式重入數為0了,說明獨佔模式被釋放
					
				setState(nextc);  //更新鎖狀態
				return free;
	}
}

2.3.3 共享式讀鎖對應的AQS

類似於寫鎖,讀鎖的lock和unlock的實際實現對應Sync的 tryAcquireShared 和 tryReleaseShared方法。

2.3.3.1 共享式讀鎖的獲取

abstract static class Sync extends AbstractQueuedSynchronizer {

	private transient Thread firstReader;//首個獲取讀鎖的執行緒
	private transient int firstReaderHoldCount;//首個獲取讀鎖的執行緒的重入數

	Sync() {
		readHolds = new ThreadLocalHoldCounter();
		setState(getState()); // ensures visibility of readHolds
	}
			
	protected final int tryAcquireShared(int unused) {
				Thread current = Thread.currentThread();
				
				int c = getState();//【1】獲取鎖狀態
				
				//【2】如果獨佔模式被佔且不是當前執行緒持有,則獲取失敗
				//表示:如果存在寫鎖,則當前讀鎖獲取失敗
				if (exclusiveCount(c) != 0 &&
					getExclusiveOwnerThread() != current)
					return -1; 
				
				//【3】獲取 共享鎖重入數
				int r = sharedCount(c);
				
				//【4】如果公平策略沒有要求阻塞且重入數沒有到達最大值,則直接嘗試CAS更新state
				if (!readerShouldBlock() &&
					r < MAX_COUNT &&
					compareAndSetState(c, c + SHARED_UNIT)) {
					
					//【5】成功獲取讀鎖後,內部變數的更新操作
					if (r == 0) {
						//如果r=0, 表示,當前執行緒為第一個獲取讀鎖的執行緒。
						firstReader = current;
						firstReaderHoldCount = 1;
					} else if (firstReader == current) {
						//如果第一個獲取讀鎖的物件為當前物件,將firstReaderHoldCount 加一。
						firstReaderHoldCount++;
					} else {
						//當前執行緒功獲取鎖後,如果不是第一個獲取多鎖的執行緒
						// 將該執行緒持有鎖的次數資訊,放入執行緒本地變數中,方便計算當前執行緒的重入次數
						HoldCounter rh = cachedHoldCounter;
						if (rh == null || rh.tid != getThreadId(current))
							cachedHoldCounter = rh = readHolds.get();
						else if (rh.count == 0)
							readHolds.set(rh);
						rh.count++;
					}
					
					return 1;
				}
				
				return fullTryAcquireShared(current); //用來處理CAS沒成功的情況,邏輯和上面的邏輯是類似的,就是加了無限迴圈
	}
}

static final class NonfairSync extends Sync {

	 final boolean readerShouldBlock() {
		//不公平鎖一直返回 fasle, 從而直接誘發CAS搶佔
		return apparentlyFirstQueuedIsExclusive();
	 }
	 
	 //這個方法判斷佇列的head.next是否正在等待獨佔鎖(寫鎖)。這個方法的意思是:讀鎖不應該讓寫鎖始終等待。
	 //該方法如果頭節點不為空,並頭節點的下一個節點不為空,並且不是共享模式【獨佔模式,寫鎖】、並且執行緒不為空,則返回true。
	 final boolean apparentlyFirstQueuedIsExclusive() {
		Node h, s;
		return (h = head) != null &&
			(s = h.next)  != null &&
			!s.isShared()         &&
			s.thread != null;
	 }

}

static final class FairSync extends Sync {
	//公平鎖則判斷是否由前置阻塞的節點, 從而判斷是否誘發CAS搶佔。 
	final boolean readerShouldBlock() {
		return hasQueuedPredecessors();
	}
}

我們主要看下CAS更新成功後(獲取讀鎖成功)的操作:

在firstReaderHoldCount中或readHolds(ThreadLocal型別的)的本執行緒副本中記錄當前執行緒重入數(淺藍色程式碼),這是為了實現jdk1.6中加入的getReadHoldCount()方法的

這個方法能獲取當前執行緒重入共享鎖的次數(state中記錄的是多個執行緒的總重入次數)

加入了這個方法讓程式碼複雜了不少,但是其原理還是很簡單的:如果當前只有一個執行緒的話,還不需要動用ThreadLocal,直接往firstReaderHoldCount這個成員變數裡存重入數,當有第二個執行緒來的時候,就要動用ThreadLocal變數readHolds了,每個執行緒擁有自己的副本,用來儲存自己的重入數。

fullTryAcquireShared如果CAS失敗或readerShouldBlock方法返回true,我們呼叫fullTryAcquireShared方法繼續試圖獲取讀鎖。fullTryAcquireShared方法是tryAcquireShared方法的完整版,或者叫升級版,它處理了CAS失敗的情況和readerShouldBlock返回true的情況。

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
				//【1】獲取鎖狀態
                int c = getState();
				
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
					//【2】如果獨佔模式被佔且不是當前執行緒持有,則獲取失敗
				    //表示:如果存在寫鎖,則當前讀鎖獲取失敗
                        return -1;
                } else if (readerShouldBlock()) {
                    //【3】如果沒有執行緒正在持有寫鎖,則呼叫readerShouldBlock檢測根據公平原則,當前執行緒是否應該進入等待佇列。
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
				
				//共享鎖溢位
                if (sharedCount(c) == MAX_COUNT)
					
                    throw new Error("Maximum lock count exceeded");
				
				//嘗試CAS更新state
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
}

2.3.3.2 共享式讀鎖的釋放


protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //下邊程式碼也是為了實現jdk1.6中加入的getReadHoldCount()方法,在更新當前執行緒的重入數。
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
			
            //這裡是真正的釋放同步狀態的邏輯,就是直接同步狀態-SHARED_UNIT,然後CAS更新,沒啥好說的
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }
  • 共享式鎖的【更新鎖狀態】不能通過setState,而是通過CAS操作。 這是由於獨享式鎖釋放時肯定是單執行緒模型的,而共享式的則可能是多執行緒模型
  • 不需要呼叫setExclusiveOwnerThread 設定獨佔狀態

三、補充內容

通過上面的原始碼分析,我們可以發現一個現象:

  1. 線上程持有讀鎖的情況下,該執行緒不能取得寫鎖(因為獲取寫鎖的時候,如果發現當前的讀鎖被佔用,就馬上獲取失敗,不管讀鎖是不是被當前執行緒持有)
  2. 線上程持有寫鎖的情況下,該執行緒可以繼續獲取讀鎖(獲取讀鎖時如果發現寫鎖被佔用,只有寫鎖沒有被當前執行緒佔用的情況才會獲取失敗)

細想想,這個設計是合理的:因為

  1. 當執行緒獲取讀鎖的時候,可能有其他執行緒同時也在持有讀鎖,因此不能把獲取讀鎖的執行緒“升級”為寫鎖;
  2. 而對於獲得寫鎖的執行緒,它一定獨佔了讀寫鎖,因此可以繼續讓它獲取讀鎖
    • 當它同時獲取了寫鎖和讀鎖後,還可以先釋放寫鎖繼續持有讀鎖,這樣一個寫鎖就“降級”為了讀鎖。

綜上:

  • 一個執行緒要想同時持有寫鎖和讀鎖,必須先獲取寫鎖再獲取讀鎖;
  • 寫鎖可以“降級”為讀鎖;
  • 讀鎖不能“升級”為寫鎖。