1. 對建立的gorouting負載

1.1 不要建立一個你不知道何時退出的 goroutine

下面的程式碼有什麼問題? 是不是在我們的程式種經常寫類似的程式碼?


  1. // Week03/blog/01/01.go
  2. package main
  3. import (
  4. "log"
  5. "net/http"
  6. _ "net/http/pprof"
  7. )
  8. // 初始化函式
  9. func setup() {
  10. // 這裡面有一些初始化的操作
  11. }
  12. // 入口函式
  13. func main() {
  14. setup()
  15. // 主服務
  16. server()
  17. // for debug
  18. pprof()
  19. select {}
  20. }
  21. // http api server
  22. func server() {
  23. go func() {
  24. mux := http.NewServeMux()
  25. mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
  26. w.Write([]byte("pong"))
  27. })
  28. // 主服務
  29. if err := http.ListenAndServe(":8080", mux); err != nil {
  30. log.Panicf("http server err: %+v", err)
  31. return
  32. }
  33. }()
  34. }
  35. // 輔助服務,用來debug效能測試
  36. func pprof() {
  37. // 輔助服務,監聽了其他埠,這裡是 pprof 服務,用於 debug
  38. go http.ListenAndServe(":8081", nil)
  39. }

以上程式碼有幾個問題,是否想到過?

  1. 如果server 是在其他的包裡面, 如果沒有特殊的說明, 呼叫者是否知道這是一個非同步呼叫?
  2. main 函式種,最後使用select {} 使整個程式處於阻塞狀態,也就是空轉, 會不會存在浪費?
  3. 如果線上出現事故,debug服務已經突出,你想要debug這時是否很茫然?
  4. 如果某一天服務突然重啟, 你卻找不到事故日誌, 是否能想到起的這個8801埠的服務呢?

1.1.1 不要幫別人做選擇

把是否 併發 的選擇權交給你的呼叫者,而不是自己就直接悄悄的用上了 goroutine

下面做如下改變,將兩個函式是否併發操作的選擇權留給main函式


  1. package main
  2. import (
  3. "log"
  4. "net/http"
  5. _ "net/http/pprof"
  6. )
  7. func setup(){
  8. // 初始化操作
  9. }
  10. func main(){
  11. setup()
  12. // for debug
  13. go pprof()
  14. // 主服務,http api
  15. go server()
  16. select{}
  17. }
  18. func server(){
  19. mux := http.NewServerMux()
  20. mux.HandleFunc("ping", func(w http.ResponseWriter, r * http.Request){
  21. w.Write([]byte("pong"))
  22. }
  23. // 主服務
  24. if err := http.ListerAndServer(":8080",mux); err != nil{
  25. log.panic("http server launch error: %v", err)
  26. return
  27. }
  28. }
  29. func pprof(){
  30. // 輔助服務 監聽其他埠,這裡是pprof服務,擁有debug
  31. http.ListerAndServer(":8081",nil)
  32. }

1.1.2 不要作為一個旁觀者

一般情況下,不要讓 主程序稱為一個無所事事的旁觀者, 明明可以幹活,但是最後使用一個select在那兒空跑,而且這種看著也怪,在沒有特殊場景下儘量不要使用這種阻塞的方式


  1. package main
  2. import (
  3. "log"
  4. "net/http"
  5. _ "net/http/pprof"
  6. )
  7. func setup() {
  8. // 這裡面有一些初始化的操作
  9. }
  10. func main() {
  11. setup()
  12. // for debug
  13. go pprof()
  14. // 主服務, http本來就是一個阻塞的服務
  15. server()
  16. }
  17. func server() {
  18. mux := http.NewServeMux()
  19. mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
  20. w.Write([]byte("pong"))
  21. })
  22. // 主服務
  23. if err := http.ListenAndServe(":8080", mux); err != nil {
  24. log.Panicf("http server err: %+v", err)
  25. return
  26. }
  27. }
  28. func pprof() {
  29. // 輔助服務,監聽了其他埠,這裡是 pprof 服務,用於 debug
  30. http.ListenAndServe(":8081", nil)
  31. }

1.1.3 不要建立不知道什麼時候退出的 goroutine

