1. 程式人生 > >Go 系列教程-6 多執行緒 併發

Go 系列教程-6 多執行緒 併發

Go 系列教程 —— 20. 併發入門

Go 是併發式語言,而不是並行式語言。在討論 Go 如何處理併發之前,我們必須理解何為併發,以及併發與並行的區別。

併發是什麼?

併發是指立即處理多個任務的能力。一個例子就能很好地說明這一點。

我們可以想象一個人正在跑步。假如在他晨跑時,鞋帶突然鬆了。於是他停下來,系一下鞋帶,接下來繼續跑。這個例子就是典型的併發。這個人能夠一下搞定跑步和繫鞋帶兩件事,即立即處理多個任務。

並行是什麼?並行和併發有何區別?

並行是指同時處理多個任務。這聽起來和併發差不多,但其實完全不同。

我們同樣用這個跑步的例子來幫助理解。假如這個人在慢跑時,還在用他的 iPod 聽著音樂。在這裡,他是在跑步的同時聽音樂,也就是同時處理多個任務。這稱之為並行。

從技術上看併發和並行

通過現實中的例子,我們已經明白了什麼是併發,以及併發與並行的區別。作為一名極客,我們接下來從技術的角度來考察併發和並行。:)

假如我們正在編寫一個 web 瀏覽器。這個 web 瀏覽器有各種元件。其中兩個分別是 web 頁面的渲染區和從網上下載檔案的下載器。假設我們已經構建好了瀏覽器程式碼,各個元件也都可以相互獨立地執行(通過像 Java 裡的執行緒,或者通過即將介紹的 Go 語言中的 Go 協程來實現)。當瀏覽器在單核處理器中執行時,處理器會在瀏覽器的兩個元件間進行上下文切換。它可能在一段時間內下載檔案,轉而又對使用者請求的 web 頁面進行渲染。這就是併發。併發的程序從不同的時間點開始,分別交替執行。在這裡,就是在不同的時間點開始進行下載和渲染,並相互交替執行的。

如果該瀏覽器在一個多核處理器上執行,此時下載檔案的元件和渲染 HTML 的元件可能會在不同的核上同時執行。這稱之為並行。

image

並行不一定會加快執行速度,因為並行執行的元件之間可能需要相互通訊。在我們瀏覽器的例子裡,當檔案下載完成後,應當對使用者進行提醒,比如彈出一個視窗。於是,在負責下載的元件和負責渲染使用者介面的元件之間,就產生了通訊。在併發系統上,這種通訊開銷很小。但在多核的並行系統上,元件間的通訊開銷就很高了。所以,並行不一定會加快執行速度!

Go 對併發的支援

Go 程式語言原生支援併發。Go 使用 Go 協程(Goroutine) 和通道(Channel)來處理併發。

 

Go 系列教程 —— 21. Go 協程

在前面的教程裡,我們探討了併發,以及併發與並行的區別。本教程則會介紹在 Go 語言裡,如何使用 Go 協程(Goroutine)來實現併發。

Go 協程是什麼?

Go 協程是與其他函式或方法一起併發執行的函式或方法。Go 協程可以看作是輕量級執行緒。與執行緒相比,建立一個 Go 協程的成本很小。因此在 Go 應用中,常常會看到有數以千計的 Go 協程併發地執行。

Go 協程相比於執行緒的優勢

  • 相比執行緒而言,Go 協程的成本極低。堆疊大小隻有若干 kb,並且可以根據應用的需求進行增減。而執行緒必須指定堆疊的大小,其堆疊是固定不變的。
  • Go 協程會複用(Multiplex)數量更少的 OS 執行緒。即使程式有數以千計的 Go 協程,也可能只有一個執行緒。如果該執行緒中的某一 Go 協程發生了阻塞(比如說等待使用者輸入),那麼系統會再建立一個 OS 執行緒,並把其餘 Go 協程都移動到這個新的 OS 執行緒。所有這一切都在執行時進行,作為程式設計師,我們沒有直接面臨這些複雜的細節,而是有一個簡潔的 API 來處理併發。
  • Go 協程使用通道(Channel)來進行通訊。通道用於防止多個協程訪問共享記憶體時發生競態條件(Race Condition)。通道可以看作是 Go 協程之間通訊的管道。我們會在下一教程詳細討論通道。

如何啟動一個 Go 協程?

呼叫函式或者方法時,在前面加上關鍵字 go,可以讓一個新的 Go 協程併發地執行。

讓我們建立一個 Go 協程吧。

package main

import (
    "fmt"
)

func hello() {
    fmt.Println("Hello world goroutine")
}
func main() {
    go hello()
    fmt.Println("main function")
}

線上執行程式

在第 11 行,go hello() 啟動了一個新的 Go 協程。現在 hello() 函式與 main() 函式會併發地執行。主函式會執行在一個特有的 Go 協程上,它稱為 Go 主協程(Main Goroutine)。

執行一下程式,你會很驚訝!

該程式只會輸出文字 main function。我們啟動的 Go 協程究竟出現了什麼問題?要理解這一切,我們需要理解兩個 Go 協程的主要性質。

  • 啟動一個新的協程時,協程的呼叫會立即返回。與函式不同,程式控制不會去等待 Go 協程執行完畢。在呼叫 Go 協程之後,程式控制會立即返回到程式碼的下一行,忽略該協程的任何返回值。
  • 如果希望執行其他 Go 協程,Go 主協程必須繼續執行著。如果 Go 主協程終止,則程式終止,於是其他 Go 協程也不會繼續執行。

現在你應該能夠理解,為何我們的 Go 協程沒有運行了吧。在第 11 行呼叫了 go hello() 之後,程式控制沒有等待 hello 協程結束,立即返回到了程式碼下一行,列印 main function。接著由於沒有其他可執行的程式碼,Go 主協程終止,於是 hello 協程就沒有機會運行了。

