1. 程式人生 > >Go語言併發與並行

Go語言併發與並行

首先,並行!=併發, 兩者是不同的

Go語言的goroutines、通道和死鎖

goroutine

Go語言中有個概念叫做goroutine, 這類似我們熟知的執行緒,但是更輕。
以下的程式,我們序列地去執行兩次loop函式:

func loop() {
    for i := 0; i < 10; i++ {
        fmt.Printf("%d ", i)
    }
}

func main() {
    loop()
    loop()
}

毫無疑問,輸出會是這樣的:

0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

下面我們把一個loop放在一個goroutine裡跑,我們可以使用關鍵字go來定義並啟動一個goroutine:

func main() {
    go loop() // 啟動一個goroutine
    loop()
}

這次的輸出變成了:

0 1 2 3 4 5 6 7 8 9

可是為什麼只輸出了一趟呢?明明我們主線跑了一趟,也開了一個goroutine來跑一趟啊。

原來,在goroutine還沒來得及跑loop的時候,主函式已經退出了。

main函式退出地太快了,我們要想辦法阻止它過早地退出,一個辦法是讓main等待一下:

func main() {
    go loop()
    loop()
    time.Sleep(time.Second) // 停頓一秒
}

這次確實輸出了兩趟,目的達到了。

可是採用等待的辦法並不好,如果goroutine在結束的時候,告訴下主線說“Hey, 我要跑完了!”就好了, 即所謂阻塞主線的辦法,回憶下我們Python裡面等待所有執行緒執行完畢的寫法:

for thread in threads:
    thread.join()

是的,我們也需要一個類似join的東西來阻塞住主線。那就是通道

通道

通道是什麼?簡單說,是goroutine之間互相通訊的東西。類似我們Unix上的管道(可以在程序間傳遞訊息), 用來goroutine之間發訊息和接收訊息。其實,就是在做goroutine之間的記憶體共享。

使用make來建立一個通道:

var channel chan int = make(chan int)
// 或
channel := make(chan int)

那如何向通道存訊息和取訊息呢? 一個例子:

func main() {
    var messages chan string = make(chan string)
    go func(message string) {
        messages <- message // 存訊息
    }("Ping!")

    fmt.Println(<-messages) // 取訊息
}

預設的,通道的存訊息和取訊息都是阻塞的 (叫做無緩衝的通道,不過緩衝這個概念稍後瞭解,先說阻塞的問題)。

也就是說, 無緩衝的通道在取訊息和存訊息的時候都會掛起當前的goroutine,除非另一端已經準備好。

比如以下的main函式和foo函式:

var ch chan int = make(chan int)

func foo() {
    ch <- 0  // 向ch中加資料,如果沒有其他goroutine來取走這個資料,那麼掛起foo, 直到main函式把0這個資料拿走
}

func main() {
    go foo()
    <- ch // 從ch取資料,如果ch中還沒放資料,那就掛起main線,直到foo函式中放資料為止
}

那既然通道可以阻塞當前的goroutine, 那麼回到上一部分「goroutine」所遇到的問題「如何讓goroutine告訴主線我執行完畢了」 的問題來, 使用一個通道來告訴主線即可:

var complete chan int = make(chan int)

func loop() {
    for i := 0; i < 10; i++ {
        fmt.Printf("%d ", i)
    }

    complete <- 0 // 執行完畢了,發個訊息
}

func main() {
    go loop()
    <- complete // 直到執行緒跑完, 取到訊息. main在此阻塞住
}

如果不用通道來阻塞主線的話,主線就會過早跑完,loop線都沒有機會執行、、、

其實,無緩衝的通道永遠不會儲存資料,只負責資料的流通,為什麼這麼講呢?

  • 從無緩衝通道取資料,必須要有資料流進來才可以,否則當前線阻塞

  • 資料流入無緩衝通道, 如果沒有其他goroutine來拿走這個資料,那麼當前線阻塞

所以,你可以測試下,無論如何,我們測試到的無緩衝通道的大小都是0 (len(channel))

如果通道正有資料在流動,我們還要加入資料,或者通道乾澀,我們一直向無資料流入的空通道取資料呢? 就會引起死鎖

死鎖

一個死鎖的例子:

