1. 程式人生 > >二十二、Go基礎程式設計:併發程式設計—channel

二十二、Go基礎程式設計:併發程式設計—channel

goroutine執行在相同的地址空間,因此訪問共享記憶體必須做好同步。goroutine 奉行通過通訊來共享記憶體,而不是共享記憶體來通訊。

引⽤型別 channel 是 CSP 模式的具體實現,用於多個 goroutine 通訊。其內部實現了同步,確保併發安全。

1 channel型別

和map類似,channel也一個對應make建立的底層資料結構的引用。

當我們複製一個channel或用於函式引數傳遞時,我們只是拷貝了一個channel引用,因此呼叫者何被呼叫者將引用同一個channel物件。和其它的引用型別一樣,channel的零值也是nil。

定義一個channel時,也需要定義傳送到channel的值的型別。channel可以使用內建的make()函式來建立:
 

make(chan Type) //等價於make(chan Type, 0)
    make(chan Type, capacity)

當 capacity= 0 時,channel 是無緩衝阻塞讀寫的,當capacity> 0 時,channel 有緩衝、是非阻塞的,直到寫滿 capacity個元素才阻塞寫入。

channel通過操作符<-來接收和傳送資料,傳送和接收資料語法:

    channel <- value      //傳送value到channel
    <-channel             //接收並將其丟棄
    x := <-channel        //從channel中接收資料,並賦值給x
    x, ok := <-channel    //功能同上,同時檢查通道是否已關閉或者是否為空

預設情況下,channel接收和傳送資料都是阻塞的,除非另一端已經準備好,這樣就使得goroutine同步變的更加的簡單,而不需要顯式的lock。

示例程式碼:

func main() {
    c := make(chan int)

    go func() {
        defer fmt.Println("子協程結束")

        fmt.Println("子協程正在執行……")

        c <- 666 //666傳送到c
    }()

    num := <-c //從c中接收資料,並賦值給num

    fmt.Println("num = ", num)
    fmt.Println("main協程結束")
}

程式執行結果:

 

2 無緩衝的channel

無緩衝的通道(unbuffered channel)是指在接收前沒有能力儲存任何值的通道。

這種型別的通道要求傳送 goroutine 和接收 goroutine 同時準備好,才能完成傳送和接收操作。如果兩個goroutine沒有同時準備好,通道會導致先執行傳送或接收操作的 goroutine 阻塞等待。

這種對通道進行傳送和接收的互動行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。

下圖展示兩個 goroutine 如何利用無緩衝的通道來共享一個值: 

在第 1 步,兩個 goroutine 都到達通道,但哪個都沒有開始執行傳送或者接收。
在第 2 步,左側的 goroutine 將它的手伸進了通道,這模擬了向通道傳送資料的行為。這時,這個 goroutine 會在通道中被鎖住,直到交換完成。
在第 3 步,右側的 goroutine 將它的手放入通道,這模擬了從通道里接收資料。這個 goroutine 一樣也會在通道中被鎖住,直到交換完成。
在第 4 步和第 5 步,進行交換,並最終,在第 6 步,兩個 goroutine 都將它們的手從通道里拿出來,這模擬了被鎖住的 goroutine 得到釋放。兩個 goroutine 現在都可以去做別的事情了。


無緩衝的channel建立格式:
 

make(chan Type) //等價於make(chan Type, 0)
func main() {
    c := make(chan int, 0) //無緩衝的通道

    //內建函式 len 返回未被讀取的緩衝元素數量, cap 返回緩衝區大小
    fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))

    go func() {
        defer fmt.Println("子協程結束")

        for i := 0; i < 3; i++ {
            c <- i
            fmt.Printf("子協程正在執行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))
        }
    }()

    time.Sleep(2 * time.Second) //延時2s

    for i := 0; i < 3; i++ {
        num := <-c //從c中接收資料,並賦值給num
        fmt.Println("num = ", num)
    }

    fmt.Println("main協程結束")
}

