1. 程式人生 > >Golang教程:(二十三)緩衝通道和工人池

Golang教程:(二十三)緩衝通道和工人池

什麼是緩衝通道

上一節我們討論的是基本的無緩衝區的通道,對一個無緩衝的通道進行傳送/寫入和接收/讀取資料是實時阻塞的。事實上我們可以建立一個帶緩衝區的通道,往通道中傳送資料時,只有當緩衝區滿的情況下才會阻塞;類似的,只有我們從通道中讀取資料時,只有讀到緩衝區空為止才會阻塞。

帶緩衝的通道可以通過內建函式make來建立,需要額外指定緩衝區大小。

ch := make(chan type, capacity)  

capacity的值應該大於0,當為0時即我們上節所討論的無緩衝通道。

讓我們舉例說明一下:

package main

import (  
    "fmt"
)



func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}
在playground執行

在上面的例子中我們建立了一個緩衝區大小為2的通道,這樣我們就可以往通道中寫入2個字串後才被阻塞,上面的例子中我們分別往通道中寫入兩個字串,然後再分別讀出。程式列印輸出:

naveen  
paul  

讓我們再通過另外一個例子能比較好理解緩衝通道的概念。

package main


import (  
    "fmt"
    "time"
)


func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {  
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)


    }
}

在上面的例子中,首先建立了一個緩衝區大小為2的通道ch :=make(chanint,2),在main協程中建立了一個write的協程。然後main協程sleep了2秒,在這個時間內,write協程併發的執行起來,只要是迴圈往我們建立的通道中寫入0..4。因為我們的緩衝區大小是2,所以當write協程中往通道寫入0和1之後,通道就被阻塞,直到最少有一個值被讀出。所以我們的程式會先打印出:

successfully wrote 0 to ch  
successfully wrote 1 to ch 

之後當main協程sleep結束後,會迴圈讀取channl中的值,並打印出來,然後再次休眠2秒,然後重複這個過程直到ch被關閉,所以程式會接著列印:

read value 0 from ch  
successfully wrote 2 to ch  

這個過程會持續進行:write協程寫入資料->channel緩衝滿,阻塞->main協程讀取資料->channl緩衝非滿,解除阻塞->wirte協程寫入shuju->...->直到緩衝區空,關閉channel。最終列印結果如下:

successfully wrote 0 to ch  
successfully wrote 1 to ch  
read value 0 from ch  
successfully wrote 2 to ch  
read value 1 from ch  
successfully wrote 3 to ch  
read value 2 from ch  
successfully wrote 4 to ch  
read value 3 from ch  
read value 4 from ch  

死鎖

package main


import (  
    "fmt"
)


func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

在上面的例子中,我們往一個緩衝區為2的通道中寫入了3個元素,當我們三個元素寫入時因為已經達到最大的容量所以協程會阻塞掉,直到有其他協程從channel中讀取出來,但是我們這個例子中沒有其他協程進行這個工作,所以就會造成死鎖的出現,程式會列印下面資訊:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox274756028/main.go:11 +0x100

Length vs Capacity

capacity是指緩衝通道能容納的最大元素的個數,這個值是我們建立緩衝通道時用make函式指定的。length是指當前緩衝通道中的元素個數。上例子演示:

package main


import (  
    "fmt"
)


func main() {  
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}

在上面的例子中,建立了一個capacity為3的通道,就是說可以容納3個string元素。當我們寫入了2個元素到通道中時,這時候的長度就是2,然後又從通道中讀出1個元素,現在通道的長度就變成了1。結果列印如下:

capacity is 3  
length is 2  
read value naveen  
new length is 1

WaitGroup (等待組 ps:翻譯不一定準確)

 下面我們要開始講解併發中的工人池(worker pools)的概念,在此之前我們必須要先了解下waitgroup的概念,因為它被用來實現工人池. 一個等待組(waitgroup)是用來等待一組協程的執行完成。在所有的協程完成之前會一直處於阻塞狀態。舉個栗子,一個main協程中開啟了3個併發的協程,主協程要等待3個子協程結束後再銷燬,我們可以用等待組來實現:

package main


import (  
    "fmt"
    "sync"
    "time"
)


func process(i int, wg *sync.WaitGroup) {  
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}