func main() {
    ch := make(chan int)
    <- ch // 阻塞main goroutine, 通道c被鎖
}

執行這個程式你會看到Go報這樣的錯誤:

fatal error: all goroutines are asleep - deadlock!

何謂死鎖? 作業系統有講過的,所有的執行緒或程序都在等待資源的釋放。如上的程式中, 只有一個goroutine, 所以當你向裡面加資料或者存資料的話,都會鎖死通道, 並且阻塞當前 goroutine, 也就是所有的goroutine(其實就main線一個)都在等待通道的開放(沒人拿走資料通道是不會開放的),也就是死鎖咯。

我發現死鎖是一個很有意思的話題,這裡有幾個死鎖的例子:

1.只在單一的goroutine裡操作無緩衝通道,一定死鎖。比如你只在main函式裡操作通道:

func main() {
    ch := make(chan int)
    ch <- 1 // 1流入通道,堵塞當前線, 沒人取走資料通道不會開啟
    fmt.Println("This line code wont run") //在此行執行之前Go就會報死鎖
}

2.如下也是一個死鎖的例子:

var ch1 chan int = make(chan int)
var ch2 chan int = make(chan int)

func say(s string) {
    fmt.Println(s)
    ch1 <- <- ch2 // ch1 等待 ch2流出的資料
}

func main() {
    go say("hello")
    <- ch1  // 堵塞主線
}

其中主線等ch1中的資料流出,ch1等ch2的資料流出,但是ch2等待資料流入,兩個goroutine都在等,也就是死鎖。

其實,總結來看,為什麼會死鎖?非緩衝通道上如果發生了流入無流出,或者流出無流入,也就導致了死鎖。或者這樣理解 Go啟動的所有goroutine裡的非緩衝通道一定要一個線裡存資料,一個線裡取資料,要成對才行 。所以下面的示例一定死鎖:

c, quit := make(chan int), make(chan int)

go func() {
   c <- 1  // c通道的資料沒有被其他goroutine讀取走,堵塞當前goroutine
   quit <- 0 // quit始終沒有辦法寫入資料
}()

<- quit // quit 等待資料的寫

仔細分析的話,是由於:主線等待quit通道的資料流出,quit等待資料寫入,而func被c通道堵塞,所有goroutine都在等,所以死鎖。

簡單來看的話,一共兩個線,func線中流入c通道的資料並沒有在main線中流出,肯定死鎖。

但是,是否果真 所有不成對向通道存取資料的情況都是死鎖?

如下是個反例:

func main() {
    c := make(chan int)

    go func() {
       c <- 1
    }()
}

程式正常退出了,很簡單,並不是我們那個總結不起作用了,還是因為一個讓人很囧的原因,main又沒等待其它goroutine,自己先跑完了, 所以沒有資料流入c通道,一共執行了一個goroutine, 並且沒有發生阻塞,所以沒有死鎖錯誤。

那麼死鎖的解決辦法呢?

最簡單的,把沒取走的資料取走,沒放入的資料放入, 因為無緩衝通道不能承載資料,那麼就趕緊拿走!

具體來講,就死鎖例子3中的情況,可以這麼避免死鎖:

c, quit := make(chan int), make(chan int)

go func() {
    c <- 1
    quit <- 0
}()

<- c // 取走c的資料!
<-quit

另一個解決辦法是緩衝通道, 即設定c有一個數據的緩衝大小:

c := make(chan int, 1)

這樣的話,c可以快取一個數據。也就是說,放入一個數據,c並不會掛起當前線, 再放一個才會掛起當前線直到第一個資料被其他goroutine取走, 也就是隻阻塞在容量一定的時候,不達容量不阻塞。

這十分類似我們Python中的佇列Queue不是嗎?

無緩衝通道的資料進出順序

我們已經知道,無緩衝通道從不儲存資料,流入的資料必須要流出才可以。

觀察以下的程式:

var ch chan int = make(chan int)

func foo(id int) { //id: 這個routine的標號
    ch <- id
}

func main() {
    // 開啟5個routine
    for i := 0; i < 5; i++ {
        go foo(i)
    }

    // 取出通道中的資料
    for i := 0; i < 5; i++ {
        fmt.Print(<- ch)
    }
}

