1. 程式人生 > >go中的資料結構通道-channel

go中的資料結構通道-channel

1. channel的使用

  很多文章介紹channel的時候都和併發揉在一起,這裡我想把它當做一種資料結構來單獨介紹它的實現原理。

  channel,通道。golang中用於資料傳遞的一種資料結構。是golang中一種傳遞資料的方式,也可用作事件通知。

1.1 宣告、傳值、關閉

  使用chan關鍵字宣告一個通道,在使用前必須先建立,操作符 <- 用於指定通道的方向,傳送或接收。如果未指定方向,則為雙向通道。

 1 //宣告和建立
 2 var ch chan int      // 宣告一個傳遞int型別的channel
 3 ch := make(chan int) // 使用內建函式make()定義一個channel
 4 ch2 := make(chan interface{})         // 建立一個空介面型別的通道, 可以存放任意格式
 5 
 6 type Equip struct{ /* 一些欄位 */ }
 7 ch2 := make(chan *Equip)             // 建立Equip指標型別的通道, 可以存放*Equip
 8 
 9 //傳值
10 ch <- value          // 將一個數據value寫入至channel,這會導致阻塞,直到有其他goroutine從這個channel中讀取資料
11 value := <-ch        // 從channel中讀取資料,如果channel之前沒有寫入資料,也會導致阻塞,直到channel中被寫入資料為止
12 
13 ch := make(chan interface{})  // 建立一個空介面通道
14 ch <- 0 // 將0放入通道中
15 ch <- "hello"  // 將hello字串放入通道中
16 
17 //關閉
18 close(ch)            // 關閉channel

  把資料往通道中傳送時,如果接收方一直都沒有接收,那麼傳送操作將持續阻塞。Go 程式執行時能智慧地發現一些永遠無法傳送成功的語句並報錯:

fatal error: all goroutines are asleep - deadlock! 
//執行時發現所有的 goroutine(包括main)都處於等待 goroutine。

1.2 四種重要的通道使用方式

無緩衝通道

  通道預設是無緩衝的,無緩衝通道上的傳送操作將會被阻塞,直到有其他goroutine從對應的通道上執行接收操作,資料傳送完成,通道繼續工作。

package main

import (
    "fmt"
    "time"
)
var done chan bool
func HelloWorld() {
    fmt.Println("Hello world goroutine")
    time.Sleep(1*time.Second)
    done <- true
}
func main() {
    done = make(chan bool)  // 建立一個channel
    go HelloWorld()
    <-done
}
1 //輸出
2 //Hello world goroutine

  由於main不會等goroutine執行結束才返回,前文專門加了sleep輸出為了可以看到goroutine的輸出內容,那麼在這裡由於是阻塞的,所以無需sleep。

  將程式碼中”done <- true”和”<-done”,去掉再執行,沒有上面的輸出內容。

管道

  通道可以用來連線goroutine,這樣一個的輸出是另一個輸入。這就叫做管道:

 

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 var echo chan string
 8 var receive chan string
 9 
10 // 定義goroutine 1 
11 func Echo() {
12     time.Sleep(1*time.Second)
13     echo <- "這是一次測試"
14 }
15 
16 // 定義goroutine 2
17 func Receive() {
18     temp := <- echo // 阻塞等待echo的通道的返回
19     receive <- temp
20 }
21 
22 
23 func main() {
24     echo = make(chan string)
25     receive = make(chan string)
26 
27     go Echo()
28     go Receive()
29 
30     getStr := <-receive   // 接收goroutine 2的返回
31 
32     fmt.Println(getStr)
33 }

  輸出字串:"這是一次測試"。

  在這裡不一定要去關閉channel,因為底層的垃圾回收機制會根據它是否可以訪問來決定是否自動回收它。(這裡不是根據channel是否關閉來決定的)

單向通道型別
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 // 定義goroutine 1
 9 func Echo(out chan<- string) {   // 定義輸出通道型別
10     time.Sleep(1*time.Second)
11     out <- "這又是一次測試"
12     close(out)
13 }
14 
15 // 定義goroutine 2
16 func Receive(out chan<- string, in <-chan string) { // 定義輸出通道型別和輸入型別
17     temp := <-in // 阻塞等待echo的通道的返回
18     out <- temp
19     close(out)
20 }
21 
22 
23 func main() {
24     echo := make(chan string)
25     receive := make(chan string)
26 
27     go Echo(echo)
28     go Receive(receive, echo)
29 
30     getStr := <-receive   // 接收goroutine 2的返回
31 
32     fmt.Println(getStr)
33 }

  輸出:這又是一次測試。

