Go36-27,28-條件變數
條件變數
條件變數(conditional variable),和互斥鎖一樣,也是一個同步工具。我們常常會把條件變數與互斥鎖一起討論。實際上,條件變數是基於互斥鎖的,它必須有互斥鎖的支撐才能發揮作用。
作用
條件變數並不是被用來保護臨界區和共享資源的,它是用於協調想要訪問共享資源的那些執行緒的。當共享資源的狀態發生變化時,它可以被用來通知被互斥鎖阻塞的執行緒。
使用條件變數的最大優勢就是在效率方面的提升。當共享資源的狀態不滿足條件的時候,想操作它的執行緒再也不用迴圈往復的做檢查了,只要等待通知就好了。
使用條件變數
條件變數需要與互斥鎖配合使用。條件變數的初始化需要互斥鎖,並且它的方法有的也是基於互斥鎖的。
條件變數提供的方法有三個:
- 等待通知(wait)
- 單發通知(signal)
- 廣播通知(broadcast)
在利用條件變數等待通知的時候,需要在它基於的那個互斥鎖的保護下進行。
在進行單發通知或光爆通知的時候,需要在對應的互斥鎖解鎖之後再做操作。
建立條件變數結合程式碼理解上面的含義,先建立幾個變數:
var lock sync.RWMutex sendCond := sync.NewCond(&lock) recvCond := sync.NewCond(lock.RLocker())
條件變數的型別lock是一個讀寫鎖,基於這把鎖,建立了2個代表條件變數的變數,這兩個變數的型別是*sync.Cond,是由sync.NewCond函式來初始化的。
與互斥鎖鎖不同,這裡不是開箱即用的,只能使用sync.NewCond函式來建立它的指標值,這個函式需要一個sync.Locker型別的引數。
前面說過,條件變數是基於互斥鎖的,它必須有互斥鎖的支援才能夠起作用。因此,這裡的引數是必須的,它也會參與到條件變數的方法實現中去。
sync.Locker介面sync.Locker其實是一個介面,包含兩個方法Lock()和Unlock():
type Locker interface { Lock() Unlock() }
sync.Mutex型別sync,RWMutex型別都擁有這兩個方法,不過都是指標方法。因此這兩個型別的指標型別才是sync.Locker介面的實現型別。
在為sendCond初始化的時候,把lock變數的指標作為引數。這裡lock變數的Lock方法和Unlock方法分別用於對其中寫鎖的鎖定和解鎖。這裡與實現介面的兩個方法的名稱是對應的。
在為recvCond初始化的時候,需要的是lock變數的讀鎖,並且還得是sync.Locker介面型別,就是要實現了Lock和Unlock方法的讀鎖。可是lock變數中用於讀鎖的方法卻是RLock方法和RUnlock方法,這裡名稱不對應了。不過有一個RLocker方法可以實現這一需求,下面是原始碼裡實現的部分,很簡單:
// RLocker returns a Locker interface that implements // the Lock and Unlock methods by calling rw.RLock and rw.RUnlock. func (rw *RWMutex) RLocker() Locker { return (*rlocker)(rw) } type rlocker RWMutex func (r *rlocker) Lock(){ (*RWMutex)(r).RLock() } func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
這裡我有一些小疑惑,3個方法裡面都是型別斷言吧。RLocker方法把原來的讀寫鎖型別轉成一個新的型別然後返回。後面的兩個方法,為了用新型別呼叫讀寫鎖型別裡的方法,先進行型別斷言,轉成讀寫鎖原本的型別,然後呼叫它的方法。
使用條件變數下面是擷取的使用時的部分程式碼:
lock.Lock() for !isEmpty { sendCond.Wait() } isEmpty = false // 這裡可以做寫入的操作 lock.Unlock() recvCond.Signal()
上面是一個寫入的流程。之前的程式碼定義了一個狀態變數isEmpty,只有狀態為空的時候,才允許寫入,寫入後把狀態設定為非空。
這裡要先呼叫Lock方法,等待通知(wait)是要在互斥鎖的保護下進行的。
然後再操作完之後,先呼叫Unlock方法,再發送通知,傳送通知的操作要在互斥鎖解鎖之後。
這裡等待的出sendCond的訊號,而最後傳送的是recvCond的訊號。在另一個讀取的流程裡則正好相反。利用條件變數可以實現單向的通知,而這裡要實現雙向的通知,就需要兩個條件變數。這是條件變數的基本使用原則。
示例程式碼
上面把關鍵的程式碼分析了一下,下面是完整的示例程式碼:
package main import ( "fmt" "sync" "time" "flag" ) var useCond bool func init() { flag.BoolVar(&useCond, "cond", false, "是否使用條件變數") } type msgBox struct { messagestring isEmptybool sendCond *sync.Cond recvCond *sync.Cond } func main() { flag.Parse() fmt.Println("是否開啟了條件變數保護:", useCond) var lock sync.RWMutex msgBox := msgBox{ isEmpty:true,// 預設值是false,狀態初始值應該為true sendCond: sync.NewCond(&lock),// 不是開箱即用的,需要在使用前初始化 recvCond: sync.NewCond(lock.RLocker()), } done := make(chan struct{}) max := 5 // 寫操作的goroutine go func(max int) { defer func() { done <- struct{}{} }() for i := 0; i < max; i++ { time.Sleep(time.Millisecond * 200) // 先進行保護 lock.Lock() // 再等待通知 for useCond && !msgBox.isEmpty { msgBox.sendCond.Wait() } msgBox.isEmpty = false msg := fmt.Sprintf("第 %d 條訊息", i) msgBox.message = msg fmt.Printf("傳送訊息[%d]: %s\n", i, msg) // 先解鎖 lock.Unlock() // 再發送通知 msgBox.recvCond.Signal() } }(max) // 讀操作的goroutine go func(max int) { defer func() { done <- struct{}{} }() for j := 0; j < max; j++ { time.Sleep(time.Millisecond * 500) lock.RLock() for useCond && msgBox.isEmpty { msgBox.recvCond.Wait() } msgBox.isEmpty = true msg := msgBox.message fmt.Printf("接收訊息[%d]: %s\n", j, msg) lock.RUnlock() msgBox.sendCond.Signal() } }(max) <-done <-done fmt.Println("Over") }
在這個例子裡,寫的時候要獲取到寫鎖,讀的時候要獲取到讀鎖,這個邏輯和之前互斥鎖是一樣的。但是隻是獲取到鎖還不能做操作,這裡還要再做一個限制,所以就用到了條件變數。
在這個例子裡,寫操作和讀操作是需要成對出現的。寫完一次之後,依然能獲取到寫鎖,但是不能立刻寫。而是要等待讀操作把之前寫入的資料讀過之後,才能再次寫入,把之前的內容覆蓋掉。讀操作也是一樣。這裡就需要兩個goroutine之間傳遞訊號了。
通過命令列引數分別在開啟/關閉條件變數的環境下執行,可以看到其中的作用:
go run main.go go run main.go -cond
Wait方法
條件變數的Wait方法主要做了4件事:
- 把呼叫它的goroutine加入到當前條件變數的通知佇列中
- 解鎖當前的條件變數基於的那個互斥鎖
- 讓當前的goroutine處於等待狀態,等到通知來了再決定是否喚醒它。此時阻塞在呼叫Wait方法的那行程式碼上
- 如果通知來了並且決定喚醒當前goroutine,就在喚醒它之後重新鎖定當前條件變數基於的互斥鎖
先解鎖,在阻塞在Wait方法裡,必須要先解鎖,在阻塞當前goroutine。否則就違背了互斥鎖要成對出現的原則。並且當前goroutine在解鎖千就阻塞的話,當前goroutine就不可能在執行解鎖了。即使不考慮原則,讓別的goroutine來解鎖,又會有重複解鎖可能。
使用for語句並且Wait方法建議是放在一個for迴圈裡的。這裡似乎也是可以用if語句的。但是if語句只能檢查狀態一次,而for的話可以進行多次檢查。如果goroutine收到了通知而喚醒,但是此時檢查時發現狀態還是不對,那麼就應該再次呼叫Wait方法。保險起見,在包裹條件變數的Wait方法總是應該使用for語句。
Signal方法和Broadcast方法
這2個方法都是用來發送通知的。Signal方法的通知只會喚醒一個goroutine,而Broadcast方法的通知會喚醒所有等待的goroutine。Wait方法會把當前的goroutine新增到通知佇列的隊尾,而Signal方法會從通知佇列的隊首開始查詢可以被喚醒的goroutine。因此Signal方法喚醒的一般是最早等待的那個goroutine。
適用場景這2個方法的行為決定他們的適用場景。確定只有一個goroutine在等待通知,或者值需要喚醒一個goroutine的時候,就使用Signal方法。否則,使用Broadcast方法總是沒錯的,Broadcast方法的適用場景更多。
通知的即時性
條件變數的通知具有即時性。如果傳送通知的時候沒有goroutine在等待,那麼該次通知就會被直接丟棄。之後再開始等待的goroutine需要等待之後的通知。
示例程式碼2
還是前面那個示例,稍微改了改,把讀寫鎖換成了互斥鎖,通知方法把Signal換成了Broadcast:
package main import ( "fmt" "sync" "time" ) var lock sync.Mutex // 匿名結構體,定義並初始化賦值 // 嵌入式鎖(Embedded lock)的場景適合使用匿名結構體 var msgBox = struct { messagestring isEmptybool sendCond *sync.Cond recvCond *sync.Cond }{ isEmpty: true, sendCond: sync.NewCond(&lock), recvCond: sync.NewCond(&lock), } // 用於設定訊息的函式 func send(id, index int) { lock.Lock() for !msgBox.isEmpty { msgBox.sendCond.Wait() } msg := fmt.Sprintf("msg: [%d-%d]", id, index) msgBox.message = msg fmt.Printf("傳送訊息[%d-%d]: %s\t", id, index, msg) msgBox.isEmpty = false lock.Unlock() msgBox.recvCond.Broadcast() } // 用於讀取訊息的函式 func recv(id, index int) { lock.Lock() for msgBox.isEmpty { msgBox.recvCond.Wait() } msg := msgBox.message msgBox.message = "" fmt.Printf("接收訊息[%d-%d]: %s\n", id, index, msg) msgBox.isEmpty = true lock.Unlock() msgBox.sendCond.Broadcast() } func main() { done := make(chan struct{}) count := 5 // 啟動一個goroutine用於傳送 go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 100) send(id, i) } }(0, count * 2) // 啟動兩個goroutine用於接收 go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 300) recv(id, i) } }(1, count) go func(id, count int) { defer func() { done <- struct{}{} }() for i := 0; i < count; i++ { time.Sleep(time.Millisecond * 400) recv(id, i) } }(2, count) <- done <- done <- done fmt.Println("Over") }