Go36-26-互斥鎖與讀寫鎖
從同步講起
相比於Go語言宣揚的“用通訊的方式共享資料”,通過共享資料的方式來傳遞資訊和協調執行緒執行的做法其實更加主流。本篇就是討論一些與多執行緒、共享資源以及同步有關的知識。
sync包,就是一個與併發程式設計關係緊密的程式碼包。這裡“sync”的中文意思就是“同步”。
重要的併發程式設計概念
這裡會講一些重要的併發程式設計概念:競態條件、臨界區、互斥量、死鎖。死鎖會在互斥鎖裡引出。
一旦資料被多個執行緒共享,那麼就很可能會產生爭用和衝突的情況。這種情況也被稱為競態條件 (race condition),這往往會破幻共享資料的一致性。
概括來講,同步的用途有兩個:
- 避免多個執行緒在同一時刻操作同一個資料塊
- 協調多個執行緒,避免它們在同一時刻執行同一個程式碼塊
由於這樣的資料塊和程式碼塊的背後都隱含著一種或多種資源,可以把他們看作是共享資源 。
同步就是在控制多個執行緒對共享資源的訪問。針對某個資源的訪問,同一時刻只能有一個執行緒訪問到該資源。那麼可以說,多個併發進行的執行緒對這個共享資源的訪問是完全序列的。只要一個程式碼片段需要實現對共享資源的序列化訪問,就可以被視為一個臨界區 (critical section)。也就是說,要訪問到資源就必須進入到這個區域。如果針對一個共享資源,這樣的程式碼片段有多個,那麼它們就可以被稱為相關臨界區 。
應對競態條件的問題,就需要施加一些保護的手段。方法之一就是使用實現了某種同步機制的工具,也稱為同步工具 。在Go語言中,可供我們選擇的同步工具 並不少。其中,最重要且最常用的同步工具當屬互斥量 (mutual exclusion,簡稱 mutex)。sync包中的Mutex就是與其對應的型別,該型別的值可以被稱為互斥量 或者互斥鎖 。
互斥鎖
雖然Go語言是以“用通訊的方式共享資料”為亮點,但是依然提供了一些易用的同步工具。而互斥鎖就是最常用到的一個。
一個互斥鎖可以被用來保護一個臨界區或者一組相關臨界區。保證同一時刻只有一個goroutine處於改臨界區之內。每當有goroutine想進入臨界區是,需要對它進行鎖定,並且在離開臨界區時進行解鎖。
程式碼示例
使用互斥鎖時,鎖定操作可以通過呼叫互斥鎖的Lock方法實現,而解鎖是呼叫Unlock方法。示例如下:
package main import ( "fmt" "flag" "os" "sync" "bytes" "io" ) var lock bool func init() { flag.BoolVar(&lock, "lock", false, "是否加鎖") } const ( max1 = 5// 準備啟用多個goroutine max2 = 10// 每個goroutine裡寫入這麼多組資料 max3 = 10// 每組資料就是重複寫入多個數字 ) func main () { // 解析命令列引數 flag.Parse() // bytes.Buffer是一個緩衝byte型別的緩衝器,存放的都是byte型別 var buffer bytes.Buffer var mu sync.Mutex// 互斥鎖 done := make(chan struct{})// 每當一個goroutine執行完畢了,就往這裡發一個訊號 for i := 0; i < max1; i++ { go func(id int, writer io.Writer) { defer func() { done <- struct{}{} }() for j := 0; j < max2; j++ { // 準備資料 header := fmt.Sprintf("\n[%d %d]", id, j) data := fmt.Sprintf(" %d-%d", id, j) // 加鎖 if lock { mu.Lock() } // 寫入資料 _, err := writer.Write([]byte(header)) if err != nil { fmt.Fprintf(os.Stderr, "ERROR when write header in %d: %s\n", id, err) } for k := 0; k < max3; k++ { _, err := writer.Write([]byte(data)) if err != nil { fmt.Fprintf(os.Stderr, "ERROR when write data in %d: %s\n", id, err) } } // 解鎖 if lock { mu.Unlock() } } }(i, &buffer) } // 等待goroutine退出 for i := 0; i < max1; i++ { <- done } // 列印結果 fmt.Println(buffer.String()) }
這個示例提供了一個命令列引數-lock,可以選擇加鎖或者不加鎖來執行這個程式。這樣可以方便的比較在程式碼中加鎖的作用。
注意事項和建議
使用互斥鎖時的注意事項:
- 不要重複加鎖
- 不要忘記解鎖,最好是使用defer語句
- 不要對尚未加鎖或者已經解鎖的互斥鎖解鎖
- 不要在多個函式之間直接傳遞互斥鎖
對一個已經被鎖定的互斥鎖進行鎖定,是會立即阻塞當前goroutine的。會一直等到該互斥鎖在別的goroutine裡被解鎖,並且這裡的鎖定操作完成為止。如果那邊解鎖後又被別的goroutine鎖定了,那就繼續等,一直到搶到鎖完成鎖定操作。
雖然沒有任何的強制規定,你是可以用同一個互斥鎖保護多個無關的臨界區的。但是這樣做,一定會使你的程式變的複雜,就是說不要這麼做,需要的話,就多搞幾把鎖。如果真的把一個互斥鎖同時用在了多個地方,必然會有更多的goroutine徵用這把鎖。這不但會使得程式變慢,還會打打增加死鎖 (deadlock)的可能性。
所謂死鎖,就是當前程式中的主goroutine,以及啟用的那個goroutine都已經被阻塞。這些goroutine可以被統稱為使用者級的goroutine 。就是說整個程式都停滯不前了。
Go語言執行時,系統是不允許死鎖的情況出現的。只要發現所有的使用者級goroutine都處於等待狀態,就會自行丟擲panic。隨便寫個函式,連續上2次鎖就死鎖了:
func main() { var mu sync.Mutex mu.Lock() mu.Lock() mu.Unlock() mu.Unlock() }
丟擲的資訊如下,主要就看第一行fatal error: all goroutines are asleep - deadlock!
:
PS H:\Go\src\Go36\article26\example02> go run main.go fatal error: all goroutines are asleep - deadlock! goroutine 1 [semacquire]: sync.runtime_SemacquireMutex(0xc042046004, 0x0) D:/Go/src/runtime/sema.go:71 +0x44 sync.(*Mutex).Lock(0xc042046000) D:/Go/src/sync/mutex.go:134 +0xf5 main.main() H:/Go/src/Go36/article26/example02/main.go:8 +0x55 exit status 2 PS H:\Go\src\Go36\article26\example02>
這種在Go執行時系統自行丟擲的panic都屬於致命錯誤,是無法被恢復的。呼叫recover函式也不起作用。就是說,一旦死鎖,程式必然崩潰。
要避免這種情況,最有效的做法就是,讓每一個互斥鎖只保護一個臨界區或一組相關的臨界區。
還要注意,對同一個goroutine而言,既不要重複鎖定一個互斥鎖,也不要忘記進行解鎖。這裡不要忘記解鎖的一個很重要的原因就是為了避免重複鎖定。在很多時候,一個函式執行的流程並不是單一的,流程中間可能會有分叉、也可能會被中斷。最保險的做法就是使用defer語句來進行解鎖,並且這樣的defer語句應該緊跟在鎖定操作的後面。
上面的那個示例,沒有按這裡說的來做,因為整個寫操作是在for迴圈裡的。解鎖操作後還有其他語句要執行,這裡是for迴圈裡的其他迭代要處理。而defer語句是隻有程式退出後才會執行的。不過這都不是藉口,要按這裡最保險的做法來做,只需要把for迴圈裡的語句再寫一個函式或匿名函式就可以用defer了:
for i := 0; i < max1; i++ { go func(id int, writer io.Writer) { defer func() { done <- struct{}{} }() for j := 0; j < max2; j++ { // 準備資料 header := fmt.Sprintf("\n[%d %d]", id, j) data := fmt.Sprintf(" %d-%d", id, j) func () { // 加鎖 if lock { mu.Lock() defer func() { // 解鎖 mu.Unlock() }() } // 寫入資料 _, err := writer.Write([]byte(header)) if err != nil { fmt.Fprintf(os.Stderr, "ERROR when write header in %d: %s\n", id, err) } for k := 0; k < max3; k++ { _, err := writer.Write([]byte(data)) if err != nil { fmt.Fprintf(os.Stderr, "ERROR when write data in %d: %s\n", id, err) } } }() } }(i, &buffer) }
解鎖未鎖定的互斥鎖也會立即引發panic。並且與死鎖一樣,也是無法被恢復的。從這一定看,也是需要保證對於沒一個鎖定操作,都必須且只能由一個對應的解鎖操作。就是要讓他們成對出現,這也算是互斥鎖一個很重要的使用原則。而利用defer語句進行解鎖就可以很容易的做到這一點。
互斥鎖是結構體、值型別Go語言中的互斥鎖時開箱即用的,就是一旦聲明瞭一個sync.Mutex型別的變數,就可以直接使用它。不過要注意,該型別是一個結構體,屬於值型別:
type Mutex struct { state int32 semauint32 }
對於值型別,把它傳遞給一個函式、將他從函式中返回、把它賦值給其他變數、讓它進入某個通道都會導致它的副本的產生。這裡,原值和副本以及多個副本之間都是完全獨立的,是不同的互斥鎖。舉例說明,如果你把一個互斥鎖作為引數值傳給了一個函式,那麼在這個函式中對傳入的鎖的所有操作,都不會對存在於該函式之外的那個原鎖產生任何影響。
這就是為什麼“不要在多個函式之間直接傳遞互斥鎖”。避免歧義,即使你希望的是在這個函式中使用另外一個互斥鎖也不要這樣做。
示例程式碼2
學習了上面的注意事項和建議,就來看看如何更好的使用互斥鎖。下面是一個使用互斥鎖的示例:
package main import ( "bytes" "fmt" "os" "io" "sync" "time" ) // 建立互斥鎖 var mu sync.Mutex // singleHandler 代表單次處理函式的型別,讀和寫用的函式內容有些不同,但是簽名都是這樣的 type singleHandler func() (data string, n int, err error) // 生成一個寫入當前時間的函式 func genWriter(writer io.Writer) singleHandler { return func() (data string, n int, err error) { // 準備資料 data = fmt.Sprintf("%s\t", time.Now().Format(time.StampNano)) // 寫入資料 mu.Lock() defer mu.Unlock() n, err = writer.Write([]byte(data)) return } } // 生成一個讀取資料的函式 func genReader(reader io.Reader) singleHandler { return func() (data string, n int, err error) { // 型別斷言,把io.Reader介面轉成*bytes.Buffer型別 // 下面要呼叫*bytes.Buffer型別的ReadString方法 // 因為函式的引數要求是一個介面型別,但是後面的讀操作用的是*bytes.Buffer的ReadString方法 // 所以在呼叫方法前,必須要檢查介面的實際型別(動態型別) // 實際在主函式裡呼叫genReader函式是,傳入的就是*bytes.Buffer // 型別斷言x.(T),這裡x必須為一個介面型別,但並非必須是空介面 // 這裡reader是個io.Reader介面。如果要對非介面型別的變數做型別斷言,就要先轉成空介面 buffer, ok := reader.(*bytes.Buffer) if !ok { err = fmt.Errorf("unsupported reader") return } // 讀取資料 mu.Lock() defer mu.Unlock() data, err = buffer.ReadString('\t') n = len(data) return } } // 處理流程配置的型別,這裡把處理流程相關的資訊全部寫在下面的結構體型別裡 type handlerConfig struct { handlersingleHandler // 處理函式 goNumint// 要啟用的goroutine的數量 numberint// 單個goroutine中處理的次數 intervaltime.Duration // 單個goroutine中,多次處理中間間隔的時間 counterint// 資料量計數器,位元組數 counterMu sync.Mutex// 上面的資料量計數器專用的互斥鎖 } // 增加資料量計數器的方法 func (hc *handlerConfig) count(skip int) int { hc.counterMu.Lock() defer hc.counterMu.Unlock() hc.counter += skip return hc.counter } func main() { // 建立緩衝區由於下面的讀和寫 var buffer bytes.Buffer // 寫入資料的配置,分6個goroutine分別寫入4次,一個24次 writingConfig := handlerConfig{ handler: genWriter(&buffer), goNum: 6, number: 4, interval: time.Millisecond * 100, } // 讀取資料的配置,分8個goroutine分別讀取3次,一個也是24次 readingConfig := handlerConfig{ handler: genReader(&buffer), goNum: 8, number: 3, interval: time.Millisecond * 100, } done := make(chan struct{}) // 啟用多個goroutine對緩衝區進行多次寫入 for i := 0; i < writingConfig.goNum; i++ { go func(i int) { defer func() { done <- struct{}{} }() for j :=0; j < writingConfig.number; j++ { // 進入迭代前等待,邏輯稍微簡單一點 // 如果寫在最後,那麼因為err而退出時這次迭代就不會等待了 time.Sleep(writingConfig.interval) data, n, err := writingConfig.handler() if err != nil { fmt.Fprintf(os.Stderr, "writer [%d-%d] ERROR: %s\n", i, j, err) continue } total := writingConfig.count(n) fmt.Printf("writer [%d-%d] Report: %s (total %d)\n", i, j, data, total) } }(i) } // 啟用多個goroutine對緩衝區進行多次讀取 for i := 0; i < readingConfig.goNum; i++ { go func (i int) { defer func() { done <- struct{}{} }() for j := 0; j < readingConfig.number; j++ { var ( data string n int err error ) // 下面的無限for迴圈是一個程式碼塊,裡面的data、n、err如果用短變數賦值就是區域性變數 // 所以上面在程式碼塊外面,聲明瞭i的for迴圈內部的變數 for { // 如果讀比寫快,被讀的是空的,一讀就到末尾了,就會返回EOF錯誤, time.Sleep(readingConfig.interval) data, n, err = readingConfig.handler() // 這個判斷邏輯是讀取的EOF錯誤,就無限迴圈等待。讀到內容或其他錯誤就跳出迴圈 if err == nil || err != io.EOF { break } } if err != nil { fmt.Fprintf(os.Stderr, "reader [%d-%d] ERROR: %s\n", i, j, err) } total := readingConfig.count(n) fmt.Printf("reader [%d-%d] Report: %s (total %d)\n", i, j, data, total) } }(i) } // 等待所有goroutine結束 doneNum := writingConfig.goNum + readingConfig.goNum for i := 0; i < doneNum; i++ { <- done } }
這個示例中,分別有讀和寫的兩個處理函式。而處理函式裡做的事情就是:加鎖、defer解鎖,完成讀或寫操作然後返回。這裡就做到了加鎖和解鎖操作成對出現,並且把鎖和要保護的共享資源放在一起了。
示例中還有一個互斥鎖在handlerConfig結構體中,要保護的共享資源也是handlerConfig結構體中的counter欄位。並且寫了一個方法count實現對counter欄位的鎖定和修改。
讀寫鎖
讀寫鎖是讀/寫互斥鎖的簡稱。在Go語言中,讀寫鎖有sync.RWMutex型別的值代表。與sync.Mutex一樣,這個型別也是開箱即用的。開箱即用,應該就是指不用賦值,定義了之後直接就能用了。就是讓它的零值也具有意義。
讀寫鎖就是把共享資源的“讀操作”和“寫操作”區別對待了。為兩種操作施加了不同程度的保護。相比於互斥鎖,讀寫鎖可以實現更加細膩的訪問控制。
一個讀寫鎖中實際包含了兩個鎖,讀鎖和寫鎖:
- 寫鎖,它的Lock方法和Unlock方法分別用於對寫鎖進行鎖定和解鎖
- 讀鎖,它的RLock方法和RUnlock方法分別用於對讀鎖進行鎖定和解鎖
讀寫鎖規則
對於同一個讀寫鎖,有如下的規則:
- 在寫鎖已被鎖定的情況下,再檢視鎖定寫鎖,會阻塞當前goroutine
- 在寫鎖已被鎖定的情況下,試圖鎖定讀鎖,也會阻塞當前goroutine
- 在讀鎖已被鎖定的情況下,試圖鎖定寫鎖,同樣會阻塞當前goroutine
- 在讀寫已被鎖定的情況下,再檢視鎖定讀鎖,並不會阻塞當前的goroutine
總結一下,就是可以有多個讀操作,讀鎖鎖定的情況下,別的goroutine也可以讀。其他的情況下要操作,只能等之前鎖定的操作完成釋放鎖,並且搶到鎖了。再換個角度說,就是多個讀操作可以同時進行,多個寫操作不能同時進行,讀和寫操作也不能同時進行。
讀寫鎖對寫操作之間的互斥,其實是通過它內含的一個互斥鎖實現的。因此,讀寫鎖是互斥鎖的一種擴充套件。所以無論是互斥鎖還是讀寫鎖,都不要試圖去解鎖未鎖定的鎖,因為這樣會引發不可恢復的panic。
示例程式碼
之前互斥鎖的示例中,使用互斥鎖保護了對緩衝區的讀寫操作,而這裡又講了讀寫鎖,不要被這裡讀和寫的說法鎖迷惑。對緩衝區的讀操作是會把讀到的內容從緩衝區裡去除的,所以是有類似寫的操作在裡面的,使用互斥鎖時正確的做法,並且不能使用這裡的讀寫鎖。
而這個示例中的讀操作,就僅僅只是去獲取到值而已了,在讀操作的時候加個讀鎖正合適:
package main import( "fmt" "sync" "time" ) // 計數器 type counter struct { num uint // 計數 mu sync.RWMutex //讀寫鎖 } // 獲取num值的操作,加讀鎖 func (c *counter) number() uint { c.mu.RLock() defer c.mu.RUnlock() return c.num } // 修改num值的操作,加寫鎖 func (c *counter) add (increment uint) uint { c.mu.Lock() defer c.mu.Unlock() c.num += increment return c.num } // 跑一下上面的兩個方法看看效果 func main() { c := counter{} done := make(chan struct{}) // 增加計數器 go func() { defer func() { done <- struct{}{} }() for i := 0; i < 10; i++ { time.Sleep(time.Millisecond * 500) c.add(1) } }() go func() { defer func() { done <- struct{}{} }() for j := 0; j < 20; j++ { time.Sleep(time.Millisecond * 200) fmt.Printf("[%d-%02d] 讀數: %d\n", 1, j, c.number()) } }() go func() { defer func() { done <- struct{}{} }() for k := 0; k < 20; k++ { time.Sleep(time.Millisecond * 300) fmt.Printf("[%d-%02d] 讀數: %d\n", 2, k, c.number()) } }() <- done <- done <- done }