緩衝管道

  goroutine的通道預設是是阻塞的,那麼有什麼辦法可以緩解阻塞? 答案是:加一個緩衝區。

  建立一個緩衝通道:

1 ch := make(chan string, 3) // 建立了緩衝區為3的通道
2 
3 //==
4 len(ch)   // 長度計算
5 cap(ch)   // 容量計算

  緩衝通道傳遞資料示意圖:

 

2. 內部結構 

  Go語言channel是first-class的,意味著它可以被儲存到變數中,可以作為引數傳遞給函式,也可以作為函式的返回值返回。作為Go語言的核心特徵之一,雖然channel看上去很高階,但是其實channel僅僅就是一個數據結構而已,具體定義在 $GOROOT/src/runtime/chan.go裡。如下:

 1 type hchan struct {
 2   qcount uint   // 佇列中的總資料
 3   dataqsiz uint   // 迴圈佇列的大小
 4   buf unsafe.Pointer // 指向dataqsiz元素陣列
 5   elemsize uint16  // 
 6   closed uint32 
 7   elemtype *_type // 元素型別
 8   sendx uint // 傳送索引
 9   recvx uint // 接收索引
10   recvq waitq // 接待員名單, 因recv而阻塞的等待佇列。
11   sendq waitq // 傳送服務員列表, 因send而阻塞的等待佇列。
12   //鎖定保護hchan中的所有欄位,以及幾個在此通道上阻止的sudogs中的欄位。
13   //按住此鎖定時不要更改另一個G的狀態(尤其是不要準備G),因為這可能會導致死鎖堆疊縮小。
14   lock mutex 
15 }

   其中一個核心的部分是存放channel資料的環形佇列,由qcount和elemsize分別指定了佇列的容量和當前使用量。dataqsize是佇列的大小。elemalg是元素操作的一個Alg結構體,記錄下元素的操作,如copy函式,equal函式,hash函式等。

  如果是帶緩衝區的chan,則緩衝區資料實際上是緊接著Hchan結構體中分配的。不帶緩衝的 channel ,環形佇列 size 則為 0。

1 c = (Hchan*)runtime.mal(n + hint*elem->size);

  另一重要部分就是recvq和sendq兩個連結串列,一個是因讀這個通道而導致阻塞的goroutine,另一個是因為寫這個通道而阻塞的goroutine。如果一個goroutine阻塞於channel了,那麼它就被掛在recvq或sendq中。WaitQ是連結串列的定義,包含一個頭結點和一個尾結點:

1 struct    WaitQ
2 {
3     SudoG*    first;
4     SudoG*    last;
5 };

  佇列中的每個成員是一個SudoG結構體變數:

1 struct    SudoG
2 {
3     G*    g;        // g和selgen構成
4     uint32    selgen;        // 指向g的弱指標
5     SudoG*    link;
6     int64    releasetime;
7     byte*    elem;        // 資料元素
8 };

  該結構中主要的就是一個g和一個elem。elem用於儲存goroutine的資料。讀通道時,資料會從Hchan的佇列中拷貝到SudoG的elem域。寫通道時,資料則是由SudoG的elem域拷貝到Hchan的佇列中。

 

  基本的寫channel操作,在底層執行時庫中對應的是一個runtime.chansend函式。

1 c <- v
  在執行時庫中會執行:
