1. 程式人生 > >Go36-27,28-條件變量

Go36-27,28-條件變量

現狀 同步工具 鎖定 接收 cast tro goroutine 使用 ret

條件變量

條件變量(conditional variable),和互斥鎖一樣,也是一個同步工具。我們常常會把條件變量與互斥鎖一起討論。實際上,條件變量是基於互斥鎖的,它必須有互斥鎖的支撐才能發揮作用。

作用

條件變量並不是被用來保護臨界區和共享資源的,它是用於協調想要訪問共享資源的那些線程的。當共享資源的狀態發生變化時,它可以被用來通知被互斥鎖阻塞的線程。
使用條件變量的最大優勢就是在效率方面的提升。當共享資源的狀態不滿足條件的時候,想操作它的線程再也不用循環往復的做檢查了,只要等待通知就好了。

使用條件變量

條件變量需要與互斥鎖配合使用。條件變量的初始化需要互斥鎖,並且它的方法有的也是基於互斥鎖的。

條件變量提供的方法有三個:

  • 等待通知(wait)
  • 單發通知(signal)
  • 廣播通知(broadcast)

在利用條件變量等待通知的時候,需要在它基於的那個互斥鎖的保護下進行。
在進行單發通知或光爆通知的時候,需要在對應的互斥鎖解鎖之後再做操作。

創建條件變量
結合代碼理解上面的含義,先創建幾個變量:

var lock sync.RWMutex
sendCond := sync.NewCond(&lock)
recvCond := sync.NewCond(lock.RLocker())

條件變量的類型
lock是一個讀寫鎖,基於這把鎖,創建了2個代表條件變量的變量,這兩個變量的類型是*sync.Cond,是由sync.NewCond函數來初始化的。

初始化
與互斥鎖鎖不同,這裏不是開箱即用的,只能使用sync.NewCond函數來創建它的指針值,這個函數需要一個sync.Locker類型的參數。
前面說過,條件變量是基於互斥鎖的,它必須有互斥鎖的支持才能夠起作用。因此,這裏的參數是必須的,它也會參與到條件變量的方法實現中去。
sync.Locker接口
sync.Locker其實是一個接口,包含兩個方法Lock()和Unlock():

type Locker interface {
    Lock()
    Unlock()
}

sync.Mutex類型sync,RWMutex類型都擁有這兩個方法,不過都是指針方法。因此這兩個類型的指針類型才是sync.Locker接口的實現類型。

初始化的過程
在為sendCond初始化的時候,把lock變量的指針作為參數。這裏lock變量的Lock方法和Unlock方法分別用於對其中寫鎖的鎖定和解鎖。這裏與實現接口的兩個方法的名稱是對應的。
在為recvCond初始化的時候,需要的是lock變量的讀鎖,並且還得是sync.Locker接口類型,就是要實現了Lock和Unlock方法的讀鎖。可是lock變量中用於讀鎖的方法卻是RLock方法和RUnlock方法,這裏名稱不對應了。不過有一個RLocker方法可以實現這一需求,下面是源碼裏實現的部分,很簡單:

// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
    return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

這裏我有一些小疑惑,3個方法裏面都是類型斷言吧。RLocker方法把原來的讀寫鎖類型轉成一個新的類型然後返回。後面的兩個方法,為了用新類型調用讀寫鎖類型裏的方法,先進行類型斷言,轉成讀寫鎖原本的類型,然後調用它的方法。

使用條件變量
下面是截取的使用時的部分代碼:

lock.Lock()
for !isEmpty {
    sendCond.Wait()
}
isEmpty = false
// 這裏可以做寫入的操作
lock.Unlock()
recvCond.Signal()