我們開了5個goroutine,然後又依次取資料。其實整個的執行過程細分的話,5個線的資料 依次流過通道ch, main列印之, 而巨集觀上我們看到的即 無緩衝通道的資料是先到先出,但是 無緩衝通道並不儲存資料,只負責資料的流通

緩衝通道

終於到了這個話題了, 其實快取通道用英文來講更為達意: buffered channel.

緩衝這個詞意思是,緩衝通道不僅可以流通資料,還可以快取資料。它是有容量的,存入一個數據的話 , 可以先放在信道里,不必阻塞當前線而等待該資料取走。

當緩衝通道達到滿的狀態的時候,就會表現出阻塞了,因為這時再也不能承載更多的資料了,「你們必須把 資料拿走,才可以流入資料」。

在宣告一個通道的時候,我們給make以第二個引數來指明它的容量(預設為0,即無緩衝):

var ch chan int = make(chan int, 2) // 寫入2個元素都不會阻塞當前goroutine, 儲存個數達到2的時候會阻塞

如下的例子,緩衝通道ch可以無緩衝的流入3個元素:

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
}

如果你再試圖流入一個數據的話,通道ch會阻塞main線, 報死鎖。

也就是說,緩衝通道會在滿容量的時候加鎖。

其實,緩衝通道是先進先出的,我們可以把緩衝通道看作為一個執行緒安全的佇列:

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3

    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}

通道資料讀取和通道關閉

你也許發現,上面的程式碼一個一個地去讀取通道簡直太費事了,Go語言允許我們使用range來讀取通道:

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3

    for v := range ch {
        fmt.Println(v)
    }
}

如果你執行了上面的程式碼,會報死鎖錯誤的,原因是range不等到通道關閉是不會結束讀取的。也就是如果 緩衝通道乾涸了,那麼range就會阻塞當前goroutine, 所以死鎖咯。

那麼,我們試著避免這種情況,比較容易想到的是讀到通道為空的時候就結束讀取:

ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
for v := range ch {
    fmt.Println(v)
    if len(ch) <= 0 { // 如果現有資料量為0,跳出迴圈
        break
    }
}

以上的方法是可以正常輸出的,但是注意檢查通道大小的方法不能在通道存取都在發生的時候用於取出所有資料,這個例子 是因為我們只在ch中存了資料,現在一個一個往外取,通道大小是遞減的。

另一個方式是顯式地關閉通道:

ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

// 顯式地關閉通道
close(ch)

for v := range ch {
    fmt.Println(v)
}

被關閉的通道會禁止資料流入, 是隻讀的。我們仍然可以從關閉的通道中取出資料,但是不能再寫入資料了。

等待多gorountine的方案

那好,我們回到最初的一個問題,使用通道堵塞主線,等待開出去的所有goroutine跑完。

這是一個模型,開出很多小goroutine, 它們各自跑各自的,最後跑完了向主線報告。

我們討論如下2個版本的方案:

  1. 只使用單個無緩衝通道阻塞主線

  2. 使用容量為goroutines數量的緩衝通道

對於方案1, 示例的程式碼大概會是這個樣子:

var quit chan int // 只開一個通道

func foo(id int) {
    fmt.Println(id)
    quit <- 0 // ok, finished
}

func main() {
    count := 1000
    quit = make(chan int) // 無緩衝

    for i := 0; i < count; i++ {
        go foo(i)
    }

    for i := 0; i < count; i++ {
        <- quit
    }
}

對於方案2, 把通道換成緩衝1000的:

quit = make(chan int, count) // 容量1000

其實區別僅僅在於一個是緩衝的,一個是非緩衝的。

對於這個場景而言,兩者都能完成任務, 都是可以的。

  • 無緩衝的通道是一批資料一個一個的「流進流出」

  • 緩衝通道則是一個一個儲存,然後一起流出去

Go語言的併發和並行

不知道你有沒有注意到一個現象,還是這段程式碼,如果我跑在兩個goroutines裡面的話:

var quit chan int = make(chan int)

func loop() {
    for i := 0; i < 10; i++ {
        fmt.Printf("%d ", i)
    }
    quit <- 0
}