我們現在修復這個問題。

package main

import (  
    "fmt"
    "time"
)

func hello() {  
    fmt.Println("Hello world goroutine")
}
func main() {  
    go hello()
    time.Sleep(1 * time.Second)
    fmt.Println("main function")
}

線上執行程式

在上面程式的第 13 行,我們呼叫了 time 包裡的函式 Sleep,該函式會休眠執行它的 Go 協程。在這裡,我們使 Go 主協程休眠了 1 秒。因此在主協程終止之前,呼叫 go hello() 就有足夠的時間來執行了。該程式首先列印 Hello world goroutine,等待 1 秒鐘之後,接著列印 main function

在 Go 主協程中使用休眠,以便等待其他協程執行完畢,這種方法只是用於理解 Go 協程如何工作的技巧。通道可用於在其他協程結束執行之前,阻塞 Go 主協程。我們會在下一教程中討論通道。

啟動多個 Go 協程

為了更好地理解 Go 協程,我們再編寫一個程式,啟動多個 Go 協程。

package main

import (  
    "fmt"
    "time"
)

func numbers() {  
    for i := 1; i <= 5; i++ {
        time.Sleep(250 * time.Millisecond)
        fmt.Printf("%d ", i)
    }
}
func alphabets() {  
    for i := 'a'; i <= 'e'; i++ {
        time.Sleep(400 * time.Millisecond)
        fmt.Printf("%c ", i)
    }
}
func main() {  
    go numbers()
    go alphabets()
    time.Sleep(3000 * time.Millisecond)
    fmt.Println("main terminated")
}

線上執行程式

在上面程式中的第 21 行和第 22 行,啟動了兩個 Go 協程。現在,這兩個協程併發地執行。numbers 協程首先休眠 250 微秒,接著列印 1,然後再次休眠,列印 2,依此類推,一直到列印 5 結束。alphabete 協程同樣列印從 a 到 e 的字母,並且每次有 400 微秒的休眠時間。 Go 主協程啟動了 numbers 和 alphabete 兩個 Go 協程,休眠了 3000 微秒後終止程式。

該程式會輸出:

1 a 2 3 b 4 c 5 d e main terminated

程式的運作如下圖所示。為了更好地觀看圖片,請在新標籤頁中開啟。

image

第一張藍色的圖表示 numbers 協程,第二張褐紅色的圖表示 alphabets 協程,第三張綠色的圖表示 Go 主協程,而最後一張黑色的圖把以上三種協程合併了,表明程式是如何執行的。在每個方框頂部,諸如 0 ms 和 250 ms 這樣的字串表示時間(以微秒為單位)。在每個方框的底部,123 等表示輸出。藍色方框表示:250 ms 打印出 1500 ms 打印出 2,依此類推。最後黑色方框的底部的值會是 1 a 2 3 b 4 c 5 d e main terminated,這同樣也是整個程式的輸出。以上圖片非常直觀,你可以用它來理解程式是如何運作的。

 

 

Go 系列教程 —— 22. 通道(channel)

上一教程裡,我們探討了如何使用 Go 協程(Goroutine)來實現併發。我們接著在本教程裡學習通道(Channel),學習如何通過通道來實現 Go 協程間的通訊。

什麼是通道?

通道可以想像成 Go 協程之間通訊的管道。如同管道中的水會從一端流到另一端,通過使用通道,資料也可以從一端傳送,在另一端接收。

通道的宣告

所有通道都關聯了一個型別。通道只能運輸這種型別的資料,而運輸其他型別的資料都是非法的。

chan T 表示 T 型別的通道。

通道的零值為 nil。通道的零值沒有什麼用,應該像對 map 和切片所做的那樣,用 make 來定義通道。

下面編寫程式碼,宣告一個通道。

package main

import "fmt"

func main() {  
    var a chan int
    if a == nil {
        fmt.Println("channel a is nil, going to define it")
        a = make(chan int)
        fmt.Printf("Type of a is %T", a)
    }
}

線上執行程式

由於通道的零值為 nil,在第 6 行,通道 a 的值就是 nil。於是,程式執行了 if 語句內的語句,定義了通道 a。程式中 a 是一個 int 型別的通道。該程式會輸出:

channel a is nil, going to define it  
Type of a is chan int

簡短宣告通常也是一種定義通道的簡潔有效的方法。

a := make(chan int)

這一行程式碼同樣定義了一個 int 型別的通道 a

通過通道進行傳送和接收

如下所示,該語法通過通道傳送和接收資料。

data := <- a // 讀取通道 a  
a <- data // 寫入通道 a

通道旁的箭頭方向指定了是傳送資料還是接收資料。

在第一行,箭頭對於 a 來說是向外指的,因此我們讀取了通道 a 的值,並把該值儲存到變數 data

在第二行,箭頭指向了 a,因此我們在把資料寫入通道 a

傳送與接收預設是阻塞的

傳送與接收預設是阻塞的。這是什麼意思?當把資料傳送到通道時,程式控制會在傳送資料的語句處發生阻塞,直到有其它 Go 協程從通道讀取到資料,才會解除阻塞。與此類似,當讀取通道的資料時,如果沒有其它的協程把資料寫入到這個通道,那麼讀取過程就會一直阻塞著。

通道的這種特效能夠幫助 Go 協程之間進行高效的通訊,不需要用到其他程式語言常見的顯式鎖或條件變數。

通道的程式碼示例

理論已經夠了:)。接下來寫點程式碼,看看協程之間通過通道是怎麼通訊的吧。