上面是一個寫入的流程。之前的代碼定義了一個狀態變量isEmpty,只有狀態為空的時候,才允許寫入,寫入後把狀態設置為非空。
這裏要先調用Lock方法,等待通知(wait)是要在互斥鎖的保護下進行的。
然後再操作完之後,先調用Unlock方法,再發送通知,發送通知的操作要在互斥鎖解鎖之後。
這裏等待的出sendCond的信號,而最後發送的是recvCond的信號。在另一個讀取的流程裏則正好相反。利用條件變量可以實現單向的通知,而這裏要實現雙向的通知,就需要兩個條件變量。這是條件變量的基本使用原則。

示例代碼

上面把關鍵的代碼分析了一下,下面是完整的示例代碼:

package main

import (
    "fmt"
    "sync"
    "time"
    "flag"
)

var useCond bool

func init() {
    flag.BoolVar(&useCond, "cond", false, "是否使用條件變量")
}

type msgBox struct {
    message  string
    isEmpty  bool
    sendCond *sync.Cond
    recvCond *sync.Cond
}

func main() {
    flag.Parse()
    fmt.Println("是否開啟了條件變量保護:", useCond)

    var lock sync.RWMutex
    msgBox := msgBox{
        isEmpty:  true,  // 默認值是false,狀態初始值應該為true
        sendCond: sync.NewCond(&lock),  // 不是開箱即用的,需要在使用前初始化
        recvCond: sync.NewCond(lock.RLocker()),
    }

    done := make(chan struct{})
    max := 5

    // 寫操作的goroutine
    go func(max int) {
        defer func() {
            done <- struct{}{}
        }()
        for i := 0; i < max; i++ {
            time.Sleep(time.Millisecond * 200)
            // 先進行保護
            lock.Lock()
            // 再等待通知
            for useCond && !msgBox.isEmpty {
                msgBox.sendCond.Wait()
            }
            msgBox.isEmpty = false
            msg := fmt.Sprintf("第 %d 條消息", i)
            msgBox.message = msg
            fmt.Printf("發送消息[%d]: %s\n", i, msg)
            // 先解鎖
            lock.Unlock()
            // 再發送通知
            msgBox.recvCond.Signal()
        }
    }(max)

    // 讀操作的goroutine
    go func(max int) {
        defer func() {
            done <- struct{}{}
        }()
        for j := 0; j < max; j++ {
            time.Sleep(time.Millisecond * 500)
            lock.RLock()
            for useCond && msgBox.isEmpty {
                msgBox.recvCond.Wait()
            }
            msgBox.isEmpty = true
            msg := msgBox.message
            fmt.Printf("接收消息[%d]: %s\n", j, msg)
            lock.RUnlock()
            msgBox.sendCond.Signal()
        }
    }(max)
    <-done
    <-done
    fmt.Println("Over")
}

代碼中條件變量的作用
在這個例子裏,寫的時候要獲取到寫鎖,讀的時候要獲取到讀鎖,這個邏輯和之前互斥鎖是一樣的。但是只是獲取到鎖還不能做操作,這裏還要再做一個限制,所以就用到了條件變量。
在這個例子裏,寫操作和讀操作是需要成對出現的。寫完一次之後,依然能獲取到寫鎖,但是不能立刻寫。而是要等待讀操作把之前寫入的數據讀過之後,才能再次寫入,把之前的內容覆蓋掉。讀操作也是一樣。這裏就需要兩個goroutine之間傳遞信號了。
通過命令行參數分別在開啟/關閉條件變量的環境下運行,可以看到其中的作用:

go run main.go
go run main.go -cond

Wait方法

條件變量的Wait方法主要做了4件事:

  1. 把調用它的goroutine加入到當前條件變量的通知隊列中
  2. 解鎖當前的條件變量基於的那個互斥鎖
  3. 讓當前的goroutine處於等待狀態,等到通知來了再決定是否喚醒它。此時阻塞在調用Wait方法的那行代碼上
  4. 如果通知來了並且決定喚醒當前goroutine,就在喚醒它之後重新鎖定當前條件變量基於的互斥鎖

