1. 程式人生 > >3種優雅的Go channel用法

3種優雅的Go channel用法

寫Go的人應該都聽過Rob Pike的這句話

Do not communicate by sharing memory; instead, share memory by communicating.

相信很多朋友和我一樣,在實際應用中總感覺不到好處,為了用channel而用。但以我的切身體會來說,這是寫程式碼時碰到的場景不復雜、對channel不熟悉導致的,所以希望這篇文章能給大家帶來點新思路,對Golang優雅的channel有更深的認識 :)

Fan In/Out

資料的輸出有時候需要做扇出/入(Fan In/Out),但是在函式中呼叫常常得修改介面,而且上下游對於資料的依賴程度非常高,所以一般使用通過channel進行Fan In/Out,這樣就可以輕易實現類似於shell裡的管道。

func fanIn(input1, input2 <-chan string) <-chan string {
   c := make(chan string)
   go func() {
       for {
           select {
           case s := <-input1:  c <- s
           case s := <-input2:  c <- s
           }
       }
   }()
   return c
}

同步Goroutine

兩個goroutine之間同步狀態,例如A goroutine需要讓B goroutine退出,一般做法如下:

func main() {
   g = make(chan int)
   quit = make(chan bool)
   go B()
   for i := 0; i < 3; i++ {
       g <- i
   }
   quit <- true // 沒辦法等待B的退出只能Sleep
   fmt.Println("Main quit")
}

func B() {
   for {
       select {
       case i := <-g:
           fmt.Println(i + 1)
       case <-quit:
           fmt.Println("B quit")
           return
       }
   }
}

/*
Output:
1
2
3
Main quit
*/

可是了main函式沒辦法等待B合適地退出,所以B quit 沒辦法列印,程式直接退出了。然而,chan是Go裡的第一物件,所以可以把chan傳入chan中,所以上面的程式碼可以把quit 定義為chan chan bool,以此控制兩個goroutine的同步

func main() {
   g = make(chan int)
   quit = make(chan chan bool)
   go B()
   for i := 0; i < 5; i++ {
       g <- i
   }
   wait := make(chan bool)
   quit <- wait
   <-wait //這樣就可以等待B的退出了
   fmt.Println("Main Quit")
}

func B() {
   for {
       select {
       case i := <-g:
           fmt.Println(i + 1)
       case c := <-quit:
           c <- true
           fmt.Println("B Quit")
           return
       }
   }
}

/* Output
1
2
3
B Quit
Main Quit
*/

分散式遞迴呼叫

在現實生活中,如果你要找美國總統聊天,你會怎麼做?第一步打電話給在美國的朋友,然後他們也會發動自己的關係網,再找可能認識美國總統的人,以此類推,直到找到為止。這在Kadmelia分散式系統中也是一樣的,如果需要獲取目標ID資訊,那麼就不停地查詢,被查詢節點就算沒有相關資訊,也會返回它覺得最近節點,直到找到ID或者等待超時。 好了,這個要用Go來實現怎麼做呢?

func recursiveCall(ctx context.Context, id []byte, initialNodes []*node){
	seen := map[string]*node{} //已見過的節點記錄
	request := make(chan *node, 3) //設定請求節點channel

        // 輸入初始節點
	go func() {
		for _, n := range initialNodes {
			request <- n
		}
	}()

OUT:
	for {
               //迴圈直到找到資料
		if data != nil {
		    return
		}
                // 在新的請求,超時和上層取消請求中select
		select {
		case n := <-request:
			go func() {
                                // 傳送新的請求
				response := s.sendQuery(ctx, n, MethodFindValue, id)
				select {
				case <-ctx.Done():
				case msg :=<-response:
                                    seen[responseToNode(response)] = n //更新已見過的節點資訊
                                                // 載入新的節點
						for _, rn := range LoadNodeInfoFromByte(msg[PayLoadStart:]) {
							mu.Lock()
							_, ok := seen[rn.HexID()]
							mu.Unlock()
                                                        // 見過了,跳過這個節點
							if ok { 
 								continue
							}
							AddNode(rn)
                                                        // 將新的節點送入channel
							request <- rn
						}
					}
				}
			}()
		case <-time.After(500 * time.Millisecond):
			break OUT // break至外層,否則僅僅是跳至loop外
        	case <-ctx.Done():
			break OUT
		}
	}
	return
}

這時的buffered channel類似於一個區域性queue,對需要的節點進行處理,但這段程式碼的精妙之處在於,這裡的block操作是select的,隨時可以取消,而不是要等待或者對queue的長度有認識。

你對這三種channel的用法有什麼疑問,歡迎討論╮(╯▽╰)╭