我們其實可以重寫上章學習 Go 協程 時寫的程式,現在我們在這裡用上通道。

首先引用前面教程裡的程式。

package main

import (  
    "fmt"
    "time"
)

func hello() {  
    fmt.Println("Hello world goroutine")
}
func main() {  
    go hello()
    time.Sleep(1 * time.Second)
    fmt.Println("main function")
}

線上執行程式

這是上一篇的程式碼。我們使用到了休眠,使 Go 主協程等待 hello 協程結束。如果你看不懂,建議你閱讀上一教程 Go 協程

我們接下來使用通道來重寫上面程式碼。

package main

import (  
    "fmt"
)

func hello(done chan bool) {  
    fmt.Println("Hello world goroutine")
    done <- true
}
func main() {  
    done := make(chan bool)
    go hello(done)
    <-done
    fmt.Println("main function")
}

線上執行程式

在上述程式裡,我們在第 12 行建立了一個 bool 型別的通道 done,並把 done 作為引數傳遞給了 hello 協程。在第 14 行,我們通過通道 done 接收資料。這一行程式碼發生了阻塞,除非有協程向 done 寫入資料,否則程式不會跳到下一行程式碼。於是,這就不需要用以前的 time.Sleep 來阻止 Go 主協程退出了。

<-done 這行程式碼通過協程(譯註:原文筆誤,通道)done 接收資料,但並沒有使用資料或者把資料儲存到變數中。這完全是合法的。

現在我們的 Go 主協程發生了阻塞,等待通道 done 傳送的資料。該通道作為引數傳遞給了協程 hellohello 打印出 Hello world goroutine,接下來向 done 寫入資料。當完成寫入時,Go 主協程會通過通道 done 接收資料,於是它解除阻塞狀態,打印出文字 main function

該程式輸出如下:

Hello world goroutine  
main function

我們稍微修改一下程式,在 hello 協程里加入休眠函式,以便更好地理解阻塞的概念。

package main

import (  
    "fmt"
    "time"
)

func hello(done chan bool) {  
    fmt.Println("hello go routine is going to sleep")
    time.Sleep(4 * time.Second)
    fmt.Println("hello go routine awake and going to write to done")
    done <- true
}
func main() {  
    done := make(chan bool)
    fmt.Println("Main going to call hello go goroutine")
    go hello(done)
    <-done
    fmt.Println("Main received data")
}

線上執行程式

在上面程式裡,我們向 hello 函式裡添加了 4 秒的休眠(第 10 行)。

程式首先會列印 Main going to call hello go goroutine。接著會開啟 hello 協程,列印 hello go routine is going to sleep。列印完之後,hello 協程會休眠 4 秒鐘,而在這期間,主協程會在 <-done 這一行發生阻塞,等待來自通道 done 的資料。4 秒鐘之後,列印 hello go routine awake and going to write to done,接著再列印 Main received data

通道的另一個示例

我們再編寫一個程式來更好地理解通道。該程式會計算一個數中每一位的平方和與立方和,然後把平方和與立方和相加並打印出來。

例如,如果輸出是 123,該程式會如下計算輸出:

squares = (1 * 1) + (2 * 2) + (3 * 3) 
cubes = (1 * 1 * 1) + (2 * 2 * 2) + (3 * 3 * 3) 
output = squares + cubes = 50

我們會這樣去構建程式:在一個單獨的 Go 協程計算平方和,而在另一個協程計算立方和,最後在 Go 主協程把平方和與立方和相加。

package main

import (  
    "fmt"
)

func calcSquares(number int, squareop chan int) {  
    sum := 0
    for number != 0 {
        digit := number % 10
        sum += digit * digit
        number /= 10
    }
    squareop <- sum
}

func calcCubes(number int, cubeop chan int) {  
    sum := 0 
    for number != 0 {
        digit := number % 10
        sum += digit * digit * digit
        number /= 10
    }
    cubeop <- sum
} 

func main() {  
    number := 589
    sqrch := make(chan int)
    cubech := make(chan int)
    go calcSquares(number, sqrch)
    go calcCubes(number, cubech)
    squares, cubes := <-sqrch, <-cubech
    fmt.Println("Final output", squares + cubes)
}

線上執行程式

在第 7 行,函式 calcSquares 計算一個數每位的平方和,並把結果傳送給通道 squareop。與此類似,在第 17 行函式 calcCubes 計算一個數每位的立方和,並把結果傳送給通道 cubop

這兩個函式分別在單獨的協程裡執行(第 31 行和第 32 行),每個函式都有傳遞通道的引數,以便寫入資料。Go 主協程會在第 33 行等待兩個通道傳來的資料。一旦從兩個通道接收完資料,資料就會儲存在變數 squares 和 cubes 裡,然後計算並打印出最後結果。該程式會輸出:

Final output 1536

死鎖

使用通道需要考慮的一個重點是死鎖。當 Go 協程給一個通道傳送資料時,照理說會有其他 Go 協程來接收資料。如果沒有的話,程式就會在執行時觸發 panic,形成死鎖。

同理,當有 Go 協程等著從一個通道接收資料時,我們期望其他的 Go 協程會向該通道寫入資料,要不然程式就會觸發 panic。

package main

func main() {  
    ch := make(chan int)
    ch <- 5
}

線上執行程式

在上述程式中,我們建立了一個通道 ch,接著在下一行 ch <- 5,我們把 5 傳送到這個通道。對於本程式,沒有其他的協程從 ch接收資料。於是程式觸發 panic,出現如下執行時錯誤。

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox249677995/main.go:6 +0x80

單向通道

我們目前討論的通道都是雙向通道,即通過通道既能傳送資料,又能接收資料。其實也可以建立單向通道,這種通道只能傳送或者接收資料。

package main

