前言

總所周知,go 裡面只有兩種 channel,一種是 unbuffered channel, 其宣告方式為

  1. ch := make(chan interface{})

另一種是 buffered channel,其宣告方式為

  1. bufferSize := 5
  2. ch := make(chan interface{},bufferSize)

對於一個 buffered channel,無論它的 buffer 有多大,它終究是有極限的。這個極限就是該 channel 最初被 make 時,所指定的 bufferSize 。

  1. jojo,buffer channel 的大小是有極限的,我不做 channel 了。

一旦 channel 滿了的話,再往裡面新增元素的話,將會阻塞。

so how can we make a infinite buffer channel?

本文參考了 medinum 上面的一篇文章,有興趣的同學可以直接閱讀原文。

實現
介面的設計

首先當然是建一個 struct,在百度翻譯的幫助下,我們將這個 struct 取名為 InfiniteChannel

  1. type InfiniteChannel struct {
  2. }

思考一下 channel 的核心行為,實際上就兩個,一個流入(Fan in),一個流出(Fan out),因此我們新增如下幾個 method。

  1. func (c *InfiniteChannel) In(val interface{}) {
  2. // todo
  3. }
  4. func (c *InfiniteChannel) Out() interface{} {
  5. // todo
  6. }
內部實現

通過 In() 接收的資料,總得需要一個地方來存放。我們可以用一個 slice 來存放,就算用 In() 往裡面添加了很多元素,也可以通過 append() 來拓展 sliceslice 的容量可以無限拓展下去(記憶體足夠的話),所以 channel 也是 infiniteInfiniteChannel 的第一個成員就這麼敲定下來的。

  1. type InfiniteChannel struct {
  2. data []interface{}
  3. }

使用者呼叫 In()Out() 時,可能是併發的環境,在 go 中如何進行併發程式設計,最容易想到的肯定是 channel 了,因此我們在內部準備兩個 channel,一個 inChan,一個 outChan,用 inChan 來接收資料,用 outChan 來流出資料。

  1. type InfiniteChannel struct {
  2. inChan chan interface{}
  3. outChan chan interface{}
  4. data []interface{}
  5. }
  6. func (c *InfiniteChannel) In(val interface{}) {
  7. c.inChan <- val
  8. }
  9. func (c *InfiniteChannel) Out() interface{} {
  10. return <-c.outChan
  11. }

其中, inChanoutChan 都是 unbuffered channel。

此外,也肯定是需要一個 select 來處理來自 inChanoutChan 身上的事件。因此我們另起一個協程,在裡面做 select 操作。

  1. func (c *InfiniteChannel) background() {
  2. for true {
  3. select {
  4. case newVal := <-c.inChan:
  5. c.data = append(c.data, newVal)
  6. case c.outChan <- c.pop(): // pop() 將取出佇列的首個元素
  7. }
  8. }
  9. }
  10. func NewInfiniteChannel() *InfiniteChannel {
  11. c := &InfiniteChannel{
  12. inChan: make(chan interface{}),
  13. outChan: make(chan interface{}),
  14. }
  15. go c.background() // 注意這裡另起了一個協程
  16. return c
  17. }

ps:感覺這也算是 go 併發程式設計的一個套路了。即

  1. 在 new struct 的時候,順手 go 一個 select 協程,select 協程內執行一個 for 迴圈,不停的 select,監聽一個或者多個 channel 的事件。
  2. struct 對外提供的 method,只會操作 struct 內的 channel(在本例中就是 inChan 和 outChan),不會操作 struct 內的其他資料(在本例中,In() 和 Out() 都沒有直接操作 data)。
  3. 觸發 channel 的事件後,由 select 協程進行資料的更新(在本例中就是 data )。因為只有 select 協程對除 channel 外的資料成員進行讀寫操作,且 go 保證了對於 channel 的併發讀寫是安全的,所以程式碼是併發安全的。
  4. 如果 struct 是 exported ,使用者或許會越過 new ,直接手動 make 一個 struct,可以考慮將 struct 設定為 unexported,把它的首字母小寫即可。

pop() 的實現也非常簡單。

  1. // 取出佇列的首個元素,如果佇列為空,將會返回一個 nil
  2. func (c *InfiniteChannel) pop() interface{} {
  3. if len(c.data) == 0 {
  4. return nil
  5. }
  6. val := c.data[0]
  7. c.data = c.data[1:]
  8. return val
  9. }