很多時候我們在建立一個 協程(goroutine)後就放任不管了,如果程式永遠執行下去,可能不會有什麼問題,但實際情況並非如此, 我們的產品需要迭代,需要修復bug,需要不停進行構建,釋出, 所以當程式退出後(主程式),執行的某些子程式並不會完全退出,比如這個 pprof, 他自身本來就是一個後臺服務,但是當 main退出後,實際 pprof這個服務並不會退出,這樣 pprof就會稱為一個孤魂野鬼,稱為一個 zombie, 導致goroutine洩漏。

所以再一次對程式進行修改, 保證 goroutine能正常退出


  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. _ "net/http/pprof"
  8. "time"
  9. )
  10. func setup() {
  11. // 這裡面有一些初始化的操作
  12. }
  13. func main() {
  14. setup()
  15. // 用於監聽服務退出, 這裡使用了兩個 goroutine,所以 cap 為2
  16. done := make(chan error, 2)
  17. // 無緩衝的通道,用於控制服務退出,傳入同一個 stop,做到只要有一個服務退出了那麼另外一個服務也會隨之退出
  18. stop := make(chan struct{}, 0)
  19. // for debug
  20. go func() {
  21. // pprof 傳遞一個 channel
  22. fmt.Println("pprof start...")
  23. done <- pprof(stop)
  24. fmt.Printf("err1:%v\n", done)
  25. }()
  26. // 主服務
  27. go func() {
  28. fmt.Println("app start...")
  29. done <- app(stop)
  30. fmt.Printf("err2:%v\n", done)
  31. }()
  32. // stopped 用於判斷當前 stop 的狀態
  33. var stopped bool
  34. // 這裡迴圈讀取 done 這個 channel
  35. // 只要有一個退出了,我們就關閉 stop channel
  36. for i := 0; i < cap(done); i++ {
  37. // 對於有緩衝的chan, chan中無值會一直處於阻塞狀態
  38. // 對於app 服務會一直阻塞狀態,不會有 資料寫入到done 通道,只有在5s後,模擬的 pprof會有err寫入chan,此時才會觸發以下邏輯
  39. if err := <-done; err != nil {
  40. log.Printf("server exit err: %+v", err)
  41. }
  42. if !stopped {
  43. stopped = true
  44. // 通過關閉 無緩衝的channel 來通知所有的 讀 stop相關的goroutine退出
  45. close(stop)
  46. }
  47. }
  48. }
  49. // http 服務
  50. func app(stop <-chan struct{}) error {
  51. mux := http.NewServeMux()
  52. mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
  53. w.Write([]byte("pong"))
  54. })
  55. return server(mux, ":8080", stop)
  56. }
  57. func pprof(stop <-chan struct{}) error {
  58. // 注意這裡主要是為了模擬服務意外退出,用於驗證一個服務退出,其他服務同時退出的場景
  59. // 因為這裡沒有返回err, 所以done chan中無法接收到值, 主程式中會一直阻塞住
  60. go func() {
  61. server(http.DefaultServeMux, ":8081", stop)
  62. }()
  63. time.Sleep(5 * time.Second)
  64. // 模擬出錯
  65. return fmt.Errorf("mock pprof exit")
  66. }
  67. // 啟動一個服務
  68. func server(handler http.Handler, addr string, stop <-chan struct{}) error {
  69. s := http.Server{
  70. Handler: handler,
  71. Addr: addr,
  72. }
  73. // 這個 goroutine 控制退出,因為 stop channel 只要close或者是寫入資料,這裡就會退出
  74. go func() {
  75. // 無緩衝channel等待,寫入或者關閉
  76. <-stop
  77. log.Printf("server will exiting, addr: %s", addr)
  78. // 此時 httpApi 服務就會優雅的退出
  79. s.Shutdown(context.Background())
  80. }()
  81. // 沒有觸發異常的話,會一直處於阻塞
  82. return s.ListenAndServe()
  83. }

檢視以下執行結果

  1. D:\gopath\controlGoExit>go run demo.go
  2. app start...
  3. pprof start...
  4. err1:0xc00004c720
  5. 2021/09/12 22:48:37 server exit err: mock pprof exit
  6. 2021/09/12 22:48:37 server will exiting, addr: :8080
  7. 2021/09/12 22:48:37 server will exiting, addr: :8081
  8. err2:0xc00004c720
  9. 2021/09/12 22:48:37 server exit err: http: Server closed