import "fmt"

func sendData(sendch chan<- int) {  
    sendch <- 10
}

func main() {  
    sendch := make(chan<- int)
    go sendData(sendch)
    fmt.Println(<-sendch)
}

線上執行程式

上面程式的第 10 行,我們建立了唯送(Send Only)通道 sendchchan<- int 定義了唯送通道,因為箭頭指向了 chan。在第 12 行,我們試圖通過唯送通道接收資料,於是編譯器報錯:

main.go:11: invalid operation: <-sendch (receive from send-only type chan<- int)

一切都很順利,只不過一個不能讀取資料的唯送通道究竟有什麼意義呢?

這就需要用到通道轉換(Channel Conversion)了。把一個雙向通道轉換成唯送通道或者唯收(Receive Only)通道都是行得通的,但是反過來就不行。

package main

import "fmt"

func sendData(sendch chan<- int) {  
    sendch <- 10
}

func main() {  
    cha1 := make(chan int)
    go sendData(cha1)
    fmt.Println(<-cha1)
}

線上執行程式

在上述程式的第 10 行,我們建立了一個雙向通道 cha1。在第 11 行 cha1 作為引數傳遞給了 sendData 協程。在第 5 行,函式 sendData 裡的引數 sendch chan<- int 把 cha1 轉換為一個唯送通道。於是該通道在 sendData 協程裡是一個唯送通道,而在 Go 主協程裡是一個雙向通道。該程式最終列印輸出 10

關閉通道和使用 for range 遍歷通道

資料傳送方可以關閉通道,通知接收方這個通道不再有資料傳送過來。

當從通道接收資料時,接收方可以多用一個變數來檢查通道是否已經關閉。

v, ok := <- ch

上面的語句裡,如果成功接收通道所傳送的資料,那麼 ok 等於 true。而如果 ok 等於 false,說明我們試圖讀取一個關閉的通道。從關閉的通道讀取到的值會是該通道型別的零值。例如,當通道是一個 int 型別的通道時,那麼從關閉的通道讀取的值將會是 0

package main

import (  
    "fmt"
)

func producer(chnl chan int) {  
    for i := 0; i < 10; i++ {
        chnl <- i
    }
    close(chnl)
}
func main() {  
    ch := make(chan int)
    go producer(ch)
    for {
        v, ok := <-ch
        if ok == false {
            break
        }
        fmt.Println("Received ", v, ok)
    }
}

線上執行程式

在上述的程式中,producer 協程會從 0 到 9 寫入通道 chn1,然後關閉該通道。主函式有一個無限的 for 迴圈(第 16 行),使用變數 ok(第 18 行)檢查通道是否已經關閉。如果 ok 等於 false,說明通道已經關閉,於是退出 for 迴圈。如果 ok 等於 true,會打印出接收到的值和 ok 的值。

Received  0 true  
Received  1 true  
Received  2 true  
Received  3 true  
Received  4 true  
Received  5 true  
Received  6 true  
Received  7 true  
Received  8 true  
Received  9 true

for range 迴圈用於在一個通道關閉之前,從通道接收資料。

接下來我們使用 for range 迴圈重寫上面的程式碼。

package main

import (  
    "fmt"
)

func producer(chnl chan int) {  
    for i := 0; i < 10; i++ {
        chnl <- i
    }
    close(chnl)
}
func main() {  
    ch := make(chan int)
    go producer(ch)
    for v := range ch {
        fmt.Println("Received ",v)
    }
}

線上執行程式

在第 16 行,for range 迴圈從通道 ch 接收資料,直到該通道關閉。一旦關閉了 ch,迴圈會自動結束。該程式會輸出:

Received  0  
Received  1  
Received  2  
Received  3  
Received  4  
Received  5  
Received  6  
Received  7  
Received  8  
Received  9

我們可以使用 for range 迴圈,重寫通道的另一個示例這一節裡面的程式碼,提高程式碼的可重用性。

如果你仔細觀察這段程式碼,會發現獲得一個數裡的每位數的程式碼在 calcSquares 和 calcCubes 兩個函式內重複了。我們將把這段程式碼抽離出來,放在一個單獨的函式裡,然後併發地呼叫它。

package main

import (  
    "fmt"
)

func digits(number int, dchnl chan int) {  
    for number != 0 {
        digit := number % 10
        dchnl <- digit
        number /= 10
    }
    close(dchnl)
}
func calcSquares(number int, squareop chan int) {  
    sum := 0
    dch := make(chan int)
    go digits(number, dch)
    for digit := range dch {
        sum += digit * digit
    }
    squareop <- sum
}

func calcCubes(number int, cubeop chan int) {  
    sum := 0
    dch := make(chan int)
    go digits(number, dch)
    for digit := range dch {
        sum += digit * digit * digit
    }
    cubeop <- sum
}

func main() {  
    number := 589
    sqrch := make(chan int)
    cubech := make(chan int)
    go calcSquares(number, sqrch)
    go calcCubes(number, cubech)
    squares, cubes := <-sqrch, <-cubech
    fmt.Println("Final output", squares+cubes)
}

線上執行程式

上述程式裡的 digits 函式,包含了獲取一個數的每位數的邏輯,並且 calcSquares 和 calcCubes 兩個函式併發地呼叫了 digits。當計算完數字裡面的每一位數時,第 13 行就會關閉通道。calcSquares 和 calcCubes 兩個協程使用 for range 迴圈分別監聽了它們的通道,直到該通道關閉。程式的其他地方不變,該程式同樣會輸出:

Final output 1536

本教程的內容到此結束。關於通道還有一些其他的概念,比如緩衝通道(Buffered Channel)、工作池(Worker Pool)和 select。我們會在接下來的教程裡專門介紹它們。

 