1 void runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)

  其中c就是channel,ep是取變數v的地址。這裡的傳值約定是呼叫者負責分配好ep的空間,僅需要簡單的取變數地址就夠了。pres引數是在select中的通道操作使用的。

  這個函式首先會區分是同步還是非同步。同步是指chan是不帶緩衝區的,因此可能寫阻塞,而非同步是指chan帶緩衝區,只有緩衝區滿才阻塞。在同步的情況下,由於channel本身是不帶資料快取的,這時首先會檢視Hchan結構體中的recvq連結串列時否為空,即是否有因為讀該管道而阻塞的goroutine。如果有則可以正常寫channel,否則操作會阻塞。

  recvq不為空的情況下,將一個SudoG結構體出佇列,將傳給通道的資料(函式引數ep)拷貝到SudoG結構體中的elem域,並將SudoG中的g放到就緒佇列中,狀態置為ready,然後函式返回。如果recvq為空,否則要將當前goroutine阻塞。此時將一個SudoG結構體,掛到通道的sendq連結串列中,這個SudoG中的elem域是引數eq,SudoG中的g是當前的goroutine。當前goroutine會被設定為waiting狀態並掛到等待佇列中。

  在非同步的情況,如果緩衝區滿了,也是要將當前goroutine和資料一起作為SudoG結構體掛在sendq佇列中,表示因寫channel而阻塞。否則也是先看有沒有recvq連結串列是否為空,有就喚醒。跟同步不同的是在channel緩衝區不滿的情況,這裡不會阻塞寫者,而是將資料放到channel的緩衝區中,呼叫者返回。

  讀channel的操作也是類似的,對應的函式是runtime.chansend。一個是收一個是發,基本的過程都是差不多的。

  當協程嘗試從未關閉的 channel 中讀取資料時,內部的操作如下:

  • 當 buf 非空時,此時 recvq 必為空,buf 彈出一個元素給讀協程,讀協程獲得資料後繼續執行,此時若 sendq 非空,則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫資料入佇列 buf ,此時讀取操作 <- ch 未阻塞;
  • 當 buf 為空但 sendq 非空時(不帶緩衝的 channel),則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫資料直接傳遞給讀協程,讀協程繼續執行,此時讀取操作 <- ch 未阻塞;
  • 當 buf 為空並且 sendq 也為空時,讀協程入佇列 recvq 並轉入 blocking 狀態,當後續有其他協程往 channel 寫資料時,讀協程才會重新轉入 running 狀態,此時讀取操作 <- ch 阻塞。

  類似的,當協程嘗試往未關閉的 channel 中寫入資料時,內部的操作如下:

  • 當佇列 recvq 非空時,此時佇列 buf 必為空,從 recvq 彈出一個讀協程接收待寫資料,此讀協程此時結束阻塞並轉入 running 狀態,寫協程繼續執行,此時寫入操作 ch <- 未阻塞;
  • 當佇列 recvq 為空但 buf 未滿時,此時 sendq 必為空,寫協程的待寫資料入 buf 然後繼續執行,此時寫入操作 ch <- 未阻塞;
  • 當佇列 recvq 為空並且 buf 為滿時,此時寫協程入佇列 sendq 並轉入 blokcing 狀態,當後續有其他協程從 channel 中讀資料時,寫協程才會重新轉入 running 狀態,此時寫入操作 ch <- 阻塞。

  當關閉 non-nil channel 時,內部的操作如下:

  • 當佇列 recvq 非空時,此時 buf 必為空,recvq 中的所有協程都將收到對應型別的零值然後結束阻塞狀態;
  • 當佇列 sendq 非空時,此時 buf 必為滿,sendq 中的所有協程都會產生 panic ,在 buf 中資料仍然會保留直到被其他協程讀取。

  空通道是指將一個channel賦值為nil,或者定義後不呼叫make進行初始化。按照Go語言的語言規範,讀寫空通道是永遠阻塞的。其實在函式runtime.chansend和runtime.chanrecv開頭就有判斷這類情況,如果發現引數c是空的,則直接將當前的goroutine放到等待佇列,狀態設定為waiting。

  讀一個關閉的通道,永遠不會阻塞,會返回一個通道資料型別的零值。這個實現也很簡單,將零值複製到呼叫函式的引數ep中。寫一個關閉的通道,則會panic。關閉一個空通道,也會導致panic。

3. channel的高階用法

3.1 條件變數(condition variable)

  型別於 POSIX 介面中執行緒通知其他執行緒某個事件發生的條件變數,channel 的特性也可以用來當成協程之間同步的條件變數。因為 channel 只是用來通知,所以 channel 中具體的資料型別和值並不重要,這種場景一般用 strct {} 作為 channel 的型別。