雖然我們已經經過了三輪優化,但是這裡還是有一些需要注意的地方:

  1. 雖然我們呼叫了 Shutdown 方法,但是我們其實並沒有實現優雅退出
  2. 在 server 方法中我們並沒有處理 panic的邏輯,這裡需要處理麼?如果需要那該如何處理呢?

1.1.4 不要建立都無法退出的 goroutine

永遠無法退出的 goroutine, 即 goroutine 洩漏

下面是一個例子,可能在不知不覺中會用到


  1. package main
  2. import (
  3. "log"
  4. _ "net/http/pprof"
  5. "net/http"
  6. )
  7. func setup() {
  8. // 這裡面有一些初始化的操作
  9. log.Print("服務啟動初始化...")
  10. }
  11. func main() {
  12. setup()
  13. // for debug
  14. go pprof()
  15. // 主服務, http本來就是一個阻塞的服務
  16. server()
  17. }
  18. func server() {
  19. mux := http.NewServeMux()
  20. mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
  21. w.Write([]byte("pong"))
  22. })
  23. mux.HandleFunc("/leak", LeakHandle)
  24. // 主服務
  25. if err := http.ListenAndServe(":8080", mux); err != nil {
  26. log.Panicf("http server err: %+v", err)
  27. return
  28. }
  29. }
  30. func pprof() {
  31. // 輔助服務,監聽了其他埠,這裡是 pprof 服務,用於 debug
  32. http.ListenAndServe(":8081", nil)
  33. }
  34. func LeakHandle(w http.ResponseWriter, r *http.Request) {
  35. ch := make(chan bool, 0)
  36. go func() {
  37. fmt.Println("非同步任務做一些操作")
  38. <-ch
  39. }()
  40. w.Write([]byte("will leak"))
  41. }

複用一下上面的 server 程式碼,我們經常會寫出這種類似的程式碼

  • http 請求來了,我們啟動一個 goroutine 去做一些耗時一點的工作
  • 然後返回了
  • 然後之前建立的那個 goroutine 阻塞了(對於一個無緩衝的chan,如果沒有接收或關閉操作會永遠阻塞下去)
  • 然後就洩漏了

絕大部分的 goroutine 洩漏都是因為 goroutine 當中因為各種原因阻塞了,我們在外面也沒有控制它退出的方式,所以就洩漏了

接下來我們驗證一下是不是真的洩漏了

服務啟動之後,訪問debug訪問網址,http://localhost:8081/debug/pprof/goroutine?debug=1.

當請求兩次 http://127.0.0.1/leak後檢視 goroutine數量,如圖

繼續請求三次後,如圖

1.1.5 確保創建出的goroutine工作已經完成

這個其實就是優雅退出的問題,程式中可能啟動了很多的 goroutine 去處理一些問題,但是服務退出的時候我們並沒有考慮到就直接退出了。例如退出前日誌沒有 flush 到磁碟,我們的請求還沒完全關閉,非同步 worker 中還有 job 在執行等等。

看一個例子,假設現在有一個埋點服務,每次請求我們都會上報一些資訊到埋點服務上

  1. // Reporter 埋點服務上報
  2. type Reporter struct {
  3. }
  4. var reporter Reporter
  5. // 模擬耗時
  6. func (r Reporter) report(data string) {
  7. time.Sleep(time.Second)
  8. fmt.Printf("report: %s\n", data)
  9. }
  10. mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
  11. // 在請求中非同步呼叫
  12. // 這裡並沒有滿足一致性
  13. go reporter.report("ping pong")
  14. fmt.Println("ping")
  15. w.Write([]byte("pong"))
  16. })

在傳送一次請後之後就直接退出了, 非同步上報的邏輯是沒有執行的

  1. $ go tun demo.go
  2. ping
  3. ^C signal:interrupt

有兩種改法:

  • 一種是給 reporter 加上 shutdown 方法,類似 http 的 shutdown,等待所有的非同步上報完成之後,再退出
  • 另外一種是我們直接使用 一些 worker 來執行,在當然這個 worker 也要實現類似 shutdown 的方法。