Go 系列教程 —— 23. 緩衝通道和工作池(Buffered Channels and Worker Pools)

 

什麼是緩衝通道?

上一教程裡,我們討論的主要是無緩衝通道。我們在通道的教程裡詳細討論了,無緩衝通道的傳送和接收過程是阻塞的。

我們還可以建立一個有緩衝(Buffer)的通道。只在緩衝已滿的情況,才會阻塞向緩衝通道(Buffered Channel)傳送資料。同樣,只有在緩衝為空的時候,才會阻塞從緩衝通道接收資料。

通過向 make 函式再傳遞一個表示容量的引數(指定緩衝的大小),可以建立緩衝通道。

ch := make(chan type, capacity)

要讓一個通道有緩衝,上面語法中的 capacity 應該大於 0。無緩衝通道的容量預設為 0,因此我們在上一教程建立通道時,省略了容量引數。

我們開始編寫程式碼,建立一個緩衝通道。

示例一

package main

import (  
    "fmt"
)


func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}

線上執行程式

在上面程式裡的第 9 行,我們建立了一個緩衝通道,其容量為 2。由於該通道的容量為 2,因此可向它寫入兩個字串,而且不會發生阻塞。在第 10 行和第 11 行,我們向通道寫入兩個字串,該通道並沒有發生阻塞。我們又在第 12 行和第 13 行分別讀取了這兩個字串。該程式輸出:

naveen  
paul

示例二

我們再看一個緩衝通道的示例,其中有一個併發的 Go 協程來向通道寫入資料,而 Go 主協程負責讀取資料。該示例幫助我們進一步理解,在向緩衝通道寫入資料時,什麼時候會發生阻塞。

package main

import (  
    "fmt"
    "time"
)

func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {  
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)

    }
}

線上執行程式

在上面的程式中,第 16 行在 Go 主協程中建立了容量為 2 的緩衝通道 ch,而第 17 行把 ch 傳遞給了 write 協程。接下來 Go 主協程休眠了兩秒。在這期間,write 協程在併發地執行。write 協程有一個 for 迴圈,依次向通道 ch 寫入 0~4。而緩衝通道的容量為 2,因此 write 協程裡立即會向 ch 寫入 0 和 1,接下來發生阻塞,直到 ch 內的值被讀取。因此,該程式立即打印出下面兩行:

successfully wrote 0 to ch  
successfully wrote 1 to ch

列印上面兩行之後,write 協程中向 ch 的寫入發生了阻塞,直到 ch 有值被讀取到。而 Go 主協程休眠了兩秒後,才開始讀取該通道,因此在休眠期間程式不會列印任何結果。主協程結束休眠後,在第 19 行使用 for range 迴圈,開始讀取通道 ch,打印出了讀取到的值後又休眠兩秒,這個迴圈一直到 ch 關閉才結束。所以該程式在兩秒後會列印下面兩行:

read value 0 from ch  
successfully wrote 2 to ch

該過程會一直進行,直到通道讀取完所有的值,並在 write 協程中關閉通道。最終輸出如下:

successfully wrote 0 to ch  
successfully wrote 1 to ch  
read value 0 from ch  
successfully wrote 2 to ch  
read value 1 from ch  
successfully wrote 3 to ch  
read value 2 from ch  
successfully wrote 4 to ch  
read value 3 from ch  
read value 4 from ch

死鎖

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

線上執行程式

在上面程式裡,我們向容量為 2 的緩衝通道寫入 3 個字串。當在程式控制到達第 3 次寫入時(第 11 行),由於它超出了通道的容量,因此這次寫入發生了阻塞。現在想要這次寫操作能夠進行下去,必須要有其它協程來讀取這個通道的資料。但在本例中,並沒有併發協程來讀取這個通道,因此這裡會發生死鎖(deadlock)。程式會在執行時觸發 panic,資訊如下:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox274756028/main.go:11 +0x100

長度 vs 容量

緩衝通道的容量是指通道可以儲存的值的數量。我們在使用 make 函式建立緩衝通道的時候會指定容量大小。

緩衝通道的長度是指通道中當前排隊的元素個數。

程式碼可以把一切解釋得很清楚。:)

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}

線上執行程式

在上面的程式裡,我們建立了一個容量為 3 的通道,於是它可以儲存 3 個字串。接下來,我們分別在第 9 行和第 10 行向通道寫入了兩個字串。於是通道有兩個字串排隊,因此其長度為 2。在第 13 行,我們又從通道讀取了一個字串。現在該通道內只有一個字串,因此其長度變為 1。該程式會輸出:

capacity is 3  
length is 2  
read value naveen  
new length is 1

WaitGroup

在本教程的下一節裡,我們會講到工作池(Worker Pools)。而 WaitGroup 用於實現工作池,因此要理解工作池,我們首先需要學習 WaitGroup

WaitGroup 用於等待一批 Go 協程執行結束。程式控制會一直阻塞,直到這些協程全部執行完畢。假設我們有 3 個併發執行的 Go 協程(由 Go 主協程生成)。Go 主協程需要等待這 3 個協程執行結束後,才會終止。這就可以用 WaitGroup 來實現。

理論說完了,我們編寫點兒程式碼吧。:)

package main