一對一通知

  類似 pthread_cond_signal() 的功能,用來在一個協程中通知另個某一個協程事件發生:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func main() {
 9     ch := make(chan struct{})
10     nums := make([]int, 100)
11 
12     go func() {
13         time.Sleep(time.Second)
14         for i := 0; i < len(nums); i++ {
15             nums[i] = i
16         }
17         // send a finish signal
18         ch <- struct{}{}
19     }()
20 
21     // wait for finish signal
22     <-ch
23     fmt.Println(nums)
24 }
廣播通知

  類似 pthread_cond_broadcast() 的功能。利用從已關閉的 channel 讀取資料時總是非阻塞的特性,可以實現在一個協程中向其他多個協程廣播某個事件發生的通知:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6 )
 7 
 8 func main() {
 9     N := 10
10     exit := make(chan struct{})
11     done := make(chan struct{}, N)
12 
13     // start N worker goroutines
14     for i := 0; i < N; i++ {
15         go func(n int) {
16             for {
17                 select {
18                 // wait for exit signal
19                 case <-exit:
20                     fmt.Printf("worker goroutine #%d exit\n", n)
21                     done <- struct{}{}
22                     return
23                 case <-time.After(time.Second):
24                     fmt.Printf("worker goroutine #%d is working...\n", n)
25                 }
26             }
27         }(i)
28     }
29 
30     time.Sleep(3 * time.Second)
31     // broadcast exit signal
32     close(exit)
33     // wait for all worker goroutines exit
34     for i := 0; i < N; i++ {
35         <-done
36     }
37     fmt.Println("main goroutine exit")
38 }

3.2 訊號量

  channel 的讀/寫相當於訊號量的 P / V 操作,下面的示例程式中 channel 相當於訊號量:

 1 package main
 2 
 3 import (
 4     "log"
 5     "math/rand"
 6     "time"
 7 )
 8 
 9 type Seat int
10 type Bar chan Seat
11 
12 func (bar Bar) ServeConsumer(customerId int) {
13     log.Print("-> consumer#", customerId, " enters the bar")
14     seat := <-bar // need a seat to drink
15     log.Print("consumer#", customerId, " drinks at seat#", seat)
16     time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
17     log.Print("<- consumer#", customerId, " frees seat#", seat)
18     bar <- seat // free the seat and leave the bar
19 }
20 
21 func main() {
22     rand.Seed(time.Now().UnixNano())
23 
24     bar24x7 := make(Bar, 10) // the bar has 10 seats
25     // Place seats in an bar.
26     for seatId := 0; seatId < cap(bar24x7); seatId++ {
27         bar24x7 <- Seat(seatId) // none of the sends will block
28     }
29 
30     // a new consumer try to enter the bar for each second
31     for customerId := 0; ; customerId++ {
32         time.Sleep(time.Second)
33         go bar24x7.ServeConsumer(customerId)
34     }
35 }

3.3 互斥量

  互斥量相當於二元訊號裡,所以 cap 為 1 的 channel 可以當成互斥量使用:

 1 package main
 2 
 3 import "fmt"
 4 
 5 func main() {
 6     mutex := make(chan struct{}, 1) // the capacity must be one
 7 
 8     counter := 0
 9     increase := func() {
10         mutex <- struct{}{} // lock
11         counter++
12         <-mutex // unlock
13     }
14 
15     increase1000 := func(done chan<- struct{}) {
16         for i := 0; i < 1000; i++ {
17             increase()
18         }
19         done <- struct{}{}
20     }
21 
22     done := make(chan struct{})
23     go increase1000(done)
24     go increase1000(done)
25     <-done; <-done
26     fmt.Println(counter) // 2000
27 }

4. 關閉 channel

  關閉不再需要使用的 channel 並不是必須的。跟其他資源比如開啟的檔案、socket 連線不一樣,這類資源使用完後不關閉後會造成控制代碼洩露,channel 使用完後不關閉也沒有關係,channel 沒有被任何協程用到後最終會被 GC 回收。關閉 channel 一般是用來通知其他協程某個任務已經完成了。golang 也沒有直接提供判斷 channel 是否已經關閉的介面,雖然可以用其他不太優雅的方式自己實現一個:

1 func isClosed(ch chan int) bool {
2     select {
3     case <-ch:
4         return true
5     default:
6     }
7     return false
8 }

  不過實現一個這樣的介面也沒什麼必要。因為就算通過 isClosed() 得到當前 channel 當前還未關閉,如果試圖往 channel 裡寫資料,仍然可能會發生 panic ,因為在呼叫 isClosed() 後,其他協程可能已經把 channel 關閉了。
關閉 channel 時應該注意以下準則:

  • 不要在讀取端關閉 channel ,因為寫入端無法知道 channel 是否已經關閉,往已關閉的 channel 寫資料會 panic ;
  • 有多個寫入端時,不要再寫入端關閉 channle ,因為其他寫入端無法知道 channel 是否已經關閉,關閉已經關閉的 channel 會發生 panic ;
  • 如果只有一個寫入端,可以在這個寫入端放心關閉 channel 。

  關閉 channel 粗暴一點的做法是隨意關閉,如果產生了 panic 就用 recover 避免程序掛掉。稍好一點的方案是使用標準庫的 sync 包來做關閉 channel 時的協程同步,不過使用起來也稍微複雜些。下面介紹一種優雅些的做法。