一般推薦後一種,因為這樣可以避免請求量比較大時,建立大量 goroutine,當然如果請求量比較小,不會很大,用第一種也是可以的。

第二種方法程式碼如下:


  1. // 埋點上報
  2. package main
  3. import (
  4. "context"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "sync"
  9. )
  10. // Reporter 埋點服務上報
  11. type Reporter struct {
  12. worker int
  13. messages chan string
  14. wg sync.WaitGroup
  15. closed chan struct{}
  16. once sync.Once
  17. }
  18. // NewReporter NewReporter
  19. func NewReporter(worker, buffer int) *Reporter {
  20. return &Reporter{
  21. worker: worker,
  22. messages: make(chan string, buffer),
  23. closed: make(chan struct{}),
  24. }
  25. }
  26. // 執行上報
  27. func (r *Reporter) Run(stop <-chan struct{}) {
  28. // 用於執行錯誤
  29. go func() {
  30. // 沒有錯誤時
  31. <-stop
  32. fmt.Println("stop...")
  33. r.shutdown()
  34. }()
  35. for i := 0; i < r.worker; i++ {
  36. r.wg.Add(1)
  37. go func() {
  38. defer r.wg.Done()
  39. for {
  40. select {
  41. case <-r.closed:
  42. return
  43. case msg := <-r.messages:
  44. fmt.Printf("report: %s\n", msg)
  45. }
  46. }
  47. }()
  48. }
  49. r.wg.Wait()
  50. fmt.Println("report workers exit...")
  51. }
  52. // 這裡不必關閉 messages
  53. // 因為 closed 關閉之後,傳送端會直接丟棄資料不再發送
  54. // Run 方法中的消費者也會退出
  55. // Run 方法會隨之退出
  56. func (r *Reporter) shutdown() {
  57. r.once.Do(func() { close(r.closed) })
  58. }
  59. // 模擬耗時
  60. func (r *Reporter) Report(data string) {
  61. // 這個是為了及早退出
  62. // 並且為了避免我們消費者能力很強,傳送者這邊一直不阻塞,可能還會一直寫資料
  63. select {
  64. case <-r.closed:
  65. fmt.Printf("reporter is closed, data will be discarded: %s \n", data)
  66. default:
  67. }
  68. select {
  69. case <-r.closed:
  70. fmt.Printf("reporter is closed, data will be discarded: %s \n", data)
  71. case r.messages <- data:
  72. }
  73. }
  74. func setup3() {
  75. // 初始化一些操作
  76. fmt.Println("程式啟動...")
  77. }
  78. func main() {
  79. setup3()
  80. // 用於監聽服務完成時退出
  81. done := make(chan error, 3)
  82. // 例項化一個 reporter
  83. reporter := NewReporter(2, 100)
  84. // 用於控制服務退出,傳入同一個 stop,做到只要有一個服務退出了那麼另外一個服務也會隨之退出
  85. stop := make(chan struct{}, 0)
  86. // for debug
  87. go func() {
  88. done <- pprof3(stop)
  89. }()
  90. // http主服務
  91. go func() {
  92. done <- app3(reporter, stop)
  93. }()
  94. // 上報服務,接收一個監控停止的 chan
  95. go func() {
  96. reporter.Run(stop)
  97. done <- nil
  98. }()
  99. // 這裡迴圈讀取 done 這個 channel
  100. // 只要有一個退出了,我們就關閉 stop channel
  101. for i := 0; i < cap(done); i++ {
  102. // 對於有緩衝的chan, chan中無值會一直處於阻塞狀態
  103. // 對於app 服務會一直阻塞狀態,不會有 資料寫入到done 通道,只有在5s後,模擬的 pprof會有err寫入chan,此時才會觸發以下邏輯
  104. if err := <-done; err != nil {
  105. log.Printf("server exit err: %+v", err)
  106. }
  107. // 通過關閉 無緩衝的channel 來通知所有的 讀 stop相關的goroutine退出
  108. close(stop)
  109. }
  110. }
  111. func pprof3(stop <-chan struct{}) error {
  112. // 輔助服務,監聽了其他埠,這裡是 pprof 服務,用於 debug
  113. err := server3(http.DefaultServeMux, ":8081", stop)
  114. return err
  115. }
  116. func app3(report *Reporter, stop <-chan struct{}) error {
  117. mux := http.NewServeMux()
  118. mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
  119. // 在請求中非同步呼叫
  120. // 這裡並沒有滿足一致性
  121. go report.Report("ping pong")
  122. fmt.Println("ping")
  123. _, err := w.Write([]byte("pong"))
  124. if err != nil {
  125. log.Println("response err")
  126. }
  127. })
  128. return server3(mux, ":8080", stop)
  129. }
  130. // 啟動一個服務
  131. func server3(handler http.Handler, addr string, stop <-chan struct{}) error {
  132. s := http.Server{
  133. Handler: handler,
  134. Addr: addr,
  135. }
  136. // 這個 goroutine 控制退出,因為 stop channel 只要close 或者是寫入資料,這裡就會退出
  137. go func() {
  138. // 無緩衝channel等待,寫入或者關閉
  139. <-stop
  140. log.Printf("server will exiting, addr: %s", addr)
  141. // 此時 httpApi 服務就會優雅的退出
  142. err := s.Shutdown(context.Background())
  143. if err != nil {
  144. log.Printf("server exiting occur error, %s", err.Error())
  145. }
  146. }()
  147. // 沒有觸發異常的化,會一直處於阻塞
  148. return s.ListenAndServe()
  149. }
  • 上面程式碼應該還有問題,等日後再做優化

