Golang多協程併發工作池
本文一個簡單的golang工作池,僅供交流和學習使用。golang工作池的作用是可以限制goroutine的啟動數量。
//NewTask是放到工作池當中執行的函式。使用的時候需要先例項化他 w := pool.NewTask(func() error { fmt.Println(time.Now());return nil }) //例項化工作池 p := pool.NewPool(3) //這裡啟用另外一個goroutine向worker當中寫入,不然會出現all goroutines are asleep,需要從管道中獲得一個數據,而這個資料必須是其他goroutine線放入管道的 go func() { for { p.Worker <- w //把需要執行的函式依次放入工作池。 } }() p.Run()
為什麼需要工作池?
一般情況下,goroutine在作業系統上只要你的硬體資源夠它是可以無限啟動的。但是如果出現大規模的啟動goroutine的情況會造成大量佔用系統資源,我們知道普通的部署一個golang應用的時候作業系統不僅僅會執行golang程式還有其他輔助的程式執行,所以理論上講工作池的目的就是為了限制golang的啟動數量,保證不會出現硬體計算資源溢位的情況。
實際我們真的需要工作池嗎?
理論上來講,我們其實不需要在golang層面設定工作池的。如果是網路請求大部分時候我們會使用nginx或者其他閘道器,中介軟體作為golang程式的代理,我們可以在請求訪問流量進入到golang程式之前使用nginx或者其他中介軟體限制流量或者使用熔斷機制來保證我們的golang程式不會開滿goroutine造成硬體計算資源溢位的情況。 如果流量小完全沒必要限制goroutine 如果流量大沒有熔斷機制對整個服務都是很危險的。
工作池效能的討論
我在編寫自己的工作池之前使用了很多其他的工作池,我發現工作池本身並不會提升執行效率,反而會拖慢效率,使用工作池程式執行時間大概是原來的2/3,但是有一些pool會在一定程度上節省記憶體,比如ants,但記憶體的節省我覺得更多的是在於資料結構的複用,沒有大量的重複建立資料物件導致的記憶體節省。
goroutine是否需要像其他程式語言一樣使用I/O多路複用?
根據我在網上可以搜尋到的資料,以及我自己的理解,golang本身是有一個pool用來複用goroutine的,所以我們並不需要自己再去實現一個多路複用的功能,反而會拖慢程式。
其他
以上論調並沒有嚴謹的驗證過,只是個人遇到的情況分享,希望和大家共同討論學習,共同進步。
工作池程式碼如下
package pool import "sync" //建立worker,每一個worker抽象成一個可以執行任務的函式 type Worker struct { f func() error } //通過NewTask來建立一個worker func NewTask(f func() error)*Worker { return &Worker{ f:f, } } //執行worker func (t *Worker) Run(wg *sync.WaitGroup){ t.f() //減少waitGroup計數器的值 wg.Done() } //池 type Pool struct { //這個*Worker指標切片用來接受任務,方便外部呼叫,減少channel異常的問題,這裡會整個切片一起提交 //Workers []*Worker //這裡的Worker是一個管道,用來接受其他go程帶來的資料,實時執行,無限等待資料迴圈,這裡使用另外一個管道還可以隱藏wg的操作。讓外部程式使用更方便一些。 Worker chan *Worker //size用來表明池的大小,不能超發。 size int //jobs表示執行任務的通道用於作為佇列,我們將任務從切片當中取出來,然後存放到通道當中,再從通道當中取出任務並執行。 Jobs chan *Worker //用於阻塞 wg sync.WaitGroup } //例項化工作池使用 func NewPool(cap int) *Pool { return &Pool{ //Workers:tasks, Worker:make(chan *Worker), size: cap, Jobs:make(chan *Worker), } } //從jobs當中取出任務並執行。 func (p *Pool) work(){ for task := range p.Jobs{ task.Run(&p.wg) } } //執行工作池當中的任務 func (p *Pool) Run(){ //只啟動有限大小的協程,協程的數量不可以超過工作池設定的數量,防止計算資源崩潰 for i:=0;i<p.size;i++{ go p.work() } //從worker切片當中把任務取出 for task := range p.Worker{ p.wg.Add(1) p.Jobs <- task } //執行完畢就需要關閉jobs close(p.Jobs) //執行的過程需要阻塞直到有空閒的goroutine可用 p.wg.Wait() }