Go基礎-channel通道
不同的並行協程之間交流的方式有兩種,一種是通過共享變數,另一種是通過佇列。Go 語言鼓勵使用佇列的形式來交流,它單獨為協程之間的佇列資料交流定製了特殊的語法 —— 通道。
通道是協程的輸入和輸出。作為協程的輸出,通道是一個容器,它可以容納資料。作為協程的輸入,通道是一個生產者,它可以向協程提供資料。通道作為容器是有限定大小的,滿了就寫不進去,空了就讀不出來。通道還有它自己的型別,它可以限定進入通道的資料的型別。
建立通道
建立通道只有一種語法,那就是 make 全域性函式,提供第一個型別引數限定通道可以容納的資料型別,再提供第二個整數引數作為通道的容器大小。大小引數是可選的,如果不填,那這個通道的容量為零,叫著「非緩衝型通道」,非緩衝型通道必須確保有協程正在嘗試讀取當前通道,否則寫操作就會阻塞直到有其它協程來從通道中讀東西。非緩衝型通道總是處於既滿又空的狀態。與之對應的有限定大小的通道就是緩衝型通道。在 Go 語言裡不存在無界通道,每個通道都是有限定最大容量的。
// 緩衝型通道,裡面只能放整數 var bufferedChannel = make(chan int, 1024) // 非緩衝型通道 var unbufferedChannel = make(chan int)
讀寫通道
Go 語言為通道的讀寫設計了特殊的箭頭語法糖 <-
,讓我們使用通道時非常方便。把箭頭寫在通道變數的右邊就是寫通道,把箭頭寫在通道的左邊就是讀通道。一次只能讀寫一個元素。
package main import "fmt" func main() { var ch chan int = make(chan int, 4) for i := 0; i < cap(ch); i++ { ch <- i // 寫通道 } for len(ch) > 0 { var value int = <-ch // 讀通道 fmt.Println(value) } }
通道作為容器,它可以像切片一樣,使用 cap() 和 len() 全域性函式獲得通道的容量和當前內部的元素個數。通道一般作為不同的協程交流的媒介,在同一個協程裡它也是可以使用的。
讀寫阻塞
通道滿了,寫操作就會阻塞,協程就會進入休眠,直到有其它協程讀通道挪出了空間,協程才會被喚醒。如果有多個協程的寫操作都阻塞了,一個讀操作只會喚醒一個協程。
通道空了,讀操作就會阻塞,協程也會進入睡眠,直到有其它協程寫通道裝進了資料才會被喚醒。如果有多個協程的讀操作阻塞了,一個寫操作也只會喚醒一個協程。
package main import "fmt" import "time" import "math/rand" func send(ch chan int) { for { var value = rand.Intn(100) ch <- value fmt.Printf("send %d\n", value) } } func recv(ch chan int) { for { value := <- ch fmt.Printf("recv %d\n", value) time.Sleep(time.Second) } } func main() { var ch = make(chan int, 1) // 子協程迴圈讀 go recv(ch) // 主協程迴圈寫 send(ch) } -------- send 81 send 87 recv 81 recv 87 send 47 recv 47 send 59
關閉通道
Go 語言的通道有點像檔案,不但支援讀寫操作, 還支援關閉。讀取一個已經關閉的通道會立即返回通道型別的「零值」,而寫一個已經關閉的通道會拋異常。如果通道里的元素是整型的,讀操作是不能通過返回值來確定通道是否關閉的。
package main import "fmt" func main() { var ch = make(chan int, 4) ch <- 1 ch <- 2 close(ch) value := <- ch fmt.Println(value) value = <- ch fmt.Println(value) value = <- ch fmt.Println(value) } ------- 1 2 0
這時候就需要引入一個新的知識點 —— 使用 for range 語法糖來遍歷通道
for range
語法我們已經見了很多次了,它是多功能的,除了可以遍歷陣列、切片、字典,還可以遍歷通道,取代箭頭操作符。當通道空了,迴圈會暫停阻塞,當通道關閉時,阻塞停止,迴圈也跟著結束了。當迴圈結束時,我們就知道通道已經關閉了。
package main import "fmt" func main() { var ch = make(chan int, 4) ch <- 1 ch <- 2 close(ch) // for range 遍歷通道 for value := range ch { fmt.Println(value) } } ------ 1 2
通道如果沒有顯式關閉,當它不再被程式使用的時候,會自動關閉被垃圾回收掉。不過優雅的程式應該將通道看成資源,顯式關閉每個不再使用的資源是一種良好的習慣。
通道寫安全
上面提到向一個已經關閉的通道執行寫操作會丟擲異常,這意味著我們在寫通道時一定要確保通道沒有被關閉。
package main import "fmt" func send(ch chan int) { i := 0 for { i++ ch <- i } } func recv(ch chan int) { value := <- ch fmt.Println(value) value = <- ch fmt.Println(value) close(ch) } func main() { var ch = make(chan int, 4) go recv(ch) send(ch) } --------- 1 2 panic: send on closed channel goroutine 1 [running]: main.send(0xc00008e000) /Users/cass/go/src/demo/main.go:9 +0x44 main.main() /Users/cass/go/src/demo/main.go:24 +0x66 exit status 2
那如何確保呢?Go 語言並不存在一個內建函式可以判斷出通道是否已經被關閉。即使存在這樣一個函式,當你判斷時通道沒有關閉,並不意味著當你往通道里寫資料時它就一定沒有被關閉,併發環境下,它是可能被其它協程隨時關閉的。
確保通道寫安全的最好方式是由負責寫通道的協程自己來關閉通道,讀通道的協程不要去關閉通道。
package main import "fmt" func send(ch chan int) { ch <- 1 ch <- 2 ch <- 3 ch <- 4 close(ch) } func recv(ch chan int) { for v := range ch { fmt.Println(v) } } func main() { var ch = make(chan int, 1) go send(ch) recv(ch) } ----------- 1 2 3 4
這個方法確實可以解決單寫多讀的場景,可要是遇上了多寫單讀的場合該怎麼辦呢?任意一個讀寫通道的協程都不可以隨意關閉通道,否則會導致其它寫通道協程丟擲異常。這時候就必須讓其它不相干的協程來幹這件事,這個協程需要等待所有的寫通道協程都結束執行後才能關閉通道。那其它協程要如何才能知道所有的寫通道已經結束運行了呢?這個就需要使用到內建 sync 包提供的 WaitGroup 物件,它使用計數來等待指定事件完成。
package main import "fmt" import "time" import "sync" func send(ch chan int, wg *sync.WaitGroup) { defer wg.Done() // 計數值減一 i := 0 for i < 4 { i++ ch <- i } } func recv(ch chan int) { for v := range ch { fmt.Println(v) } } func main() { var ch = make(chan int, 4) var wg = new(sync.WaitGroup) wg.Add(2) // 增加計數值 go send(ch, wg)// 寫 go send(ch, wg)// 寫 go recv(ch) // Wait() 阻塞等待所有的寫通道協程結束 // 待計數值變成零,Wait() 才會返回 wg.Wait() // 關閉通道 close(ch) time.Sleep(time.Second) } --------- 1 2 3 4 1 2 3 4
多路通道
在真實的世界中,還有一種訊息傳遞場景,那就是消費者有多個消費來源,只要有一個來源生產了資料,消費者就可以讀這個資料進行消費。這時候可以將多個來源通道的資料匯聚到目標通道,然後統一在目標通道進行消費。
package main import "fmt" import "time" // 每隔一會生產一個數 func send(ch chan int, gap time.Duration) { i := 0 for { i++ ch <- i time.Sleep(gap) } } // 將多個原通道內容拷貝到單一的目標通道 func collect(source chan int, target chan int) { for v := range source { target <- v } } // 從目標通道消費資料 func recv(ch chan int) { for v := range ch { fmt.Printf("receive %d\n", v) } } func main() { var ch1 = make(chan int) var ch2 = make(chan int) var ch3 = make(chan int) go send(ch1, time.Second) go send(ch2, 2 * time.Second) go collect(ch1, ch3) go collect(ch2, ch3) recv(ch3) } --------- receive 1 receive 1 receive 2 receive 2 receive 3 receive 4 receive 3 receive 5 receive 6 receive 4 receive 7 receive 8 receive 5 receive 9 ....
但是上面這種形式比較繁瑣,需要為每一種消費來源都單獨啟動一個匯聚協程。Go 語言為這種使用場景帶來了「多路複用」語法糖,也就是下面要講的 select 語句,它可以同時管理多個通道讀寫,如果所有通道都不能讀寫,它就整體阻塞,只要有一個通道可以讀寫,它就會繼續。下面我們使用 select 語句來簡化上面的邏輯
package main import "fmt" import "time" func send(ch chan int, gap time.Duration) { i := 0 for { i++ ch <- i time.Sleep(gap) } } func recv(ch1 chan int, ch2 chan int) { for { select { case v := <- ch1: fmt.Printf("recv %d from ch1\n", v) case v := <- ch2: fmt.Printf("recv %d from ch2\n", v) } } } func main() { var ch1 = make(chan int) var ch2 = make(chan int) go send(ch1, time.Second) go send(ch2, 2 * time.Second) recv(ch1, ch2) } ------------ recv 1 from ch2 recv 1 from ch1 recv 2 from ch1 recv 3 from ch1 recv 2 from ch2 recv 4 from ch1 recv 3 from ch2 recv 5 from ch1
上面是多路複用 select 語句的讀通道形式,下面是它的寫通道形式,只要有一個通道能寫進去,它就會打破阻塞。
select { case ch1 <- v: fmt.Println("send to ch1") case ch2 <- v: fmt.Println("send to ch2") }
非阻塞讀寫
前面我們講的讀寫都是阻塞讀寫,Go 語言還提供了通道的非阻塞讀寫。當通道空時,讀操作不會阻塞,當通道滿時,寫操作也不會阻塞。非阻塞讀寫需要依靠 select 語句的 default 分支。當 select 語句所有通道都不可讀寫時,如果定義了 default 分支,那就會執行 default 分支邏輯,這樣就起到了不阻塞的效果。下面我們演示一個單生產者多消費者的場景。生產者同時向兩個通道寫資料,寫不進去就丟棄。
package main import "fmt" import "time" func send(ch1 chan int, ch2 chan int) { i := 0 for { i++ select { case ch1 <- i: fmt.Printf("send ch1 %d\n", i) case ch2 <- i: fmt.Printf("send ch2 %d\n", i) default: } } } func recv(ch chan int, gap time.Duration, name string) { for v := range ch { fmt.Printf("receive %s %d\n", name, v) time.Sleep(gap) } } func main() { // 無緩衝通道 var ch1 = make(chan int) var ch2 = make(chan int) // 兩個消費者的休眠時間不一樣,名稱不一樣 go recv(ch1, time.Second, "ch1") go recv(ch2, 2*time.Second, "ch2") send(ch1, ch2) } ------------ send ch1 27 send ch2 28 receive ch1 27 receive ch2 28 send ch1 6708984 receive ch1 6708984 send ch2 13347544 send ch1 13347775 receive ch2 13347544 receive ch1 13347775 send ch1 20101642 receive ch1 20101642 send ch2 26775795 receive ch2 26775795 ...
從輸出中可以明顯看出有很多的資料都丟棄了,消費者讀到的資料是不連續的。如果將 select 語句裡面的 default 分支幹掉,再執行一次,結果如下
send ch2 1 send ch1 2 receive ch1 2 receive ch2 1 receive ch1 3 send ch1 3 receive ch2 4 send ch2 4 send ch1 5 receive ch1 5 receive ch1 6 send ch1 6 receive ch1 7
可以看到消費者讀到的資料都連續了,但是每個資料只給了一個消費者。select 語句的 default 分支非常關鍵,它是決定通道讀寫操作阻塞與否的關鍵。
併發與安全
雖然 Go 語言官方推薦使用通道的方式來共享資料,但是通過變數來共享才是基礎,因為通道在底層也是通過共享變數的方式來實現的。通道的內部資料結構包含一個數組,對通道的讀寫就是對內部陣列的讀寫。
在併發環境下共享讀寫變數必須要使用鎖來控制資料結構的安全,Go 語言內建了 sync 包,裡面包含了我們平時需要經常使用的互斥鎖物件 sync.Mutex。Go 語言內建的字典不是執行緒安全的,所以下面我們嘗試使用互斥鎖物件來保護字典,讓它變成執行緒安全的字典。
執行緒不安全的字典
Go 語言內建了資料結構「競態檢查」工具來幫我們檢查程式中是否存線上程不安全的程式碼。當我們在執行程式碼時,開啟 -run 開關,程式就會在內建的通用資料結構中進行埋點檢查。競態檢查工具在 Go 1.1 版本中引入,該功能幫助 Go 語言「元團隊」找出了 Go 語言標準庫中幾十個存線上程安全隱患的 bug,這是一個非常了不起的功能。同時這也說明了即使是猿界的神仙,寫出來的程式碼也避免不了有 bug。下面我們來嘗試一下
package main import "fmt" func write(d map[string]int) { d["fruit"] = 2 } func read(d map[string]int) { fmt.Println(d["fruit"]) } func main() { d := map[string]int{} go read(d) write(d) }
上面的程式碼明視訊記憶體在安全隱患,執行下面的競態檢查指令觀察輸出結果
go run -race main.go 10019 2.07 ================== WARNING: DATA RACE Read at 0x00c000080180 by goroutine 6: runtime.mapaccess1_faststr() /usr/local/Cellar/go/1.11.2/libexec/src/runtime/map_faststr.go:12 +0x0 main.read() /Users/cass/go/src/demo/main.go:10 +0x5d Previous write at 0x00c000080180 by main goroutine: runtime.mapassign_faststr() /usr/local/Cellar/go/1.11.2/libexec/src/runtime/map_faststr.go:190 +0x0 main.main() /Users/cass/go/src/demo/main.go:6 +0x88 Goroutine 6 (running) created at: main.main() /Users/cass/go/src/demo/main.go:15 +0x59 ================== ================== WARNING: DATA RACE Read at 0x00c0000827d8 by goroutine 6: main.read() /Users/cass/go/src/demo/main.go:10 +0x70 Previous write at 0x00c0000827d8 by main goroutine: main.main() /Users/cass/go/src/demo/main.go:6 +0x9d Goroutine 6 (running) created at: main.main() /Users/cass/go/src/demo/main.go:15 +0x59 ================== 2 Found 2 data race(s) exit status 66
競態檢查工具是基於執行時程式碼檢查,而不是通過程式碼靜態分析來完成的。這意味著那些沒有機會執行到的程式碼邏輯中如果存在安全隱患,它是檢查不出來的。
執行緒安全的字典
讓字典變的執行緒安全,就需要對字典的所有讀寫操作都使用互斥鎖保護起來。
package main import "fmt" import "sync" type SafeDict struct { datamap[string]int mutex *sync.Mutex } func NewSafeDict(data map[string]int) *SafeDict { return &SafeDict{ data:data, mutex: &sync.Mutex{}, } } func (d *SafeDict) Len() int { d.mutex.Lock() defer d.mutex.Unlock() return len(d.data) } func (d *SafeDict) Put(key string, value int) (int, bool) { d.mutex.Lock() defer d.mutex.Unlock() old_value, ok := d.data[key] d.data[key] = value return old_value, ok } func (d *SafeDict) Get(key string) (int, bool) { d.mutex.Lock() defer d.mutex.Unlock() old_value, ok := d.data[key] return old_value, ok } func (d *SafeDict) Delete(key string) (int, bool) { d.mutex.Lock() defer d.mutex.Unlock() old_value, ok := d.data[key] if ok { delete(d.data, key) } return old_value, ok } func write(d *SafeDict) { d.Put("banana", 5) } func read(d *SafeDict) { fmt.Println(d.Get("banana")) } func main() { d := NewSafeDict(map[string]int{ "apple": 2, "pear":3, }) go read(d) write(d) }
嘗試使用競態檢查工具執行上面的程式碼,會發現沒有了剛才一連串的警告輸出,說明 Get 和 Put 方法已經做到了協程安全,但是還不能說明 Delete() 方法是否安全,因為它根本沒有機會得到執行。
在上面的程式碼中我們再次看到了 defer 語句的應用場景 —— 釋放鎖。defer 語句總是要推遲到函式尾部執行,所以如果函式邏輯執行時間比較長,這會導致鎖持有的時間較長,這時使用 defer 語句來釋放鎖未必是一個好注意。
避免鎖複製
上面的程式碼中還有一個需要特別注意的地方是 sync.Mutex 是一個結構體物件,這個物件在使用的過程中要避免被複制 —— 淺拷貝。複製將會導致鎖被「分裂」了,也就起不到保護的作用。所以在平時的使用中要儘量使用它的指標型別。讀者可以嘗試將上面的型別換成非指標型別,然後執行一下競態檢查工具,會看到警告資訊再次佈滿整個螢幕。鎖複製存在於結構體變數的賦值、函式引數傳遞、方法引數傳遞中,都需要注意。
使用匿名鎖欄位
在結構體章節,我們知道外部結構體可以自動繼承匿名內部結構體的所有方法。如果將上面的 SafeDict 結構體進行改造,將鎖欄位匿名,就可以稍微簡化一下程式碼。
package main import "fmt" import "sync" type SafeDict struct { data map[string]int *sync.Mutex } func NewSafeDict(data map[string]int) *SafeDict { return &SafeDict{data, &sync.Mutex{}} } func (d *SafeDict) Len() int { d.Lock() defer d.Unlock() return len(d.data) } func (d *SafeDict) Put(key string, value int) (int, bool) { d.Lock() defer d.Unlock() old_value, ok := d.data[key] d.data[key] = value return old_value, ok } func (d *SafeDict) Get(key string) (int, bool) { d.Lock() defer d.Unlock() old_value, ok := d.data[key] return old_value, ok } func (d *SafeDict) Delete(key string) (int, bool) { d.Lock() defer d.Unlock() old_value, ok := d.data[key] if ok { delete(d.data, key) } return old_value, ok } func write(d *SafeDict) { d.Put("banana", 5) } func read(d *SafeDict) { fmt.Println(d.Get("banana")) } func main() { d := NewSafeDict(map[string]int{ "apple": 2, "pear":3, }) go read(d) write(d) }
使用讀寫鎖
日常應用中,大多數併發資料結構都是讀多寫少的,對於讀多寫少的場合,可以將互斥鎖換成讀寫鎖,可以有效提升效能。sync 包也提供了讀寫鎖物件 RWMutex,不同於互斥鎖只有兩個常用方法 Lock() 和 Unlock(),讀寫鎖提供了四個常用方法,分別是寫加鎖 Lock()、寫釋放鎖 Unlock()、讀加鎖 RLock() 和讀釋放鎖 RUnlock()。寫鎖是拍他鎖,加寫鎖時會阻塞其它協程再加讀鎖和寫鎖,讀鎖是共享鎖,加讀鎖還可以允許其它協程再加讀鎖,但是會阻塞加寫鎖。
讀寫鎖在寫併發高的情況下效能退化為普通的互斥鎖
下面我們將上面程式碼中 SafeDict 的互斥鎖改造成讀寫鎖。
package main import "fmt" import "sync" type SafeDict struct { data map[string]int *sync.RWMutex } func NewSafeDict(data map[string]int) *SafeDict { return &SafeDict{data, &sync.RWMutex{}} } func (d *SafeDict) Len() int { d.RLock() defer d.RUnlock() return len(d.data) } func (d *SafeDict) Put(key string, value int) (int, bool) { d.Lock() defer d.Unlock() old_value, ok := d.data[key] d.data[key] = value return old_value, ok } func (d *SafeDict) Get(key string) (int, bool) { d.RLock() defer d.RUnlock() old_value, ok := d.data[key] return old_value, ok } func (d *SafeDict) Delete(key string) (int, bool) { d.Lock() defer d.Unlock() old_value, ok := d.data[key] if ok { delete(d.data, key) } return old_value, ok } func write(d *SafeDict) { d.Put("banana", 5) } func read(d *SafeDict) { fmt.Println(d.Get("banana")) } func main() { d := NewSafeDict(map[string]int{ "apple": 2, "pear":3, }) go read(d) write(d) }