import (  
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {  
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {  
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

線上執行程式

WaitGroup 是一個結構體型別,我們在第 18 行建立了 WaitGroup 型別的變數,其初始值為零值。WaitGroup 使用計數器來工作。當我們呼叫 WaitGroup 的 Add 並傳遞一個 int 時,WaitGroup 的計數器會加上 Add 的傳參。要減少計數器,可以呼叫 WaitGroup 的 Done() 方法。Wait() 方法會阻塞呼叫它的 Go 協程,直到計數器變為 0 後才會停止阻塞。

上述程式裡,for 迴圈迭代了 3 次,我們在迴圈內呼叫了 wg.Add(1)(第 20 行)。因此計數器變為 3。for 迴圈同樣建立了 3 個 process 協程,然後在第 23 行呼叫了 wg.Wait(),確保 Go 主協程等待計數器變為 0。在第 13 行,process 協程內呼叫了 wg.Done,可以讓計數器遞減。一旦 3 個子協程都執行完畢(即 wg.Done() 呼叫了 3 次),那麼計數器就變為 0,於是主協程會解除阻塞。

在第 21 行裡,傳遞 wg 的地址是很重要的。如果沒有傳遞 wg 的地址,那麼每個 Go 協程將會得到一個 WaitGroup 值的拷貝,因而當它們執行結束時,main 函式並不會知道

該程式輸出:

started Goroutine  2  
started Goroutine  0  
started Goroutine  1  
Goroutine 0 ended  
Goroutine 2 ended  
Goroutine 1 ended  
All go routines finished executing

由於 Go 協程的執行順序不一定,因此你的輸出可能和我不一樣。:)

工作池的實現

緩衝通道的重要應用之一就是實現工作池

一般而言,工作池就是一組等待任務分配的執行緒。一旦完成了所分配的任務,這些執行緒可繼續等待任務的分配。

我們會使用緩衝通道來實現工作池。我們工作池的任務是計算所輸入數字的每一位的和。例如,如果輸入 234,結果會是 9(即 2 + 3 + 4)。向工作池輸入的是一列偽隨機數。

我們工作池的核心功能如下:

  • 建立一個 Go 協程池,監聽一個等待作業分配的輸入型緩衝通道。
  • 將作業新增到該輸入型緩衝通道中。
  • 作業完成後,再將結果寫入一個輸出型緩衝通道。
  • 從輸出型緩衝通道讀取並列印結果。

我們會逐步編寫這個程式,讓程式碼易於理解。

第一步就是建立一個結構體,表示作業和結果。

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

所有 Job 結構體變數都會有 id 和 randomno 兩個欄位,randomno 用於計算其每位數之和。

而 Result 結構體有一個 job 欄位,表示所對應的作業,還有一個 sumofdigits 欄位,表示計算的結果(每位數字之和)。

第二步是分別建立用於接收作業和寫入結果的緩衝通道。

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

工作協程(Worker Goroutine)會監聽緩衝通道 jobs 裡更新的作業。一旦工作協程完成了作業,其結果會寫入緩衝通道 results

如下所示,digits 函式的任務實際上就是計算整數的每一位之和,最後返回該結果。為了模擬出 digits 在計算過程中花費了一段時間,我們在函式內添加了兩秒的休眠時間。

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

然後,我們寫一個建立工作協程的函式。

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

上面的函式建立了一個工作者(Worker),讀取 jobs 通道的資料,根據當前的 job 和 digits 函式的返回值,建立了一個 Result結構體變數,然後將結果寫入 results 緩衝通道。worker 函式接收了一個 WaitGroup 型別的 wg 作為引數,當所有的 jobs 完成的時候,呼叫了 Done() 方法。

createWorkerPool 函式建立了一個 Go 協程的工作池。

func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

上面函式的引數是需要建立的工作協程的數量。在建立 Go 協程之前,它呼叫了 wg.Add(1) 方法,於是 WaitGroup 計數器遞增。接下來,我們建立工作協程,並向 worker 函式傳遞 wg 的地址。建立了需要的工作協程後,函式呼叫 wg.Wait(),等待所有的 Go 協程執行完畢。所有協程完成執行之後,函式會關閉 results 通道。因為所有協程都已經執行完畢,於是不再需要向 results 通道寫入資料了。

現在我們已經有了工作池,我們繼續編寫一個函式,把作業分配給工作者。

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

上面的 allocate 函式接收所需建立的作業數量作為輸入引數,生成了最大值為 998 的偽隨機數,並使用該隨機數建立了 Job 結構體變數。這個函式把 for 迴圈的計數器 i 作為 id,最後把建立的結構體變數寫入 jobs 通道。當寫入所有的 job 時,它關閉了 jobs 通道。

下一步是建立一個讀取 results 通道和列印輸出的函式。

func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

result 函式讀取 results 通道,並打印出 job 的 id、輸入的隨機數、該隨機數的每位數之和。result 函式也接受 done 通道作為引數,當列印所有結果時,done 會被寫入 true。

現在一切準備充分了。我們繼續完成最後一步,在 main() 函式中呼叫上面所有的函式。

func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

我們首先在 main 函式的第 2 行,儲存了程式的起始時間,並在最後一行(第 12 行)計算了 endTime 和 startTime 的差值,顯示出程式執行的總時間。由於我們想要通過改變協程數量,來做一點基準指標(Benchmark),所以需要這麼做。

我們把 noOfJobs 設定為 100,接下來呼叫了 allocate,向 jobs 通道新增作業。

我們建立了 done 通道,並將其傳遞給 result 協程。於是該協程會開始列印結果,並在完成列印時發出通知。

通過呼叫 createWorkerPool 函式,我們最終建立了一個有 10 個協程的工作池。main 函式會監聽 done 通道的通知,等待所有結果列印結束。

為了便於參考,下面是整個程式。我還引用了必要的包。

package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

線上執行程式

為了更精確地計算總時間,請在你的本地機器上執行該程式。

該程式輸出:

Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23  
Job id 9, input random no 150, sum of digits 6  
...
total time taken  20.01081009 seconds

程式總共會列印 100 行,對應著 100 項作業,然後最後會列印一行程式消耗的總時間。你的輸出會和我的不同,因為 Go 協程的執行順序不一定,同樣總時間也會因為硬體而不同。在我的例子中,執行程式大約花費了 20 秒。

現在我們把 main 函式裡的 noOfWorkers 增加到 20。我們把工作者的數量加倍了。由於工作協程增加了(準確說來是兩倍),因此程式花費的總時間會減少(準確說來是一半)。在我的例子裡,程式會打印出 10.004364685 秒。

...
total time taken  10.004364685 seconds

現在我們可以理解了,隨著工作協程數量增加,完成作業的總時間會減少。你們可以練習一下:在 main 函式裡修改 noOfJobs 和 noOfWorkers 的值,並試著去分析一下結果。

 

Go 系列教程 —— 24. Select

什麼是 select?

select 語句用於在多個傳送/接收通道操作中進行選擇。select 語句會一直阻塞,直到傳送/接收操作準備就緒。如果有多個通道操作準備完畢,select 會隨機地選取其中之一執行。該語法與 switch 類似,所不同的是,這裡的每個 case 語句都是通道操作。我們好好看一些程式碼來加深理解吧。

示例

package main

import (  
    "fmt"
    "time"
)

func server1(ch chan string) {  
    time.Sleep(6 * time.Second)
    ch <- "from server1"
}
func server2(ch chan string) {  
    time.Sleep(3 * time.Second)
    ch <- "from server2"

}
func main() {  
    output1 := make(chan string)
    output2 := make(chan string)
    go server1(output1)
    go server2(output2)
    select {
    case s1 := <-output1:
        fmt.Println(s1)
    case s2 := <-output2:
        fmt.Println(s2)
    }
}

線上執行程式

在上面程式裡,server1 函式(第 8 行)休眠了 6 秒,接著將文字 from server1 寫入通道 ch。而 server2 函式(第 12 行)休眠了 3 秒,然後把 from server2 寫入了通道 ch

而 main 函式在第 20 行和第 21 行,分別呼叫了 server1 和 server2 兩個 Go 協程。

在第 22 行,程式執行到了 select 語句。select 會一直髮生阻塞,除非其中有 case 準備就緒。在上述程式裡,server1 協程會在 6 秒之後寫入 output1 通道,而server2 協程在 3 秒之後就寫入了 output2 通道。因此 select 語句會阻塞 3 秒鐘,等著 server2 向 output2 通道寫入資料。3 秒鐘過後,程式會輸出:

from server2

然後程式終止。

select 的應用

在上面程式中,函式之所以取名為 server1 和 server2,是為了展示 select 的實際應用。

假設我們有一個關鍵性應用,需要儘快地把輸出返回給使用者。這個應用的資料庫複製並且儲存在世界各地的伺服器上。假設函式 server1 和 server2 與這樣不同區域的兩臺伺服器進行通訊。每臺伺服器的負載和網路時延決定了它的響應時間。我們向兩臺伺服器傳送請求,並使用 select 語句等待相應的通道發出響應。select 會選擇首先響應的伺服器,而忽略其它的響應。使用這種方法,我們可以向多個伺服器傳送請求,並給使用者返回最快的響應了。:)

預設情況

在沒有 case 準備就緒時,可以執行 select 語句中的預設情況(Default Case)。這通常用於防止 select 語句一直阻塞。

package main

import (  
    "fmt"
    "time"
)

func process(ch chan string) {  
    time.Sleep(10500 * time.Millisecond)
    ch <- "process successful"
}

func main() {  
    ch := make(chan string)
    go process(ch)
    for {
        time.Sleep(1000 * time.Millisecond)
        select {
        case v := <-ch:
            fmt.Println("received value: ", v)
            return
        default:
            fmt.Println("no value received")
        }
    }

}

線上執行程式

上述程式中,第 8 行的 process 函式休眠了 10500 毫秒(10.5 秒),接著把 process successful 寫入 ch 通道。在程式中的第 15 行,併發地呼叫了這個函式。

在併發地呼叫了 process 協程之後,主協程啟動了一個無限迴圈。這個無限迴圈在每一次迭代開始時,都會先休眠 1000 毫秒(1 秒),然後執行一個 select 操作。在最開始的 10500 毫秒中,由於 process 協程在 10500 毫秒後才會向 ch 通道寫入資料,因此 select 語句的第一個 case(即 case v := <-ch:)並未就緒。所以在這期間,程式會執行預設情況,該程式會列印 10 次 no value received

在 10.5 秒之後,process 協程會在第 10 行向 ch 寫入 process successful。現在,就可以執行 select 語句的第一個 case 了,程式會列印 received value: process successful,然後程式終止。該程式會輸出:

no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
no value received  
received value:  process successful

死鎖與預設情況

package main

func main() {  
    ch := make(chan string)
    select {
    case <-ch:
    }
}

線上執行程式

上面的程式中,我們在第 4 行建立了一個通道 ch。我們在 select 內部(第 6 行),試圖讀取通道 ch。由於沒有 Go 協程向該通道寫入資料,因此 select 語句會一直阻塞,導致死鎖。該程式會觸發執行時 panic,報錯資訊如下:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:  
main.main()  
    /tmp/sandbox416567824/main.go:6 +0x80

如果存在預設情況,就不會發生死鎖,因為在沒有其他 case 準備就緒時,會執行預設情況。我們用預設情況重寫後,程式如下:

package main

import "fmt"

func main() {  
    ch := make(chan string)
    select {
    case <-ch:
    default:
        fmt.Println("default case executed")
    }
}

線上執行程式

以上程式會輸出:

default case executed

如果 select 只含有值為 nil 的通道,也同樣會執行預設情況。

package main

import "fmt"

func main() {  
    var ch chan string
    select {
    case v := <-ch:
        fmt.Println("received value", v)
    default:
        fmt.Println("default case executed")

    }
}

線上執行程式

在上面程式中,ch 等於 nil,而我們試圖在 select 中讀取 ch(第 8 行)。如果沒有預設情況,select 會一直阻塞,導致死鎖。由於我們在 select 內部加入了預設情況,程式會執行它,並輸出:

default case executed

隨機選取

當 select 由多個 case 準備就緒時,將會隨機地選取其中之一去執行。

package main

import (  
    "fmt"
    "time"
)

func server1(ch chan string) {  
    ch <- "from server1"
}
func server2(ch chan string) {  
    ch <- "from server2"

}
func main() {  
    output1 := make(chan string)
    output2 := make(chan string)
    go server1(output1)
    go server2(output2)
    time.Sleep(1 * time.Second)
    select {
    case s1 := <-output1:
        fmt.Println(s1)
    case s2 := <-output2:
        fmt.Println(s2)
    }
}

線上執行程式

在上面程式裡,我們在第 18 行和第 19 行分別呼叫了 server1 和 server2 兩個 Go 協程。接下來,主程式休眠了 1 秒鐘(第 20 行)。當程式控制到達第 21 行的 select 語句時,server1 已經把 from server1 寫到了 output1 通道上,而 server2 也同樣把 from server2 寫到了 output2 通道上。因此這個 select 語句中的兩種情況都準備好執行了。如果你執行這個程式很多次的話,輸出會是 from server1 或者 from server2,這會根據隨機選取的結果而變化。

請在你的本地系統上執行這個程式,獲得程式的隨機結果。因為如果你在 playground 上線上執行的話,它的輸出總是一樣的,這是由於 playground 不具有隨機性所造成的。

這下我懂了:空 select

package main

func main() {  
    select {}
}

線上執行程式

你認為上面程式碼會輸出什麼?

我們已經知道,除非有 case 執行,select 語句就會一直阻塞著。在這裡,select 語句沒有任何 case,因此它會一直阻塞,導致死鎖。該程式會觸發 panic,輸出如下:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:  
main.main()  
    /tmp/sandbox299546399/main.go:4 +0x20

 

Go 系列教程 —— 25. Mutex

本教程我們學習 Mutex。我們還會學習怎樣通過 Mutex 和通道來處理競態條件(Race Condition)。

臨界區

在學習 Mutex 之前,我們需要理解併發程式設計中臨界區(Critical Section)的概念。當程式併發地執行時,多個 Go 協程不應該同時訪問那些修改共享資源的程式碼。這些修改共享資源的程式碼稱為臨界區。例如,假設我們有一段程式碼,將一個變數 x 自增 1。

x = x + 1

如果只有一個 Go 協程訪問上面的程式碼段,那都沒有任何問題。

但當有多個協程併發執行時,程式碼卻會出錯,讓我們看看究竟是為什麼吧。簡單起見,假設在一行程式碼的前面,我們已經運行了兩個 Go 協程。

在上一行程式碼的內部,系統執行程式時分為如下幾個步驟(這裡其實還有很多包括暫存器的技術細節,以及加法的工作原理等,但對於我們的系列教程,只需認為只有三個步驟就好了):

  1. 獲得 x 的當前值
  2. 計算 x + 1
  3. 將步驟 2 計算得到的值賦值給 x

如果只有一個協程執行上面的三個步驟,不會有問題。

我們討論一下當有兩個併發的協程執行該程式碼時,會發生什麼。下圖描述了當兩個協程併發地訪問程式碼行 x = x + 1 時,可能出現的一種情況。

one-scenario

我們假設 x 的初始值為 0。而協程 1 獲取 x 的初始值,並計算 x + 1。而在協程 1 將計算值賦值給 x 之前,系統上下文切換到了協程 2。於是,協程 2 獲取了 x 的初始值(依然為 0),並計算 x + 1。接著系統上下文又切換回了協程 1。現在,協程 1 將計算值 1 賦值給 x,因此 x 等於 1。然後,協程 2 繼續開始執行,把計算值(依然是 1)複製給了 x,因此在所有協程執行完畢之後,x 都等於 1。

現在我們考慮另外一種可能發生的情況。

another-scenario

在上面的情形裡,協程 1 開始執行,完成了三個步驟後結束,因此 x 的值等於 1。接著,開始執行協程 2。目前 x 的值等於 1。而當協程 2 執行完畢時,x 的值等於 2。

所以,從這兩個例子你可以發現,根據上下文切換的不同情形,x 的最終值是 1 或者 2。這種不太理想的情況稱為競態條件(Race Condition),其程式的輸出是由協程的執行順序決定的。

在上例中,如果在任意時刻只允許一個 Go 協程訪問臨界區,那麼就可以避免競態條件。而使用 Mutex 可以達到這個目的

Mutex

Mutex 用於提供一種加鎖機制(Locking Mechanism),可確保在某時刻只有一個協程在臨界區執行,以防止出現競態條件。

Mutex 可以在 sync 包內找到。Mutex 定義了兩個方法:Lock 和 Unlock。所有在 Lock 和 Unlock 之間的程式碼,都只能由一個 Go 協程執行,於是就可以避免競態條件。

mutex.Lock()  
x = x + 1  
mutex.Unlock()

在上面的程式碼中,x = x + 1 只能由一個 Go 協程執行,因此避免了競態條件。

如果有一個 Go 協程已經持有了鎖(Lock),當其他協程試圖獲得該鎖時,這些協程會被阻塞,直到 Mutex 解除鎖定為止。

含有競態條件的程式

在本節裡,我們會編寫一個含有競態條件的程式,而在接下來一節,我們再修復競態條件的問題。

package main  
import (