先解鎖,在阻塞
在Wait方法裏,必須要先解鎖,在阻塞當前goroutine。否則就違背了互斥鎖要成對出現的原則。並且當前goroutine在解鎖千就阻塞的話,當前goroutine就不可能在執行解鎖了。即使不考慮原則,讓別的goroutine來解鎖,又會有重復解鎖可能。

使用for語句
並且Wait方法建議是放在一個for循環裏的。這裏似乎也是可以用if語句的。但是if語句只能檢查狀態一次,而for的話可以進行多次檢查。如果goroutine收到了通知而喚醒,但是此時檢查時發現狀態還是不對,那麽就應該再次調用Wait方法。保險起見,在包裹條件變量的Wait方法總是應該使用for語句。

Signal方法和Broadcast方法

這2個方法都是用來發送通知的。Signal方法的通知只會喚醒一個goroutine,而Broadcast方法的通知會喚醒所有等待的goroutine。Wait方法會把當前的goroutine添加到通知隊列的隊尾,而Signal方法會從通知隊列的隊首開始查找可以被喚醒的goroutine。因此Signal方法喚醒的一般是最早等待的那個goroutine。

適用場景
這2個方法的行為決定他們的適用場景。確定只有一個goroutine在等待通知,或者值需要喚醒一個goroutine的時候,就使用Signal方法。否則,使用Broadcast方法總是沒錯的,Broadcast方法的適用場景更多。

通知的即時性
條件變量的通知具有即時性。如果發送通知的時候沒有goroutine在等待,那麽該次通知就會被直接丟棄。之後再開始等待的goroutine需要等待之後的通知。

示例代碼2

還是前面那個示例,稍微改了改,把讀寫鎖換成了互斥鎖,通知方法把Signal換成了Broadcast:

package main

import (
    "fmt"
    "sync"
    "time"
)

var lock sync.Mutex

// 匿名結構體,定義並初始化賦值
// 嵌入式鎖(Embedded lock)的場景適合使用匿名結構體
var msgBox = struct {
    message  string
    isEmpty  bool
    sendCond *sync.Cond
    recvCond *sync.Cond
}{
    isEmpty: true,
    sendCond: sync.NewCond(&lock),
    recvCond: sync.NewCond(&lock),
}

// 用於設置消息的函數
func send(id, index int) {
    lock.Lock()
    for !msgBox.isEmpty {
        msgBox.sendCond.Wait()
    }
    msg := fmt.Sprintf("msg: [%d-%d]", id, index)
    msgBox.message = msg
    fmt.Printf("發送消息[%d-%d]: %s\t", id, index, msg)
    msgBox.isEmpty = false
    lock.Unlock()
    msgBox.recvCond.Broadcast()
}

// 用於讀取消息的函數
func recv(id, index int) {
    lock.Lock()
    for msgBox.isEmpty {
        msgBox.recvCond.Wait()
    }
    msg := msgBox.message
    msgBox.message = ""
    fmt.Printf("接收消息[%d-%d]: %s\n", id, index, msg)
    msgBox.isEmpty = true
    lock.Unlock()
    msgBox.sendCond.Broadcast()
}

func main() {
    done := make(chan struct{})
    count := 5

    // 啟動一個goroutine用於發送
    go func(id, count int) {
        defer func() {
            done <- struct{}{}
        }()
        for i := 0; i < count; i++ {
            time.Sleep(time.Millisecond * 100)
            send(id, i)
        }
    }(0, count * 2)

    // 啟動兩個goroutine用於接收
    go func(id, count int) {
        defer func() {
            done <- struct{}{}
        }()
        for i := 0; i < count; i++ {
            time.Sleep(time.Millisecond * 300)
            recv(id, i)
        }
    }(1, count)
    go func(id, count int) {
        defer func() {
            done <- struct{}{}
        }()
        for i := 0; i < count; i++ {
            time.Sleep(time.Millisecond * 400)
            recv(id, i)
        }
    }(2, count)

    <- done
    <- done
    <- done
    fmt.Println("Over")
}

Go36-27,28-條件變量