RWMutex —— 細粒度的讀寫鎖

我們之前有講過 Mutex 互斥鎖。這是在任何時刻下只允許一個 goroutine 執行的序列化的鎖。而現在這個 RWMutex 就是在 Mutex 的基礎上進行了拓展能支援多個 goroutine 持有讀鎖,而在嘗試持有寫鎖時就會如 Mutex 一樣就會陷入等待鎖的釋放。它是一種細粒度的鎖。雖然可以允許多次持有讀鎖,但是 Go 團隊還特意囑咐,為了確保鎖的可用性,不能用於遞迴讀鎖。一個阻塞的鎖要排除正在持有鎖的新讀。

那麼上面說到的這些功能,RWMutex 是如何實現的呢?首先我們來看它的內部結構:

type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}

只有 5 個物件,其中最重要的就是 Mutex 鎖的欄位 w,它就是實現寫鎖的關鍵。

  • writerSem 是寫等待讀完成的訊號量
  • readerSem 是讀等待寫完成的訊號量
  • readerCount 正處於讀鎖的個數
  • readerWait 嘗試獲取寫鎖時讀等待的個數(這個怎麼理解?)

其中還有一個全域性的常數變數 rwmutexMaxReaders,表示最多的讀操作。

我們先來看寫鎖

Lock/UnLock 寫鎖/解鎖

func (rw *RWMutex) Lock() {
...
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}

這裡直接用到了 Mutex 互斥鎖來保證只有一個 goroutine 能進來。接下來就會判斷在獲取寫鎖的時候如果還存在其他的讀鎖沒有釋放,那麼這個時候就會陷入睡眠進入等待者佇列中等待所有的讀鎖被釋放之後喚醒

可能有些人對這個限制有些不懂,其實這就是為了保證鎖的區間的讀的值順序性的正確性。因為在獲取寫的時候,目的就是進行寫操作,所謂我就必須要在此時還存在其他可能會讀這個變數的讀鎖全部釋放才行。

而釋放寫鎖就是 UnLock 操作了。如果呼叫此操作時,本就沒有上鎖那麼就會直接拋異常。

func (rw *RWMutex) Unlock() {
...
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
...
}

如果還存在讀鎖時,那麼就會進入 runtime.Semrelease 對那些阻塞的讀鎖解鎖(找到對應的訊號量等待者佇列然後彈出喚醒)。最後釋放 w 鎖。

RLock/RUnlock 讀鎖/解鎖

func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
...
}

讀鎖就非常簡單了,僅僅只是對 readerCount 欄位自增。這裡的判斷要注意,這個判斷成立說明有協程呼叫了 rw.Lock 獲取了寫鎖。所以就要等待其它協程的釋放。

知道讀鎖的機制,那麼就能想到釋放讀鎖其實就是撤銷讀鎖,將 readerCount 欄位減1即可。

func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
...
}

同樣在釋放讀鎖時會判斷 r 是否為負數,如果為負數就說明有其它協程獲取了寫鎖,就會進入 rUnlockSlow 方法。

func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

如果鎖狀態已經是解鎖狀態則拋異常。

如果是隻剩下一個讀等待,則釋放寫訊號量通知其他正在嘗試持有寫鎖的協程上鎖。

關於訊號量的細節

我們上面分析了讀寫鎖的上鎖與解鎖的過程,其實有一個點不知道大家有沒有注意。就是關於訊號量的操作物件的細節。

  1. 呼叫 Lock 獲取寫鎖,會持有 writerSem 訊號
  2. 呼叫 Unlock 釋放寫鎖時,會釋放 readerSem 訊號
  3. 呼叫 RLock 獲取讀鎖時,會持有 readerSem 訊號
  4. 呼叫 RUnlock 釋放讀鎖時,會釋放 writerSem 訊號

大家有沒有發現其中的規律,這麼做的目的是什麼呢?

也就是說:我們在獲取寫鎖之前,會先等待讀鎖的釋放操作。而在獲取讀鎖時,會先等待寫鎖的釋放操作。

我們用反證法來假設這個場景:我這裡有一個連續的寫操作;那麼也就是說我要連續反覆的呼叫 Lock + Unlock 操作。如果沒有上面的訊號量的互相牽制,那麼就很容易出現讀操作沒法執行的問題,也就是說會”餓死“。

所以 RWMutex 加入讀寫訊號量的機制是為了更好達到 RW 的目的,而不是一直 W。

總結

  • 在呼叫 Lock 獲取寫鎖時,會先等待 RUnlock 將其 readerCount 置為 0,然後成功獲取寫鎖。

    • 還有一個操作是將 readerCount - rwmutexMaxReaders,其目的是為了阻塞後續的 RLock 操作。即在讀取寫鎖其他任何讀寫操作都不允許了。
  • 在呼叫 Unlock 釋放寫鎖時,會通知所有讀操作,解鎖那些阻塞的讀鎖,然後成功釋放寫鎖。