4.1 一寫多讀

  這種場景下這個唯一的寫入端可以關閉 channel 用來通知讀取端所有資料都已經寫入完成了。讀取端只需要用 for range 把 channel 中資料遍歷完就可以了,當 channel 關閉時,for range 仍然會將 channel 緩衝中的資料全部遍歷完然後再退出迴圈:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "sync"
 6 )
 7 
 8 func main() {
 9     wg := &sync.WaitGroup{}
10     ch := make(chan int, 100)
11 
12     send := func() {
13         for i := 0; i < 100; i++ {
14             ch <- i
15         }
16         // signal sending finish
17         close(ch)
18     }
19 
20     recv := func(id int) {
21         defer wg.Done()
22         for i := range ch {
23             fmt.Printf("receiver #%d get %d\n", id, i)
24         }
25         fmt.Printf("receiver #%d exit\n", id)
26     }
27 
28     wg.Add(3)
29     go recv(0)
30     go recv(1)
31     go recv(2)
32     send()
33 
34     wg.Wait()
35 }

4.2 多寫一讀

  這種場景下雖然可以用 sync.Once 來解決多個寫入端重複關閉 channel 的問題,但更優雅的辦法設定一個額外的 channel ,由讀取端通過關閉來通知寫入端任務完成不要再繼續再寫入資料了:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "sync"
 6 )
 7 
 8 func main() {
 9     wg := &sync.WaitGroup{}
10     ch := make(chan int, 100)
11     done := make(chan struct{})
12 
13     send := func(id int) {
14         defer wg.Done()
15         for i := 0; ; i++ {
16             select {
17             case <-done:
18                 // get exit signal
19                 fmt.Printf("sender #%d exit\n", id)
20                 return
21             case ch <- id*1000 + i:
22             }
23         }
24     }
25 
26     recv := func() {
27         count := 0
28         for i := range ch {
29             fmt.Printf("receiver get %d\n", i)
30             count++
31             if count >= 1000 {
32                 // signal recving finish
33                 close(done)
34                 return
35             }
36         }
37     }
38 
39     wg.Add(3)
40     go send(0)
41     go send(1)
42     go send(2)
43     recv()
44 
45     wg.Wait()
46 }

4.2 多寫多讀

  這種場景稍微複雜,和上面的例子一樣,也需要設定一個額外 channel 用來通知多個寫入端和讀取端。另外需要起一個額外的協程來通過關閉這個 channel 來廣播通知:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "sync"
 6     "time"
 7 )
 8 
 9 func main() {
10     wg := &sync.WaitGroup{}
11     ch := make(chan int, 100)
12     done := make(chan struct{})
13 
14     send := func(id int) {
15         defer wg.Done()
16         for i := 0; ; i++ {
17             select {
18             case <-done:
19                 // get exit signal
20                 fmt.Printf("sender #%d exit\n", id)
21                 return
22             case ch <- id*1000 + i:
23             }
24         }
25     }
26 
27     recv := func(id int) {
28         defer wg.Done()
29         for {
30             select {
31             case <-done:
32                 // get exit signal
33                 fmt.Printf("receiver #%d exit\n", id)
34                 return
35             case i := <-ch:
36                 fmt.Printf("receiver #%d get %d\n", id, i)
37                 time.Sleep(time.Millisecond)
38             }
39         }
40     }
41 
42     wg.Add(6)
43     go send(0)
44     go send(1)
45     go send(2)
46     go recv(0)
47     go recv(1)
48     go recv(2)
49 
50     time.Sleep(time.Second)
51     // signal finish
52     close(done)
53     // wait all sender and receiver exit
54     wg.Wait()
55 }

  channle 作為 golang 最重要的特性,用起來還是比較方便的。傳統的 C 裡要實現類似的功能的話,一般需要用到 socket 或者 FIFO 來實現,另外還要考慮資料包的完整性與併發衝突的問題,channel 則遮蔽了這些底層細節,使用者只需要考慮讀寫就可以了。 channel 是引用型別,瞭解一下 channel 底層的機制對更好的使用 channel 還是很用必要的。雖然操作原語簡單,但涉及到阻塞的問題,使用不當可能會造成死鎖或者無限制的協程建立最終導致程序掛