程式執行結果:  

 

3 有緩衝的channel

有緩衝的通道(buffered channel)是一種在被接收前能儲存一個或者多個值的通道。

這種型別的通道並不強制要求 goroutine 之間必須同時完成傳送和接收。通道會阻塞傳送和接收動作的條件也會不同。只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩衝區容納被髮送的值時,傳送動作才會阻塞。

這導致有緩衝的通道和無緩衝的通道之間的一個很大的不同:無緩衝的通道保證進行傳送和接收的 goroutine 會在同一時間進行資料交換;有緩衝的通道沒有這種保證。

示例圖如下:

在第 1 步,右側的 goroutine 正在從通道接收一個值。
在第 2 步,右側的這個 goroutine獨立完成了接收值的動作,而左側的 goroutine 正在傳送一個新值到通道里。
在第 3 步,左側的goroutine 還在向通道傳送新值,而右側的 goroutine 正在從通道接收另外一個值。這個步驟裡的兩個操作既不是同步的,也不會互相阻塞。
最後,在第 4 步,所有的傳送和接收都完成,而通道里還有幾個值,也有一些空間可以存更多的值。


有緩衝的channel建立格式:
make(chan Type, capacity)

如果給定了一個緩衝區容量,通道就是非同步的。只要緩衝區有未使用空間用於傳送資料,或還包含可以接收的資料,那麼其通訊就會無阻塞地進行。

示例程式碼

func main() {
    c := make(chan int, 3) //帶緩衝的通道

    //內建函式 len 返回未被讀取的緩衝元素數量, cap 返回緩衝區大小
    fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))

    go func() {
        defer fmt.Println("子協程結束")

        for i := 0; i < 3; i++ {
            c <- i
            fmt.Printf("子協程正在執行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))
        }
    }()

    time.Sleep(2 * time.Second) //延時2s
    for i := 0; i < 3; i++ {
        num := <-c //從c中接收資料,並賦值給num
        fmt.Println("num = ", num)
    }
    fmt.Println("main協程結束")
}

程式執行結果:

這裡寫圖片描述 

4 range和close

如果傳送者知道,沒有更多的值需要傳送到channel的話,那麼讓接收者也能及時知道沒有多餘的值可接收將是有用的,因為接收者可以停止不必要的接收等待。這可以通過內建的close函式來關閉channel實現。

示例程式碼:

func main() {
    c := make(chan int)

    go func() {
        for i := 0; i < 5; i++ {
            c <- i
        }
        //把 close(c) 註釋掉,程式會一直阻塞在 if data, ok := <-c; ok 那一行
        close(c)
    }()

    for {
        //ok為true說明channel沒有關閉,為false說明管道已經關閉
        if data, ok := <-c; ok {
            fmt.Println(data)
        } else {
            break
        }
    }

    fmt.Println("Finished")
}

 程式執行結果:

注意點:


channel不像檔案一樣需要經常去關閉,只有當你確實沒有任何傳送資料了,或者你想顯式的結束range迴圈之類的,才去關閉channel;
關閉channel後,無法向channel 再發送資料(引發 panic 錯誤後導致接收立即返回零值);
關閉channel後,可以繼續向channel接收資料;
對於nil channel,無論收發都會被阻塞。


可以使用 range 來迭代不斷操作channel:
 

func main() {
    c := make(chan int)

    go func() {
        for i := 0; i < 5; i++ {
            c <- i
        }
        //把 close(c) 註釋掉,程式會一直阻塞在 for data := range c 那一行
        close(c)
    }()

    for data := range c {
        fmt.Println(data)
    }

    fmt.Println("Finished")
}

5 單方向的channel

預設情況下,通道是雙向的,也就是,既可以往裡面傳送資料也可以同裡面接收資料。