第一種方法參考:reporter 新增shutdown方法

2. 總結

在使用go語言初期, 使用一個go關鍵字輕鬆開啟一個非同步協程,再加上chan很容易實現 生產者---》消費者 設計模型,但是在使用過程中往往忽略了 程式退出時資源回收的問題,也很容易寫成一個數據使用一個go來處理,雖然官方說明了 建立一個goroutine的佔用資源很小,但是再小的 佔用空間也敵不過一個死迴圈啊。 所以在使用gorouine建立協程除了注意正確規定執行緒數以為,也要注意以下幾點。

  1. 將是否非同步呼叫的選擇泉交給呼叫者, 不然很有可能使用者不知道所呼叫的函式立使用了go

  2. 如果要啟動一個goroutine, 要對他負責

    • 不用啟動一個無法控制他退出或者無法知道何時退出的goroutine
    • 啟動goroutine時加上 panic recovery機制,避免服務直接不可用,可以使用如下程式碼
    1. // DeferRecover defer recover from panic.
    2. func DeferRecover(tag string, handlePanic func(error)) func() {
    3. return func() {
    4. if err := recover(); err != nil {
    5. log.Errorf("%s, recover from: %v\n%s\n", tag, err, debug.Stack())
    6. if handlePanic != nil {
    7. handlePanic(fmt.Errorf("%v", err))
    8. }
    9. }
    10. }
    11. }
    12. // WithRecover recover from panic.
    13. func WithRecover(tag string, f func(), handlePanic func(error)) {
    14. defer DeferRecover(tag, handlePanic)()
    15. f()
    16. }
    17. // Go is a wrapper of goroutine with recover.
    18. func Go(name string, f func(), handlePanic func(error)) {
    19. go WithRecover(fmt.Sprintf("goroutine %s", name), f, handlePanic)
    20. }
    • 造成 goroutine 洩漏的主要原因就是 goroutine 中造成了阻塞,並且沒有外部手段控制它退出
  3. 儘量避免在請求中直接啟動 goroutine 來處理問題,而應該通過啟動 worker 來進行消費,這樣可以避免由於請求量過大,而導致大量建立 goroutine 從而導致 oom,當然如果請求量本身非常小,那當我沒說

3. 參考

  1. https://dave.cheney.net/practical-go/presentations/qcon-china.html
  2. https://lailin.xyz/post/go-training-week3-goroutine.html#總結
  3. https://www.ardanlabs.com/blog/2019/04/concurrency-trap-2-incomplete-work.html
  4. https://www.ardanlabs.com/blog/2014/01/concurrency-goroutines-and-gomaxprocs.html