func main() {
    // 開兩個goroutine跑函式loop, loop函式負責列印10個數
    go loop()
    go loop()

    for i := 0; i < 2; i++ {
        <- quit
    }
}

我們觀察下輸出:

0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

這是不是有什麼問題??

以前我們用執行緒去做類似任務的時候,系統的執行緒會搶佔式地輸出, 表現出來的是亂序地輸出。而goroutine為什麼是這樣輸出的呢?

goroutine是在並行嗎?

我們找個例子測試下:

package main

import "fmt"
import "time"

var quit chan int

func foo(id int) {
    fmt.Println(id)
    time.Sleep(time.Second) // 停頓一秒
    quit <- 0 // 發訊息:我執行完啦!
}

func main() {
    count := 1000
    quit = make(chan int, count) // 緩衝1000個數據

    for i := 0; i < count; i++ { //開1000個goroutine
        go foo(i)
    }

    for i :=0 ; i < count; i++ { // 等待所有完成訊息傳送完畢。
        <- quit
    }
}

讓我們跑一下這個程式(之所以先編譯再執行,是為了讓程式跑的儘量快,測試結果更好):

go build test.go
time ./test
./test  0.01s user 0.01s system 1% cpu 1.016 total

我們看到,總計用時接近一秒。 貌似並行了!

我們看到,總計用時接近一秒。 貌似並行了!

並行和併發

從概念上講,併發和並行是不同的, 簡單來說看這個圖片
併發和並行

  • 兩個佇列,一個Coffee機器,那是併發
  • 兩個佇列,兩個Coffee機器,那是並行

更多的資料: 併發不是並行, 當然Google上有更多關於並行和併發的區別。

那麼回到一開始的疑問上,從上面的兩個例子執行後的表現來看,多個goroutine跑loop函式會挨個goroutine去進行,而sleep則是一起執行的。

這是為什麼?

預設地, Go所有的goroutines只能在一個執行緒裡跑 。

也就是說, 以上兩個程式碼都不是並行的,但是都是是併發的。

如果當前goroutine不發生阻塞,它是不會讓出CPU給其他goroutine的, 所以例子一中的輸出會是一個一個goroutine進行的,而sleep函式則阻塞掉了 當前goroutine, 當前goroutine主動讓其他goroutine執行, 所以形成了邏輯上的並行, 也就是併發。

真正的並行

為了達到真正的並行,我們需要告訴Go我們允許同時最多使用多個核。

回到起初的例子,我們設定最大開2個原生執行緒, 我們需要用到runtime包(runtime包是goroutine的排程器):

import (
    "fmt"
    "runtime"
)

var quit chan int = make(chan int)

func loop() {
    for i := 0; i < 100; i++ { //為了觀察,跑多些
        fmt.Printf("%d ", i)
    }
    quit <- 0
}

func main() {
    runtime.GOMAXPROCS(2) // 最多使用2個核

    go loop()
    go loop()

    for i := 0; i < 2; i++ {
        <- quit
    }
}

這下會看到兩個goroutine會搶佔式地輸出資料了。

我們還可以這樣顯式地讓出CPU時間:

func loop() {
    for i := 0; i < 10; i++ {
        runtime.Gosched() // 顯式地讓出CPU時間給其他goroutine
        fmt.Printf("%d ", i)
    }
    quit <- 0
}

func main() {

    go loop()
    go loop()

    for i := 0; i < 2; i++ {
        <- quit
    }
}

觀察下結果會看到這樣有規律的輸出:

0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

其實,這種主動讓出CPU時間的方式仍然是在單核裡跑。但手工地切換goroutine導致了看上去的“並行”。

其實作為一個Python程式設計師,goroutine讓我更多地想到的是gevent的協程,而不是原生執行緒。

一個小問題

題目說,如下的程式,按照理解應該列印下5次 "world"呀,可是為什麼什麼也沒有列印

package main

import (
    "fmt"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        fmt.Println(s)
    }
}

func main() {
    go say("world") //開一個新的Goroutines執行
    for {
    }
}

樓下的答案已經很棒了,這裡Go仍然在使用單核,for死迴圈佔據了單核CPU所有的資源,而main線和say兩個goroutine都在一個執行緒裡面, 所以say沒有機會執行。解決方案還是兩個:

  • 允許Go使用多核(runtime.GOMAXPROCS)
  • 手動顯式調動(runtime.Gosched)

