1. 程式人生 > >Go併發程式設計--通過channel實現生產者消費者模型

Go併發程式設計--通過channel實現生產者消費者模型

概述

生產者消費者模型是多執行緒設計的經典模型,該模型被廣泛的應用到各個系統的多執行緒/程序模型設計中。本文介紹了Go語言中channel的特性,並通過Go語言實現了兩個生產者消費者模型。

channel的一些特性

在Go中channel是非常重要的協程通訊的手段,channel是雙向的通道,通過channel可以實現協程間資料的傳遞,通過channel也可以實現協程間的同步(後面會有介紹)。本文介紹的生產者消費者模型主要用到了channel的以下特性:任意時刻只能有一個協程能夠對channel中某一個item進行訪問。

單生產者單消費者模型

把生產者和消費者都放到一個無線迴圈中,這個和我們的伺服器端的任務處理非常相似。生產者不斷的向channel中放入資料,而消費者不斷的從channel中取出資料,並對資料進行處理(列印)。由於生產者的協程不會退出,所以channel的寫入會永久存在,這樣當channel中沒有放入資料時,消費者端將會阻塞,等待生產者端放入資料。

程式碼的實現如下:

package main

import (
    "fmt"
    "time"
)

var ch1 chan int = make(chan int)
var bufChan chan int = make(chan int, 1000)
var msgChan chan int = make(chan int)

func sum(a int, b int) {
    ch1 <- a + b
}

// write data to channel
func writer(max int) {
    for {
        for i := 0; i < max; i++ {  // 簡單的向channel中放入一個整數
bufChan <- i time.Sleep(1 * time.Millisecond) //控制放入的頻率 } } } // read data fro m channel func reader(max int) { for { r := <-bufChan fmt.Printf("read value: %d\n", r) } // 通知主執行緒,工作結束了,這一步可以省略 msgChan <- 1 } func testWriterAndReader(max int
) { go writer(max) go reader(max) // writer 和reader的任務結束了,主執行緒會得到通知 res := <-msgChan fmt.Printf("task is done: value=%d\n", res) } func main() { testWriterAndReader(100) }

多生產者消費者模型

我們可以利用channel在某個時間點只能有一個協程能夠訪問其中的某一個數據,的特性來實現生產者消費者模型。由於channel具有這樣的特性,我們在放資料和消費資料時可以不需要加鎖。

package main

import (
    "time"
    "fmt"
    "os"
)

var ch1 chan int = make(chan int)
var bufChan chan int = make(chan int, 1000)
var msgChan chan string = make(chan string)

func sum(a int, b int) {
    ch1 <- a + b
}

// write data to channel
func writer(max int) {
    for {
        for i := 0; i < max; i++ {
            bufChan <- i
            fmt.Fprintf(os.Stderr, "%v write: %d\n", os.Getpid(), i)
            time.Sleep(10 * time.Millisecond)
        }
    }
}

// read data fro m channel
func reader(name string) {
    for {
        r := <-bufChan
        fmt.Printf("%s read value: %d\n", name, r)
    }
    msgChan <- name
}

func testWriterAndReader(max int) {
    // 開啟多個writer的goroutine,不斷地向channel中寫入資料
    go writer(max)
    go writer(max)

    // 開啟多個reader的goroutine,不斷的從channel中讀取資料,並處理資料
    go reader("read1")
    go reader("read2")
    go reader("read3")

    // 獲取三個reader的任務完成狀態
    name1 := <-msgChan
    name2 := <-msgChan
    name3 := <-msgChan

    fmt.Println("%s,%s,%s: All is done!!", name1, name2, name3)
}

func main() {
    testWriterAndReader(100)
}

輸出如下:

read3 read value: 0
80731 write: 0
80731 write: 0
read1 read value: 0
80731 write: 1
read2 read value: 1
80731 write: 1
read3 read value: 1
80731 write: 2
read2 read value: 2
80731 write: 2
... ...

總結

本文通過channel實現了經典的生產者和消費者模型,利用了channel的特性。但要注意,當消費者的速度小於生產者時,channel就有可能產生擁塞,導致佔用記憶體增加,所以,在實際場景中需要考慮channel的緩衝區的大小。設定了channel的大小,當生產的資料大於channel的容量時,生產者將會阻塞,這些問題都是要在實際場景中需要考慮的。
一個解決辦法就是使用一個固定的陣列或切片作為環形緩衝區,而非channel,通過Sync包的機制來進行同步,實現生產者消費者模型,這樣可以避免由於channel滿而導致消費者端阻塞。但,對於環形緩衝區而言,可能會覆蓋老的資料,同樣需要考慮具體的使用場景。關於環形緩衝區的原理和實現,在分析Sync包的使用時再進一步分析。