測試一下

用一個協程每秒鐘生產一條資料,另一個協程每半秒消費一條資料,並列印。

  1. func main() {
  2. c := NewInfiniteChannel()
  3. go func() {
  4. for i := 0; i < 20; i++ {
  5. c.In(i)
  6. time.Sleep(time.Second)
  7. }
  8. }()
  9. for i := 0; i < 50; i++ {
  10. val := c.Out()
  11. fmt.Print(val)
  12. time.Sleep(time.Millisecond * 500)
  13. }
  14. }
  1. // out
  2. <nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
  3. Process finished with the exit code 0

可以看到,將 InfiniteChannel 內沒有資料可供消費時,呼叫 Out() 將會返回一個 nil,不過這也在我們的意料之中,原因是 pop() 在佇列為空時,將會返回 nil。

目前 InfiniteChannel 的行為與標準的 channel 的行為是有出入的,go 中的 channel,在沒有資料卻仍要取資料時會被阻塞,如何實現這個效果?

優化

我認為此處是是整篇文章最有技巧的地方,我第一次看到時忍不住拍案叫絕。

首先把原來的 background() 摘出來

  1. func (c *InfiniteChannel) background() {
  2. for true {
  3. select {
  4. case newVal := <-c.inChan:
  5. c.data = append(c.data, newVal)
  6. case c.outChan <- c.pop():
  7. }
  8. }
  9. }

outChan 進行一個簡單封裝

  1. func (c *InfiniteChannel) background() {
  2. for true {
  3. select {
  4. case newVal := <-c.inChan:
  5. c.data = append(c.data, newVal)
  6. case c.outChanWrapper() <- c.pop():
  7. }
  8. }
  9. }
  10. func (c *InfiniteChannel) outChanWrapper() chan interface{} {
  11. return c.outChan
  12. }

目前為止,一切照舊。

點睛之筆來了:

  1. func (c *InfiniteChannel) outChanWrapper() chan interface{} {
  2. if len(c.data) == 0 {
  3. return nil
  4. }
  5. return c.outChan
  6. }

c.data 為空的時候,返回一個 nil

background() 中,當執行到 case c.outChan <- c.pop(): 時,實際上將會變成:

  1. case nil <- nil:

go 中,是無法往一個 nilchannel 中傳送元素的。例如

  1. func main() {
  2. var c chan interface{}
  3. select {
  4. case c <- 1:
  5. }
  6. }
  7. // fatal error: all goroutines are asleep - deadlock!
  8. func main() {
  9. var c chan interface{}
  10. select {
  11. case c <- 1:
  12. default:
  13. fmt.Println("hello world")
  14. }
  15. }
  16. // hello world

因此,對於

  1. select {
  2. case newVal := <-c.inChan:
  3. c.data = append(c.data, newVal)
  4. case c.outChanWrapper() <- c.pop():
  5. }

將會一直阻塞在 select 那裡,直到 inChan 來了資料。

再測試一下
  1. 012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!

最後,程式 panic 了,因為死鎖了。

補充

實際上 channel 除了 In()Out() 外,還有一個行為,即 close(),如果 channel close 後,依舊從其中取元素的話,將會取出該型別的預設值。

  1. func main() {
  2. c := make(chan interface{})
  3. close(c)
  4. for true {
  5. v := <-c
  6. fmt.Println(v)
  7. time.Sleep(time.Second)
  8. }
  9. }
  10. // output
  11. // <nil>
  12. // <nil>
  13. // <nil>
  14. // <nil>
  15. func main() {
  16. c := make(chan interface{})
  17. close(c)
  18. for true {
  19. v, isOpen := <-c
  20. fmt.Println(v, isOpen)
  21. time.Sleep(time.Second)
  22. }
  23. }
  24. // output
  25. // <nil> false
  26. // <nil> false
  27. // <nil> false
  28. // <nil> false