func main() {  
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

等待組是一個結構體型別,var wg sync.WaitGroup 建立了一個0值的waitgroup變數。waitgroup是通過計數器來工作的.當我們呼叫waitgroup的Add方法並傳遞一個int值給他,waitgroup的計數器就會增加相應的int值。通過呼叫Done()方法來減少計數器的值。wait()方法會阻塞呼叫此函式的協程,直到waitgroup中的計數器值變成0.

在上面的例子中我們迴圈3次呼叫了wg.Add(1),所以waitgroup的計數器值變為3,main協程呼叫wg.Wait()方法後就會處於阻塞狀態.同時我們也建立了3個子協程,當3個子協程執行結束時會呼叫wg.Done()3次,計數器值變為0,main協程就會被解除阻塞狀態。

需要注意的是我們在goprocess(i,&wg) 傳入的是waitgroup的地址,如果不傳入地址的話,那麼各個子協程會建立它們各自的一個waitgroup拷貝,而main協程不會得到子協程的結束狀態。程式輸出如下:

started Goroutine  0  
started Goroutine  1  
Goroutine 0 ended  
Goroutine 2 ended  
Goroutine 1 ended  
All go routines finished executing  

每個人的輸出結果可能不盡相同,因為協程具體執行的次序是不同的。

工人池(Worker Pool)的實現

緩衝通道的一個重要用處就是工人池的實現。一個工人池就是一些等待任務的執行緒集合,一旦他們完成了給他們分配的任務,那就進入可用狀態等待下一個任務到來。

我們通過緩衝通道來實現工人池.下面的例子我們會建立一個工人池來實現統計輸入數值的數字和。例如輸入234.輸入就是2+3+4.下面是一些關於我們工人池的核心功能

  • 建立一個協程池用來監聽得到job的輸入緩衝通道
  • 往輸入的緩衝通道中增加任務
  • 當job完成時寫入結果到輸出的緩衝通道
  • 從輸出緩衝通道中讀取和列印結果
我們會一步步寫出這個例子,以便理解。第一步先建立job和result的結構體
type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

每一個job結構體擁有一個id和隨機值randomno,result擁有一個job元素和統計各位數的和sunofdigits元素值。

下一步我們來建立接收job和輸出result的緩衝通道

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)  

工人協程會監聽jobs緩衝通道上到來的任務,一旦任務完成就把結果寫入到results的緩衝通道中。

接下來下面digits函式是統計數字的各個位之和。

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

接下來我們建立一個工人協程

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

上面的函式建立了一個“工人”他負責從jobs通道中讀取元素,計算結果併網results通道中寫入結果。這個方法用一個等待組wg作為引數,當所有jobs被完成時會呼叫wg的Done()方法。

接下來的crateWorkerPool函式會建立一個工人協程池。

func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

上面的函式有一個int輸入,指定要建立多少工人。函式中呼叫wg.Add(1)作為協程增加的計數器,之後把wg作為入參傳給worker函式,當完成建立所需要的工人協程之後,就呼叫wg.Wait()進入阻塞狀態。當所有的協程完成執行之後,它會關閉results通道。

現在我們的工人池已經準備好了讓我們從頭寫一個分配工作給工人的函式

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

上面的allocate函式需要一個int作為輸入引數指定一共要分發給工人們的工作總數。randomno為一個最大值為998的隨機數,通過迴圈建立Job結構體並寫入到jobs通道中,當寫入工作全部結束後就關閉jobs通道。

接下來建立讀取和輸出結果的函式

func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

result函式從result通道中讀取出結果值並列印job的id、隨機數和統計隨機數各位數之和。函式中通過寫入done通道值來表示全部結果已經列印完畢。

最後來main函式:

func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

函式開始先進行儲存了開始執行的時間和最後結束的時間用來計算程式執行的時間差, noOfJobs設定為100,allocate被呼叫新增jobs到通道。done通道用來獲取結果輸出完畢的狀態。noOfWorkers作為createWorkerPool函式的入參,用來指定建立多少個工人協程。下面給出完整程式碼:

package main


import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)


type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}


var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)


func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

程式列印

Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23  
Job id 9, input random no 150, sum of digits 6  
...
total time taken  20.01081009 seconds  

一共100行的輸出記過會被列印,現在如果把main函式中的noOfWorkers值給為20,會發現total time差不多會降低一半,原因當然是工人池中的工人數量增加了一倍,從jobs協程中讀取結果的效率也就多了一倍。你可以自行修改noOfJobs和boOfWorkers的值來檢視不同的效果。

這一節就到這裡,希望你有愉快的一天!