但是,我們經常見一個通道作為引數進行傳遞而值希望對方是單向使用的,要麼只讓它傳送資料,要麼只讓它接收資料,這時候我們可以指定通道的方向。

單向channel變數的宣告非常簡單,如下:
 

var ch1 chan int       // ch1是一個正常的channel,不是單向的
var ch2 chan<- float64 // ch2是單向channel,只用於寫float64資料
var ch3 <-chan int     // ch3是單向channel,只用於讀取int資料
  • chan<- 表示資料進入管道,要把資料寫進管道,對於呼叫者就是輸出。
  • <-chan 表示資料從管道出來,對於呼叫者就是得到管道的資料,當然就是輸入。

可以將 channel 隱式轉換為單向佇列,只收或只發,不能將單向 channel 轉換為普通 channel:

   c := make(chan int, 3)
    var send chan<- int = c // send-only
    var recv <-chan int = c // receive-only
    send <- 1
    //<-send //invalid operation: <-send (receive from send-only type chan<- int)
    <-recv
    //recv <- 2 //invalid operation: recv <- 2 (send to receive-only type <-chan int)

    //不能將單向 channel 轉換為普通 channel
    d1 := (chan int)(send) //cannot convert send (type chan<- int) to type chan int
    d2 := (chan int)(recv) //cannot convert recv (type <-chan int) to type chan int
//   chan<- //只寫
func counter(out chan<- int) {
    defer close(out)
    for i := 0; i < 5; i++ {
        out <- i //如果對方不讀 會阻塞
    }
}

//   <-chan //只讀
func printer(in <-chan int) {
    for num := range in {
        fmt.Println(num)
    }
}

func main() {
    c := make(chan int) //   chan   //讀寫

    go counter(c) //生產者
    printer(c)    //消費者

    fmt.Println("done")
}

6 定時器

6.1 Timer

Timer是一個定時器,代表未來的一個單一事件,你可以告訴timer你要等待多長時間,它提供一個channel,在將來的那個時間那個channel提供了一個時間值。

示例程式碼:

import "fmt"
import "time"

func main() {
    //建立定時器,2秒後,定時器就會向自己的C位元組傳送一個time.Time型別的元素值
    timer1 := time.NewTimer(time.Second * 2)
    t1 := time.Now() //當前時間
    fmt.Printf("t1: %v\n", t1)

    t2 := <-timer1.C
    fmt.Printf("t2: %v\n", t2)

    //如果只是想單純的等待的話,可以使用 time.Sleep 來實現
    timer2 := time.NewTimer(time.Second * 2)
    <-timer2.C
    fmt.Println("2s後")

    time.Sleep(time.Second * 2)
    fmt.Println("再一次2s後")

    <-time.After(time.Second * 2)
    fmt.Println("再再一次2s後")

    timer3 := time.NewTimer(time.Second)
    go func() {
        <-timer3.C
        fmt.Println("Timer 3 expired")
    }()

    stop := timer3.Stop() //停止定時器
    if stop {
        fmt.Println("Timer 3 stopped")
    }

    fmt.Println("before")
    timer4 := time.NewTimer(time.Second * 5) //原來設定3s
    timer4.Reset(time.Second * 1)            //重新設定時間
    <-timer4.C
    fmt.Println("after")
}

6.2 Ticker

Ticker是一個定時觸發的計時器,它會以一個間隔(interval)往channel傳送一個事件(當前時間),而channel的接收者可以以固定的時間間隔從channel中讀取事件。

示例程式碼:

func main() {
    //建立定時器,每隔1秒後,定時器就會給channel傳送一個事件(當前時間)
    ticker := time.NewTicker(time.Second * 1)

    i := 0
    go func() {
        for { //迴圈
            <-ticker.C
            i++
            fmt.Println("i = ", i)

            if i == 5 {
                ticker.Stop() //停止定時器
            }
        }
    }() //別忘了()

    //死迴圈,特地不讓main goroutine結束
    for {
    }
}