runtime排程器

runtime排程器是個很神奇的東西,但是我真是但願它不存在,我希望顯式排程能更為自然些,多核處理預設開啟。

關於runtime包幾個函式:

  • Gosched 讓出cpu
  • NumCPU 返回當前系統的CPU核數量
  • GOMAXPROCS 設定最大的可同時使用的CPU核數
  • Goexit 退出當前goroutine(但是defer語句會照常執行)

總結

我們從例子中可以看到,預設的, 所有goroutine會在一個原生執行緒裡跑,也就是隻使用了一個CPU核。

在同一個原生執行緒裡,如果當前goroutine不發生阻塞,它是不會讓出CPU時間給其他同線程的goroutines的,這是Go執行時對goroutine的排程,我們也可以使用runtime包來手工排程。

本文開頭的兩個例子都是限制在單核CPU裡執行的,所有的goroutines跑在一個執行緒裡面,分析如下:

  • 對於程式碼例子一(loop函式的那個),每個goroutine沒有發生堵塞(直到quit流入資料), 所以在quit之前每個goroutine不會主動讓出CPU,也就發生了序列列印
  • 對於程式碼例子二(time的那個),每個goroutine在sleep被呼叫的時候會阻塞,讓出CPU, 所以例子二併發執行。

那麼關於我們開啟多核的時候呢?Go語言對goroutine的排程行為又是怎麼樣的?

我們可以在Golang官方網站的這裡 找到一句話:

When a coroutine blocks, such as by calling a blocking system call, the run-time automatically moves other coroutines on the same operating system thread to a different, runnable thread so they won’t be blocked.

也就是說:

當一個goroutine發生阻塞,Go會自動地把與該goroutine處於同一系統執行緒的其他goroutines轉移到另一個系統執行緒上去,以使這些goroutines不阻塞

開啟多核的實驗

仍然需要做一個實驗,來測試下多核支援下goroutines的對原生執行緒的分配, 也驗證下我們所得到的結論“goroutine不阻塞不放開CPU”。

實驗程式碼如下:

package main

import (
    "fmt"
    "runtime"
)

var quit chan int = make(chan int)

func loop(id int) { // id: 該goroutine的標號
    for i := 0; i < 10; i++ { //列印10次該goroutine的標號
        fmt.Printf("%d ", id)
    }
    quit <- 0
}

func main() {
    runtime.GOMAXPROCS(2) // 最多同時使用2個核

    for i := 0; i < 3; i++ { //開三個goroutine
        go loop(i)
    }

    for i := 0; i < 3; i++ {
        <- quit
    }
}

多跑幾次會看到類似這些輸出(不同機器環境不一樣):

0 0 0 0 0 1 1 0 0 1 0 0 1 0 1 2 1 2 1 2 1 2 1 2 1 2 2 2 2 2
0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2
0 0 0 0 0 0 0 1 1 1 1 1 0 1 0 1 0 1 2 1 2 1 2 2 2 2 2 2 2 2
0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 0 2 0 2 0 2 2 2 2 2 2 2 2
0 0 0 0 0 0 0 1 0 0 1 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 2 2

執行它我們會發現以下現象:

  • 有時會發生搶佔式輸出(說明Go開了不止一個原生執行緒,達到了真正的並行)
  • 有時會順序輸出, 列印完0再列印1, 再列印2(說明Go開一個原生執行緒,單執行緒上的goroutine不阻塞不鬆開CPU)

那麼,我們還會觀察到一個現象,無論是搶佔地輸出還是順序的輸出,都會有那麼兩個數字表現出這樣的現象:

  • 一個數字的所有輸出都會在另一個數字的所有輸出之前

原因是, 3個goroutine分配到至多2個執行緒上,就會至少兩個goroutine分配到同一個執行緒裡,單執行緒裡的goroutine 不阻塞不放開CPU, 也就發生了順序輸出。

Go語言併發的設計模式和應用場景

生成器

在Python中我們可以使用yield關鍵字來讓一個函式成為生成器,在Go中我們可以使用通道來製造生成器(一種lazy load類似的東西)。

