Golang百萬級高並發實例
阿新 • • 發佈:2019-01-16
minute 一個 ase 效果 tin n) 無限 chan 會有 前言
特別註意:Go語言中的map不是並發安全的,要想實現並發安全,需要自己實現(如加鎖),或者使用sync.Map。
感謝Handling 1 Million Requests per Minute with Go 這篇文章給予的巨大啟發。
基礎
我們使用Go語言,基本上是因為他原生支持的高並發:Goroutine 和 Channel;
Go 的並發屬於 CSP 並發模型的一種實現;
CSP 並發模型的核心概念是:“不要通過共享內存來通信,而應該通過通信來共享內存”。
簡單用法
我一開始學習Go語言的時候,遇到大訪問量的時候,會先創建一個帶緩沖的channel,然後起一個Go協程來逐個讀取channel中的數據並處理。
說他是並發是因為他沒有占用主線程,而是另起了一個協程獨自運行。但是這沒有實現請求之間的並發。
package main import ( "fmt" "runtime" "time" ) func main(){ //這裏我們假設數據是int類型,緩存格式設為100 dataChan:=make(chan int,100) go func(){ for{ select{ case data:=<-dataChan: fmt.Println("data:",data) time.Sleep(1 * time.Second)//這裏延遲是模擬處理數據的耗時 } } }() //填充數據 for i:=0;i<100;i++{ dataChan<-i } //這裏循環打印查看協程個數 for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
這裏打印出來的協程個數時2,為什麽? 因為main方法獨占一個主協程,我們又起了一個協程,所以是兩個。
實現百萬級的並發
首先我們要抽象出幾個概念:
Job: type Job interface { Do() } // 一個數據接口,所有的數據都要實現該接口,才能被傳遞進來 //實現Job接口的一個數據實例,需要實現一個Do()方法,對數據的處理就在這個Do()方法中。 Job通道: 這裏有兩個Job通道: 1、WorkerPool的Job channel,用於調用者把具體的數據寫入到這裏,WorkerPool讀取。 2、Worker的Job channel,當WorkerPool讀取到Job,並拿到可用的Worker的時候,會將Job實例寫入該Worker的Job channel,用來直接執行Do()方法。 Worker: type Worker struct { JobQueue chan Job //Worker的Job通道 } //每一個被初始化的worker都會在後期單獨占用一個協程 //初始化的時候會先把自己的JobQueue傳遞到Worker通道中, //然後阻塞讀取自己的JobQueue,讀到一個Job就執行Job對象的Do()方法。 工作池(WorkerPool): type WorkerPool struct { workerlen int //WorkerPool中同時 存在Worker的個數 JobQueue chan Job // WorkerPool的Job通道 WorkerQueue chan chan Job } //初始化時會按照傳入的num,啟動num個後臺協程,然後循環讀取Job通道裏面的數據, //讀到一個數據時,再獲取一個可用的Worker,並將Job對象傳遞到該Worker的chan通道
整個過程中 每個Worker都會被運行在一個協程中,在整個WorkerPool中就會有num可空閑的Worker,當來一條數據的時候,就會在工作池中去一個空閑的Worker去執行該Job,當工作池中沒有可用的worker時,就會阻塞等待一個空閑的worker。
這是一個粗糙最簡單的版本,只是為了演示效果,具體使用需要根據實際情況加一些特殊的處理。
當數據無限多的時候func (wp *WorkerPool) Run() 會無限創建協程,這裏需要做一些處理,這裏是為了讓所有的請求不等待,並且體現一下最大峰值時的協程數。具體因項目而異。
代碼地址:https://github.com/wangzhen0625/gonote/tree/master/7goroutune
main.go
package main
import (
"fmt"
"runtime"
"time"
)
type Score struct {
Num int
}
func (s *Score) Do() {
fmt.Println("num:", s.Num)
time.Sleep(1 * 1 * time.Second)
}
func main() {
num := 100 * 100 * 20
// debug.SetMaxThreads(num + 1000) //設置最大線程數
// 註冊工作池,傳入任務
// 參數1 worker並發個數
p := NewWorkerPool(num)
p.Run()
datanum := 100 * 100 * 100 * 100
go func() {
for i := 1; i <= datanum; i++ {
sc := &Score{Num: i}
p.JobQueue <- sc
}
}()
for {
fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
time.Sleep(2 * time.Second)
}
}
job.go
package main
type Job interface {
Do()
}
worker.go
package main
type Worker struct {
JobQueue chan Job
}
func NewWorker() Worker {
return Worker{JobQueue: make(chan Job)}
}
func (w Worker) Run(wq chan chan Job) {
go func() {
for {
wq <- w.JobQueue
select {
case job := <-w.JobQueue:
job.Do()
}
}
}()
}
workerpool.go
package main
import "fmt"
type WorkerPool struct {
workerlen int
JobQueue chan Job
WorkerQueue chan chan Job
}
func NewWorkerPool(workerlen int) *WorkerPool {
return &WorkerPool{
workerlen: workerlen,
JobQueue: make(chan Job),
WorkerQueue: make(chan chan Job, workerlen),
}
}
func (wp *WorkerPool) Run() {
fmt.Println("初始化worker")
//初始化worker
for i := 0; i < wp.workerlen; i++ {
worker := NewWorker()
worker.Run(wp.WorkerQueue)
}
// 循環獲取可用的worker,往worker中寫job
go func() {
for {
select {
case job := <-wp.JobQueue:
worker := <-wp.WorkerQueue
worker <- job
}
}
}()
}
Golang百萬級高並發實例