Go36-31-sync.WaitGroup和sync.Once
sync.WaitGroup
之前在協調多個goroutine的時候,使用了通道。基本都是按下面這樣來使用的:
package main import "fmt" func main() { done := make(chan struct{}) count := 5 for i := 0; i < count; i++ { go func(i int) { defer func() { done <- struct{}{} }() fmt.Println(i) }(i) } for j := 0; j < count; j++ { <- done } fmt.Println("Over") }
這裡有一個問題,要保證主goroutine最後從通道接收元素的的次數需要與之前其他goroutine傳送元素的次數相同。
其實,在這種應用場景下,可以選用另外一個同步工具,就是這裡要講的sync包的WaitGroup型別。
使用方法
sync.WaitGroup型別,它比通道更加適合實現這種一對多的goroutine協作流程。WaitGroup是開箱即用的,也是併發安全的。同時,與之前提到的同步工具一樣,它一旦被真正的使用就不能被複制了。
WaitGroup擁有三個指標方法,可以想象該型別中有一個計數器,預設值是0,下面的方法就是操作或判斷計數器:
Add(-1)
現在就用WaitGroup來改造開篇的程式:
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup// 開箱即用,所以直接宣告就好了,沒必要用短變數宣告 // wg := sync.WaitGroup{}// 短變數宣告可以這麼寫 count := 5 for i := 0; i < count; i++ { wg.Add(1) go func(i int) { defer wg.Done() fmt.Println(i) }(i) } wg.Wait() fmt.Println("Over") }
改造後,在主goroutine最後等待退出的部分現在看著要美觀多了。這個就是WaitGroup典型的應用場景了。
注意的事項
計數器不能小於0在sync.WaitGroup型別值中計數器的值是不可以小於0的。一旦小於0會引發panic,不適當的呼叫Done方法和Add方法就有可能使它小於0而引發panic。
儘早增加計數器的值如果在對它的Add方法的首次呼叫,與對它的Wait方法的呼叫是同時發起的。比如,在同時啟動的兩個goroutine中,分別呼叫這兩個方法,那就就有可能會讓這裡的Add方法丟擲一個panic。並且這種情況不太容易,應該予以重視。所以雖然WaitGroup值本身並不需要初始化,但是儘早的增加其計數器的值是非要必要的。
複用的情況
WaitGroup的值是可以被複用的,但需要保證其計數週期 的完整性。這裡的計數週期指的是這樣一個過程:該值中的計數器值由0變為了某個正整數,而後又經過一系列的變化,最終由某個正整數又變回了0。這個過程可以被視為一個計數週期。在一個此類的生命週期中,它可以經歷任意多個計數週期。但是,只有在它走完當前的計數週期後,才能夠開始下一個計數週期。
也就是說,如果一個此類值的Wait方法在它的某個計數週期中被呼叫,那麼就會立即阻塞當前的goroutine,直至這個計數週期完成。在這種情況下,該值的下一個計數週期必須要等到這個Wait方法執行結束之後,才能夠開始。
Wait方法是有一個執行的過程的,如果在這個方法執行期間,跨越了兩個計數週期,就會引發一個panic。比如,當前的goroutine呼叫了Wait方法而阻塞了。另一個goroutine呼叫了Done方法使計數器變成了0。此時會喚醒之前阻塞的goroutine,並且去執行Wait方法中其餘的程式碼(這裡還在這行Wait方法,執行的是原始碼sync.Wait方法裡的程式碼,不是我們自己寫的程式的Wait之後的程式碼)。在這個時候,又有一個goroutine呼叫了Add方法,使計數器的值又從0變為了某個正整數。此時正在執行的Wait方法就會立即丟擲一個panic。
小結
上面給了3種會引發panic的情況。關於後兩種情況,建議如下:
不要把增加計數器值的操作和呼叫Wait方法的程式碼,放在不同的goroutine中執行。
就是要杜絕對同一個WatiGroup值的兩種操作的併發執行。
後面提到的兩種情況,不是每次都會發生,通常需要反覆的實驗才能夠引發panic的情況。雖然不是每次都發生,但是在長期執行的過程中,這種情況是必然會出現的,應該予以重視並且避免。
如果對復現這些異常情況感興趣,可以看一下sync程式碼包中的waitgroup_test.go檔案。其中的名稱以TestWaitGroupMisuse為字首的測試函式,很好的展示了這些異常情況發生的條件。
sync.Once
與sync.WaitGroup型別一樣,Sync.Once型別也屬於結構體型別,同樣也是開箱即用和併發安全的。由於這個型別中包含了一個sync.Mutex型別的欄位,所以複製改型別的值也會導致功能失效。
使用方法
Once型別的Do方法只接收一個引數,引數的型別必須是func(),即無引數無返回的函式。該方法的功能並不是對每一種引數函式都只執行一次,而是隻執行首次被呼叫時傳入的那個函式,並且之後不會再執行任何引數函式。所以,如果有多個需要執行一次的函式,應該為它們每一個都分配一個sync.Once型別的值。
基本用法如下:
package main import ( "fmt" "sync" "sync/atomic" ) func main() { var counter uint32 var once sync.Once once.Do(func() { atomic.AddUint32(&counter, 1) }) fmt.Println("counter:", counter) // 這次呼叫不會被執行 once.Do(func() { atomic.AddUint32(&counter, 2) }) fmt.Println("counter:", counter) }
Once型別中還要一個名為done的uint32型別的欄位。它的作用是記錄所屬值的Do方法被呼叫的次數。不過改欄位的值只可能是0或1.一旦Do方法的首次呼叫完成,它的值就會從0變為1。
關於done的型別,其實用布林型別就夠了,這裡只所以用uint32型別的原因是它的操作必須是原子操作,只能使用原子操作支援的資料型別。
Do方法在一開始就會通過atomic.LoadUint32來獲取done欄位的值,並且如果發現值為1就直接返回。這步只是初步保證了Do方法只會執行首次呼叫是傳入的函式。
不過單憑上面的判斷是不夠的。如果兩個goroutine都呼叫了同一個新的Once值的Do方法,並且幾乎同時執行到了其中的這個條件判斷程式碼,那麼它們就都會因判斷結果為false而繼續執行Do方法中剩餘的程式碼。
基於上面的可能,在初步保證的判斷之後,Do方法會立即鎖定其所屬值中的那個sync.Mutex型別的m欄位。然後,它會在臨界區中再次檢查done欄位的值。此時done的值應該仍然是0,並且已經加鎖。此時才認為是條件滿足,才會去呼叫引數函式。並且用原子操作把done的值變為1。
如果熟悉設計模式中的單例模式的話,這個Do方法的實現方式,與單例模式有很多相似之處。都會先在臨界區之外判斷一次關鍵條件,若條件不滿足則立即返回。這通常被稱為快路徑 ,或者叫做快速失敗路徑 。
如果條件滿足,那麼到了臨界區中還要再對關鍵條件進行一次判斷,這主要是為了更加嚴謹。這兩次條件判斷常被統稱為(跨臨界區的)雙重檢查 。由於進入臨界區前要加鎖,顯然會降低程式碼的執行速度,所以其中的第二次條件判斷,以及後續的操作就被稱為慢路徑 或者常規路徑 。
Do方法中的程式碼不多,但它卻應用了一個很經典的程式設計正規化。
功能方面的特點
一、由於Do方法只會在引數函式執行結束之後把done欄位的值變為1,因此,如果引數函式的執行需要很長的時間或者根本就不會結束,那麼就有可能會導致相關goroutine的同時阻塞。
比如,有多個goroutine併發的呼叫了同一個Once值的Do方法,並且傳入的函式都會一直執行而不結束。那麼,這些goroutine就都會因呼叫了這個Do方法而阻塞。此時,那個搶先執行了引數函式的goroutine之外,其他的goroutine都會被阻塞在該Once值的互斥鎖m的那行程式碼上。
效果演示的示例程式碼:
package main import ( "fmt" "sync" "time" ) func main() { once := sync.Once{}// 這裡換短變數宣告 wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() // 這個函式會被執行 once.Do(func() { for i := 0; i < 10; i++ { fmt.Printf("\r任務[1-%d]執行中...", i) time.Sleep(time.Millisecond * 400) } }) fmt.Printf("\n任務[1]執行完畢\n") }() wg.Add(1) go func() { defer wg.Done() time.Sleep(time.Millisecond * 300) // 這句Do方法的呼叫會一直阻塞,知道上面的函式執行完畢 // 然後Do方法裡的函式不會執行 once.Do(func() { fmt.Println("任務[2]執行中...") }) // 上面Do方法阻塞結束後,直接會執行下面的程式碼 fmt.Println("任務[2]執行完畢") }() wg.Add(1) go func() { defer wg.Done() time.Sleep(time.Millisecond * 300) once.Do(func() { fmt.Println("任務[3]執行中...") }) fmt.Println("任務[3]執行完畢") }() wg.Wait() fmt.Println("Over") }
二、Do方法在引數函式執行結束後,對done欄位的賦值用的是原子操作,並且這一操作是被掛載defer語句中的。因此,不論引數函式的執行會以怎樣的方式結束,done欄位的值都會變為1。
這樣就是說即時引數函式沒有執行成功,比如引發了panic。也是無法使用同一個Once值重新執行別的函數了。所以,如果需要為引數函式的執行設定重試機制,就要考慮在適當的時候替換Once值。
參考下面的示例:
package main import ( "fmt" "sync" "time" ) func main() { once := sync.Once{} wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() defer func() { if p := recover(); p != nil { fmt.Printf("PANIC: %v\n", p) // 下面的語句會給once變數替換一個新的Once值,這樣下面的第二個任務還能被執行 // once = sync.Once{} } }() once.Do(func() { fmt.Println("開始執行引數函式,緊接著會引發panic") panic(fmt.Errorf("主動引發了一個panic"))// panic之後就去呼叫defer了 fmt.Println("引數函式執行完畢")// 這行不會執行,後面的都不會執行 }) fmt.Println("Do方法呼叫完畢")// 這行也不會執行 }() wg.Add(1) go func() { defer wg.Done() time.Sleep(time.Millisecond * 500) once.Do(func() { fmt.Println("第二個任務執行中...") time.Sleep(time.Millisecond * 800) fmt.Println("第二個任務執行結束") }) fmt.Println("第二個任務結束") }() wg.Wait() fmt.Println("Over") }
延遲初始化
延遲一個昂貴的初始化步驟到有實際需求的時刻是一個很好的實踐。這也是sync.Once的一個使用場景。
下面是從書上改的示例程式碼:
package main import ( "fmt" "sync" ) var once sync.Once var testmap map[string] int32 // 對testmap進行初始化的函式 func loadTestmap() { testmap = map[string] int32{ "k1": 1, "k2": 2, "k3": 3, } } // 獲取testmap對應key的值,如果沒有初始化,會先執行初始化 // 書上說這個函式是併發安全的,這裡的map初始化之後,內容不會再變 func getKey(key string) int32 { once.Do(loadTestmap) // 最後的return這句可能不是併發安全的,不過執行緒安全的map不是這裡的重點 // 假定這裡的map在初始化之後只會被多個goroutine讀取,其內容不會再改變 return testmap[key] } func main() { fmt.Println(getKey("k1")) }
這裡不考慮map執行緒安全的問題,而且書上的例子這裡的map只用來存放資料,初始化之後不會對其內容進行修改。
這裡主要是保證在變數初始化過程中的併發安全。以這種方式來使用sync.Once,可以避免變數在正確構造之前就被其它goroutine分享。否則,在別的goroutine中可能會獲取到一個內容不完整的變數。
總結
sync程式碼包的WaitGroup型別和Once型別都是非常易用的同步工具。它們都是開箱即用和併發安全的。
Once型別使用互斥鎖和原子操作實現了功能,而WatiGroup型別中只用到了原子操作。所以可以說,它們都是更高層次的同步工具。它們都基於基本的同步工具,實現了某種特定的功能。sync包中的其他高階同步工具,其實也都是這樣的。