當然我們的通道並不是簡單的做阻塞主線的功能來使用的哦。

下面是一個製作自增整數生成器的例子,直到主線向通道索要資料,我們才新增資料到通道

func xrange() chan int{ // xrange用來生成自增的整數
    var ch chan int = make(chan int)

    go func() { // 開出一個goroutine
        for i := 0; ; i++ {
            ch <- i  // 直到通道索要資料,才把i新增進通道
        }
    }()

    return ch
}

func main() {

    generator := xrange()

    for i:=0; i < 1000; i++ {  // 我們生成1000個自增的整數!
        fmt.Println(<-generator)
    }
}

這不禁叫我想起了Python中可愛的xrange, 所以給了生成器這個名字!

服務化

比如我們載入一個網站的時候,例如我們登入新浪微博,我們的訊息資料應該來自一個獨立的服務,這個服務只負責 返回某個使用者的新的訊息提醒。

如下是一個使用示例:

func get_notification(user string) chan string{
   /*
    * 此處可以查詢資料庫獲取新訊息等等..
    */
    notifications := make(chan string)

    go func() { // 懸掛一個信道出去
        notifications <- fmt.Sprintf("Hi %s, welcome to weibo.com!", user)
    }()

    return notifications
}

func main() {
    jack := get_notification("jack") //  獲取jack的訊息
    joe := get_notification("joe") // 獲取joe的訊息

    // 獲取訊息的返回
    fmt.Println(<-jack)
    fmt.Println(<-joe)
}

多路複合

上面的例子都使用一個通道作為返回值,可以把通道的資料合併到一個通道的。 不過這樣的話,我們需要按順序輸出我們的返回值(先進先出)。

如下,我們假設要計算很複雜的一個運算 100-x , 分為三路計算, 最後統一在一個通道中取出結果:

func do_stuff(x int) int { // 一個比較耗時的事情,比如計算
    time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) //模擬計算
    return 100 - x // 假如100-x是一個很費時的計算
}

func branch(x int) chan int{ // 每個分支開出一個goroutine做計算並把計算結果流入各自通道
    ch := make(chan int)
    go func() {
        ch <- do_stuff(x)
    }()
    return ch
}

func fanIn(chs... chan int) chan int {
    ch := make(chan int)

    for _, c := range chs {
        // 注意此處明確傳值
        go func(c chan int) {ch <- <- c}(c) // 複合
    }

    return ch
}

func main() {
    result := fanIn(branch(1), branch(2), branch(3))

    for i := 0; i < 3; i++ {
        fmt.Println(<-result)
    }
}

select監聽通道

Go有一個語句叫做select,用於監測各個通道的資料流動。

如下的程式是select的一個使用例子,我們監視三個通道的資料流出並收集資料到一個通道中。

func foo(i int) chan int {
    c := make(chan int)
    go func () { c <- i }()
    return c
}

func main() {
    c1, c2, c3 := foo(1), foo(2), foo(3)

    c := make(chan int)

    go func() { // 開一個goroutine監視各個通道資料輸出並收集資料到通道c
        for {
            select { // 監視c1, c2, c3的流出,並全部流入通道c
            case v1 := <- c1: c <- v1
            case v2 := <- c2: c <- v2
            case v3 := <- c3: c <- v3
            }
        }
    }()

    // 阻塞主線,取出通道c的資料
    for i := 0; i < 3; i++ {
        fmt.Println(<-c) // 從列印來看我們的資料輸出並不是嚴格的1,2,3順序
    }
}

有了select, 我們在 多路複合中的示例程式碼中的函式fanIn還可以這麼來寫(這樣就不用開好幾個goroutine來取資料了):

func fanIn(branches ... chan int) chan int {
    c := make(chan int)

    go func() {
        for i := 0 ; i < len(branches); i++ { //select會嘗試著依次取出各個通道的資料
            select {
            case v1 := <- branches[i]: c <- v1
            }
        }
    }()

    return c
}

使用select的時候,有時需要超時處理, 其中的timeout通道相當有趣:

timeout := time.After(1 * time.Second) // timeout 是一個計時通道, 如果達到時間了,就會發一個訊號出來