我們也需要實現相同的效果。

  1. func (c *InfiniteChannel) Close() {
  2. close(c.inChan)
  3. }
  4. func (c *InfiniteChannel) background() {
  5. for true {
  6. select {
  7. case newVal, isOpen := <-c.inChan:
  8. if isOpen {
  9. c.data = append(c.data, newVal)
  10. } else {
  11. c.isOpen = false
  12. }
  13. case c.outChanWrapper() <- c.pop():
  14. }
  15. }
  16. }
  17. func NewInfiniteChannel() *InfiniteChannel {
  18. c := &InfiniteChannel{
  19. inChan: make(chan interface{}),
  20. outChan: make(chan interface{}),
  21. isOpen: true,
  22. }
  23. go c.background()
  24. return c
  25. }
  26. func (c *InfiniteChannel) outChanWrapper() chan interface{} {
  27. // 這裡添加了對 c.isOpen 的判斷
  28. if c.isOpen && len(c.data) == 0 {
  29. return nil
  30. }
  31. return c.outChan
  32. }
再測試一下
  1. func main() {
  2. c := NewInfiniteChannel()
  3. go func() {
  4. for i := 0; i < 20; i++ {
  5. c.In(i)
  6. time.Sleep(time.Second)
  7. }
  8. c.Close() // 這裡呼叫了 Close
  9. }()
  10. for i := 0; i < 50; i++ {
  11. val := c.Out()
  12. fmt.Print(val)
  13. time.Sleep(time.Millisecond * 500)
  14. }
  15. }
  1. // output
  2. 012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
  3. Process finished with the exit code 0

符合預期

遺憾

目前看上去已經很完美了,但是和標準的 channel 相比,仍然有差距。因為標準的 channel 是有這種用法的

  1. v,isOpen := <- ch

可以通過 isOpen 變數來獲取 channel 的開閉情況。

因此 InfiniteChannel 也應該提供一個類似的 method

  1. func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) {
  2. // todo
  3. }

可惜的是,要想得知 InfiniteChannel 是否是 Open 的,就必定要訪問 InfiniteChannel 內的 isOpen 成員。

  1. type InfiniteChannel struct {
  2. inChan chan interface{}
  3. outChan chan interface{}
  4. data []interface{}
  5. isOpen bool
  6. }

isOpen 並非 channel 型別,根據之前的套路,這種非 channel 型別的成員只應該被 select 協程訪問。一旦有多個協程訪問,就會出現併發問題,除非加鎖。

我不能接受!所以乾脆不提供這個 method 了,嘿嘿。

完整程式碼
  1. func main() {
  2. c := NewInfiniteChannel()
  3. go func() {
  4. for i := 0; i < 20; i++ {
  5. c.In(i)
  6. time.Sleep(time.Second)
  7. }
  8. c.Close()
  9. }()
  10. for i := 0; i < 50; i++ {
  11. val := c.Out()
  12. fmt.Print(val)
  13. time.Sleep(time.Millisecond * 500)
  14. }
  15. }
  16. type InfiniteChannel struct {
  17. inChan chan interface{}
  18. outChan chan interface{}
  19. data []interface{}
  20. isOpen bool
  21. }
  22. func (c *InfiniteChannel) In(val interface{}) {
  23. c.inChan <- val
  24. }
  25. func (c *InfiniteChannel) Out() interface{} {
  26. return <-c.outChan
  27. }
  28. func (c *InfiniteChannel) Close() {
  29. close(c.inChan)
  30. }
  31. func (c *InfiniteChannel) background() {
  32. for true {
  33. select {
  34. case newVal, isOpen := <-c.inChan:
  35. if isOpen {
  36. c.data = append(c.data, newVal)
  37. } else {
  38. c.isOpen = false
  39. }
  40. case c.outChanWrapper() <- c.pop():
  41. }
  42. }
  43. }
  44. func NewInfiniteChannel() *InfiniteChannel {
  45. c := &InfiniteChannel{
  46. inChan: make(chan interface{}),
  47. outChan: make(chan interface{}),
  48. isOpen: true,
  49. }
  50. go c.background()
  51. return c
  52. }
  53. // 取出佇列的首個元素,如果佇列為空,將會返回一個 nil
  54. func (c *InfiniteChannel) pop() interface{} {
  55. if len(c.data) == 0 {
  56. return nil
  57. }
  58. val := c.data[0]
  59. c.data = c.data[1:]
  60. return val
  61. }
  62. func (c *InfiniteChannel) outChanWrapper() chan interface{} {
  63. if c.isOpen && len(c.data) == 0 {
  64. return nil
  65. }
  66. return c.outChan
  67. }
參考

https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd