深入理解 sync.RWMutex:解決讀者-寫者問題
在某個資料需要被多個執行緒共享訪問的時候,會出現讀者-寫者問題(這裡的「問題」是複數形式的,因為讀者-寫者問題有多個變種)。訪問共享資料的執行緒有兩種型別:讀者和寫者。讀者只會讀取資料,而寫者則是修改它。當寫者擁有了訪問資料的許可權後,其它的執行緒(不管是讀者還是寫者)都不能訪問這個資料。這種約束的需求在現實中是存在的,比如:當寫者不能原子性地修改某個資料(例如資料庫)時,在修改完成之前,要讀取這個資料的讀者要被阻塞,以免讀者獲取到損壞的資料(髒資料)。對於讀者-寫者問題的核心,還有很多修訂的限制,比如:
- 寫者不能餓死(無限地等待執行的機會)
- 讀者不能餓死
- 所有執行緒都不能餓死
像ofollow,noindex" target="_blank">sync.RWMutex 這種多讀者-單寫者互斥鎖的實現,可以解決其中的一些讀者-寫者問題。我們現在來看看要怎麼用 Go 解決這些問題,並瞭解這種解決方案能給到怎樣的保證。
作為彩蛋,我們還會了解到怎麼對互斥體(mutex)進行效能分析。
用法
在我們深入到程式碼實現細節之前,我們先來實戰演練一下sync.RWMutex
的用法。下面的程式使用讀寫鎖來保護臨界區 ——sleep()
。為了展示整個過程,臨界區還添加了一些程式碼來記錄當前有多少讀者和寫者正在臨界區裡面。(完整原始碼
)
package main import ( "fmt" "math/rand" "strings" "sync" "time" ) func init() { rand.Seed(time.Now().Unix()) } func sleep() { time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) } func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) { sleep() m.RLock() c <- 1 sleep() c <- -1 m.RUnlock() wg.Done() } func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) { sleep() m.Lock() c <- 1 sleep() c <- -1 m.Unlock() wg.Done() } func main() { var m sync.RWMutex var rs, ws int rsCh := make(chan int) wsCh := make(chan int) go func() { for { select { case n := <-rsCh: rs += n case n := <-wsCh: ws += n } fmt.Printf("%s%s\n", strings.Repeat("R", rs), strings.Repeat("W", ws)) } }() wg := sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) go reader(rsCh, &m, &wg) } for i := 0; i < 3; i++ { wg.Add(1) go writer(wsCh, &m, &wg) } wg.Wait() }
在 play.golang.org 網站上執行的程式,它們的執行環境是固定的,(比如時間的初始值,time.Now().Unix() 每次都會返回一個固定的值),所以rand.Seed(time.Now().Unix())
會產生相同的隨機數種子,從而導致程式輸出每次都一樣。你可以每次都手動給定一個不一樣的隨機數種子,或者可以在你的機器上執行上述程式碼。
示例輸出:
W R RR RRR RRRR RRRRR RRRR RRR RRRR RRR RR R W R RR RRR RRRR RRR RR R W
當在臨界區裡面的 go routine 數量發生變化時,程式就會換行。這些輸出可以體現RWMutex
“要麼允許多個讀者訪問,要麼允許一個寫者訪問”的特性。
還有一個重點是,一旦有一個寫者呼叫Lock()
後 ,後續試圖訪問臨界區的讀者將會被阻塞,如果當前已經有讀者正在臨界區內,則寫者會等待這些讀者離開臨界區後,再鎖定臨界區。這個特性體現在程式的輸出上,就是每次 W 出現的前幾行, R 的數量都是逐個減少的。
... RRRRR RRRR RRR RR R W ...
當寫者執行完畢後,前面被阻塞的讀者就可以恢復執行,並且新的寫者也可以開始嘗試進入臨界區了。值得一提的是,如果一個寫者剛剛執行完畢,而現在同時有讀者和寫者等待進入臨界區的話,那麼永遠都是在等待的讀者們先等到進入臨界區的機會,下一個才到寫者。這樣一來,讀者和寫者都不會被餓死了。
程式碼實現
注意作者編寫本文時分析的程式碼版本(718d6c58 )與最新的版本相比可能會有差異。
RWMutex
為讀者暴露了兩個方法(RLock
和RUnlock
),同時專門為寫者也暴露了兩個方法(Lock
和Unlock
)。
RLock
為了程式碼簡潔起見,我們跳過那些跟競態檢測器相關的程式碼(用...
表示)。
func (rw *RWMutex) RLock() { ... if atomic.AddInt32(&rw.readerCount, 1) < 0 { runtime_SemacquireMutex(&rw.readerSem, false) } ... }
readerCount
欄位的型別是int32
,它表示當前啟用的讀者——包括了所有正在臨界區裡面的讀者或者被寫者阻塞的等待進入臨界區讀者的數量。相當於是當前呼叫了RLock
函式並且還沒呼叫RUnLock
函式的讀者的數量。
atomic.AddInt32 是下面程式碼的原子版本:
*addr += delta return *addr
其中addr
是*int32
型別的而delta
是int32
型別的。由於這是個原子性的操作,所以增加delta
不會有干擾到其它執行緒的風險。(更多關於 Fetch-and-add 的資料詳見這裡
)
如果我們完全沒有用到寫者的話,readerCount
會一直大於或等於 0 (譯註:後面會講到,一旦有寫者呼叫Lock
,Lock
函式就會把readerCount
設定為負數),並且讀者獲取鎖的過程會走較快的非阻塞的分支,因為這時候讀者獲取鎖的過程只涉及到atomic.AddInt32
的呼叫。
訊號量(semaphore)
這是一個由 Edsger Dijkstra 提出的資料結構,解決很多關於同步的問題時,它都很好用。它是一個提供了兩種操作的整數:
- 獲取(acquire,又稱 wait、decrement 或者 P)
- 釋放(release,又稱 signal、increment 或者 V)
獲取操作把訊號量減一,如果減一的結果是非負數,那麼執行緒可以繼續執行。如果結果是負數,那麼執行緒將會被阻塞,除非有其它執行緒把訊號量增加回非負數,該執行緒才有可能恢復執行)。
釋放操作把訊號量加一,如果當前有被阻塞的執行緒,那麼它們其中一個會被喚醒,恢復執行。
Go 語言的執行時提供了runtime_SemacquireMutex
和runtime_Semrelease
函式,像sync.RWMutex
這些物件的實現會用到這兩個函式。
Lock 方法
func (rw *RWMutex) Lock() { ... rw.w.Lock() // 通過把 rw.readerCount 設定成負數,來告知讀者當前有寫者正在等待進入臨界區 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false) } ... }
Lock
方法讓寫者可以獲得對共享資料的獨佔
訪問權:
首先它會獲取一個叫w
的互斥量(mutex),這會使得其它的寫者無法訪問這個共享資料,這個w
只有在Unlock
函式快結束的時候,才會被解鎖,從而保證一次最多隻能有一個寫者進入臨界區。
然後Lock
方法會把readerCount
的值設定成負數,(通過把readerCount
減掉rwmutexMaxReaders
(即1 << 30
))。然後接下來任何讀者呼叫RLock
函式時,都會被阻塞掉了:
if atomic.AddInt32(&rw.readerCount, 1) < 0 { // rw.readerCount 是負數,說明有寫者正在等待進入臨界區或者正在臨界區內,等待寫者執行完成 runtime_SemacquireMutex(&rw.readerSem, false) }
後續來到臨界區的讀者們將會被阻塞,那正在執行的讀者們會怎樣呢?readerWait
欄位就是用來記錄當前有多少讀者正在執行。寫者阻塞在訊號量rw.writerSem
裡,直到最後一個正在執行的讀者執行完畢,它呼叫的RUnlock
方法會把rw.writerSem
訊號量加一(我後面會講到),這時寫者才能被喚醒、進入臨界區。
如果沒有正在執行的讀者,那麼寫者就可以直接進入臨界區了。
rwmutexMaxReaders
(譯註:原文大量使用的pending
這個詞常常被翻譯為「掛起」(有暫停的語義),但是在本文中,pending 表示的是「等待進入臨界區(這時是執行緒是暫停的)或者正在臨界區裡面(這時是執行緒正在執行的)」這個狀態。「掛起」不能很好的表達該語義,所以 pending 保留原文不翻譯,但讀者要注意 pending 在本文的語義,
例如:「一個 pending 的讀者」可以理解為是一個呼叫了RLock
函式但是還沒呼叫RUnlock
函式的讀者。「一個 pending 的寫者」則相應地表示一個呼叫了Lock
函式但是還沒呼叫Unlock
函式的寫者
)
在rwmutex.go 裡面有一個常量:
const rwmutexMaxReaders = 1 << 30
這個1 << 30
是什麼意思、做什麼用的呢?
readerCount
欄位是int32
型別的,它的有效範圍是:
[-1 << 31, (1 << 31) - 1] 或者說 [-2147483648, 2147483647]
RWMutex 使用這個欄位來記錄當前 pending 的讀者數,並且這個欄位還標記著當前是否有寫者在 pending 狀態。在Lock
方法裡面:
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
readerCount
欄位被減掉了1<<30
。當readerCount
的值為負數時,說明當前存在 pending 狀態的寫者。而readerCount
再加回1<<30
,又能表示當前 pending 的讀者的數量。最後,rwmutexMaxReaders
還限制了 pending 讀者的數量。如果我們的當前 pending 的讀者數量比rwmutexMaxReaders
還要多的話,那麼readerCount
減去rwmutexMaxReaders
就不是負數了,這樣整個機制都會被破壞掉。從中我們可以知道,pending 的讀者數量不能大於rwmutexMaxReaders - 1
,它的值超過了 10 億——1073741823。
RUnlock
func (rw *RWMutex) RUnlock() { ... if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false) } } ... }
這個方法會把readerCount
減一 (之前是RLock
方法把這個值增加了的),如果readerCount
是負數,意味著當前存在 pending 狀態的寫者,因為正如上面所說的,在寫者呼叫Lock
方法的時候,readerCount
的值會減掉rwmutexMaxReaders
,從而使readerCount
變成負數。
然後這個方法會檢查當前正在臨界區裡面的讀者數是不是已經是 0 了,如果是的話,意味著等待進入臨界區的寫者可以獲取到rw.writerSem
訊號量、進入臨界區了。
Unlock
func (rw *RWMutex) Unlock() { ... r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false) } rw.w.Unlock() ... }
要解鎖寫者擁有的寫鎖,首先readerCount
的值要增加rwmutexMaxReaders
,這個操作會使得readerCount
恢復成非負數,如果這時候readerCount
大於 0,這意味著當前有讀者在等待著寫者離開臨界區。最後寫者釋放掉它擁有的w
這個互斥量(譯註:上文說過,這個互斥量是寫者用來防止其它寫者
進入臨界區的),這使得其它寫者能夠有機會再次鎖定w
這個互斥量。
如果讀者或寫者嘗試在一個已經解鎖的 RWMutex 上呼叫Unlock
和RUnlock
方法會丟擲錯誤(程式碼
):
m := sync.RWMutex{} m.Unlock()
輸出:
fatal error: sync: Unlock of unlocked RWMutex ...
遞迴地讀鎖定
文件裡面寫道:
如果一個 goroutine 擁有一個讀鎖,而另外一個 goroutine 又呼叫了Lock
函式,那麼在第一個讀鎖被釋放之前,沒有讀者可以獲得讀鎖。這尤其限制了我們不能遞迴地獲取讀鎖,因為只有這樣才能確保鎖都能變得可用,一個Lock
的呼叫會阻止新的讀者獲取到讀鎖。(上文已經多次提到這一點了)
因為 RWMutex 就是這麼實現的:如果當前有一個 pending 的寫者,那麼所有嘗試呼叫RLock
的讀者都會被阻塞,即使在這之前已經有讀者獲取到了讀鎖(原始碼
):
package main import ( "fmt" "sync" "time" ) var m sync.RWMutex func f(n int) int { if n < 1 { return 0 } fmt.Println("RLock") m.RLock() defer func() { fmt.Println("RUnlock") m.RUnlock() }() time.Sleep(100 * time.Millisecond) return f(n-1) + n } func main() { done := make(chan int) go func() { time.Sleep(200 * time.Millisecond) fmt.Println("Lock") m.Lock() fmt.Println("Unlock") m.Unlock() done <- 1 }() f(4) <-done }
輸出:
RLock RLock RLock Lock RLock fatal error: all goroutines are asleep - deadlock!
(譯註:上面的程式碼有兩個goroutine,一個是寫者 routine,一個是主 goroutine(也是讀者),通過程式的輸出可以知道:前三行都是輸出 RLock,表示這時候已經有 3 個讀者獲取到了讀鎖。後面接著輸出了 Lock, 表示這時候寫者開始請求寫鎖,後面接著輸出一個 RLock,表示這時又多了一個讀者請求讀鎖。因為 pending 的寫者會阻塞掉後續呼叫RLock
的讀者,所以最後一個 RLock 的呼叫堵塞了主 routine,而寫者的 routine 也在堵塞等待前面三個讀者釋放它們的讀鎖,所以兩個 goroutine 都堵塞了,因此程式報錯:fatal error: all goroutines are asleep - deadlock!
)
鎖的拷貝
go tool vet
可以檢測到是否有鎖被按值拷貝了,因為這種情況會導致死鎖,具體的情況可以看之前的一篇文章:Detect locks passed by value in Go
(譯註:GCTT 譯文:檢測 Go 程式中按值傳遞的 locks
效能
之前有人提出:隨著 CPU 核心數量的增加,RWMutex 的效能會降低,詳見:https://github.com/golang/go/issues/17973
鎖的爭用
Go 1.8 版本開始支援分析 mutex 的爭用情況(譯註:原文 Contention,參考維基百科 )(patch 補丁 ),我們來看看它是怎麼用的:
import ( "net/http" _ "net/http/pprof" "runtime" "sync" "time" ) func main() { var mu sync.Mutex runtime.SetMutexProfileFraction(5) for i := 0; i < 10; i++ { go func() { for { mu.Lock() time.Sleep(100 * time.Millisecond) mu.Unlock() } }() } http.ListenAndServe(":8888", nil) }
> go build mutexcontention.go > ./mutexcontention
當程式mutexcontention 執行時:
> go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1 Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1 Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz File: mutexcontention Type: delay Entering interactive mode (type "help" for commands, "o" for options) (pprof) list main Total: 57.28s ROUTINE ======================== main.main.func1 in /Users/mlowicki/projects/golang/src/github.com/mlowicki/mutexcontention/mutexcontention.go 057.28s (flat, cum)100% of Total ..14:for i := 0; i < 10; i++ { ..15:go func() { ..16:for { ..17:mu.Lock() ..18:time.Sleep(100 * time.Millisecond) .57.28s19:mu.Unlock() ..20:} ..21:}() ..22:} ..23: ..24:http.ListenAndServe(":8888", nil)
上面的 57.28s 是什麼,它為什麼挨著mu.Unlock()
呢?
當 goroutine 因為呼叫Lock
方法而被阻塞的時候,這個時間點會被記錄下來——aquiretime(獲取時間)。當其他 goroutine 解鎖了這個鎖,並且起碼有一個 goroutine 在等待獲取這個鎖的時候。其中一個 goroutine 可以獲取到這個鎖,這時他會自動呼叫mutexevent
函式。函式mutexevent
根據SetMutexProfileFraction
函式設定的比率,來確定是否應該儲存或忽略掉該事件。這種事件都包含了等待時間(當前時間 - 獲取時間)。上述的程式碼中,所有阻塞在這個鎖的 goroutine 的總等待時間會被收集和顯示出來,
對於讀鎖(Rlock 和RUnlock
)爭用的分析功能,將會在 Go 1.11 版本加入 (patch 補丁
)