for is_timeout := false; !is_timeout; {
    select { // 監視通道c1, c2, c3, timeout通道的資料流出
    case v1 := <- c1: fmt.Printf("received %d from c1", v1)
    case v2 := <- c2: fmt.Printf("received %d from c2", v2)
    case v3 := <- c3: fmt.Printf("received %d from c3", v3)
    case <- timeout: is_timeout = true // 超時
    }
}

結束標誌

在Go併發與並行筆記一我們已經講過通道的一個很重要也很平常的應用,就是使用無緩衝通道來阻塞主線,等待goroutine結束。

這樣我們不必再使用timeout。

那麼對上面的timeout來結束主線的方案作個更新:

func main() {

    c, quit := make(chan int), make(chan int)

    go func() {
        c <- 2  // 新增資料
        quit <- 1 // 傳送完成訊號
    } ()

    for is_quit := false; !is_quit; {
        select { // 監視通道c的資料流出
        case v := <-c: fmt.Printf("received %d from c", v)
        case <-quit: is_quit = true // quit通道有輸出,關閉for迴圈
        }
    }
}

菊花鏈

簡單地來說,資料從一端流入,從另一端流出,看上去好像一個連結串列,不知道為什麼要取這麼個尷尬的名字。。

菊花鏈的英文名字叫做: Daisy-chain, 它的一個應用就是做過濾器,比如我們來篩下100以內的素數(你需要先知道什麼是篩法)

程式有詳細的註釋,不再說明了。

/*
 *  利用通道菊花鏈篩法求某一個整數範圍的素數
 *  篩法求素數的基本思想是:把從1開始的、某一範圍內的正整數從小到大順序排列,
 *  1不是素數,首先把它篩掉。剩下的數中選擇最小的數是素數,然後去掉它的倍數。
 *  依次類推,直到篩子為空時結束
 */
package main

import "fmt"

func xrange() chan int{ // 從2開始自增的整數生成器
    var ch chan int = make(chan int)

    go func() { // 開出一個goroutine
        for i := 2; ; i++ {
            ch <- i  // 直到通道索要資料,才把i新增進通道
        }
    }()

    return ch
}


func filter(in chan int, number int) chan int {
    // 輸入一個整數佇列,篩出是number倍數的, 不是number的倍數的放入輸出佇列
    // in:  輸入佇列
    out := make(chan int)

    go func() {
        for {
            i := <- in // 從輸入中取一個

            if i % number != 0 {
                out <- i // 放入輸出通道
            }
        }
    }()

    return out
}


func main() {
    const max = 100 // 找出100以內的所有素數
    nums := xrange() // 初始化一個整數生成器
    number := <-nums  // 從生成器中抓一個整數(2), 作為初始化整數

    for number <= max { // number作為篩子,當篩子超過max的時候結束篩選
        fmt.Println(number) // 列印素數, 篩子即一個素數
        nums = filter(nums, number) //篩掉number的倍數
        number = <- nums  // 更新篩子
    }
}

隨機數生成器

通道可以做生成器使用,作為一個特殊的例子,它還可以用作隨機數生成器。如下是一個隨機01生成器:

func rand01() chan int {
    ch := make(chan int)

    go func () {
        for {
            select { //select會嘗試執行各個case, 如果都可以執行,那麼隨機選一個執行
            case ch <- 0:
            case ch <- 1:
            }
        }
    }()

    return ch
}

func main() {
    generator := rand01() //初始化一個01隨機生成器

    //測試,列印10個隨機01
    for i := 0; i < 10; i++ {
        fmt.Println(<-generator)
    }
}

定時器

我們剛才其實已經接觸了通道作為定時器, time包裡的After會製作一個定時器。

看看我們的定時器吧!

/*
 * 利用通道做定時器
 */

package main

import (
    "fmt"
    "time"
)

func timer(duration time.Duration) chan bool {
    ch := make(chan bool)

    go func() {
        time.Sleep(duration)
        ch <- true // 到時間啦!
    }()

    return ch
}

func main() {
    timeout := timer(time.Second) // 定時1s

    for {
        select {
        case <- timeout:
            fmt.Println("already 1s!") // 到時間
            return  //結束程式
        }
    }
}

TODO

Google的應用場景例子。

本篇主要總結了使用通道, goroutine的一些設計模式。