1. 程式人生 > >Go中定時器實現原理及原始碼解析

Go中定時器實現原理及原始碼解析

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 本文使用的go的原始碼15.7,需要注意的是由於timer是1.14版本進行改版,但是1.14和1.15版本的timer並無很大區別 我在春節期間寫了一篇文章有關時間輪的:https://www.luozhiyun.com/archives/444。後來有同學建議我去看看 1.14版本之後的 timer 優化。然後我就自己就時間輪和 timer 也做了個 benchmark: ``` goos: darwin goarch: amd64 pkg: gin-test/api/main BenchmarkTimingWheel_StartStop/N-1m-12 4582120 254 ns/op 85 B/op 2 allocs/op BenchmarkTimingWheel_StartStop/N-5m-12 3356630 427 ns/op 46 B/op 1 allocs/op BenchmarkTimingWheel_StartStop/N-10m-12 2474842 483 ns/op 60 B/op 1 allocs/op BenchmarkStandardTimer_StartStop/N-1m-12 6777975 179 ns/op 84 B/op 1 allocs/op BenchmarkStandardTimer_StartStop/N-5m-12 6431217 231 ns/op 85 B/op 1 allocs/op BenchmarkStandardTimer_StartStop/N-10m-12 5374492 266 ns/op 83 B/op 1 allocs/op PASS ok gin-test/api/main 60.414s ``` 從上面可以直接看出,在添加了一千萬個定時器後,時間輪的單次呼叫時間有明顯的上漲,但是 timer 卻依然很穩。 從官方的一個數據顯示: ``` name old time/op new time/op delta AfterFunc-12 1.57ms ± 1% 0.07ms ± 1% -95.42% (p=0.000 n=10+8) After-12 1.63ms ± 3% 0.11ms ± 1% -93.54% (p=0.000 n=9+10) Stop-12 78.3µs ± 3% 73.6µs ± 3% -6.01% (p=0.000 n=9+10) SimultaneousAfterFunc-12 138µs ± 1% 111µs ± 1% -19.57% (p=0.000 n=10+9) StartStop-12 28.7µs ± 1% 31.5µs ± 5% +9.64% (p=0.000 n=10+7) Reset-12 6.78µs ± 1% 4.24µs ± 7% -37.45% (p=0.000 n=9+10) Sleep-12 183µs ± 1% 125µs ± 1% -31.67% (p=0.000 n=10+9) Ticker-12 5.40ms ± 2% 0.03ms ± 1% -99.43% (p=0.000 n=10+10) ... ``` 在很多項測試中,效能確實得到了很大的增強。下面也就一起看看效能暴漲的原因。 ## 介紹 ### 1.13 版本的 timer Go 在1.14版本之前是使用 64 個最小堆,執行時建立的所有計時器都會加入到最小堆中,每個處理器(P)建立的計時器會由對應的最小堆維護。 ![timer1.13](Go中定時器實現原理(未完)/timer1.13.png) 下面是1.13版本 `runtime.time`原始碼: ```go const timersLen = 64 var timers [timersLen]struct { timersBucket // padding, 防止false sharing pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte } // 獲取 P 對應的 Bucket func (t *timer) assignBucket() *timersBucket { id := uint8(getg().m.p.ptr().id) % timersLen t.tb = &timers[id].timersBucket return t.tb } type timersBucket struct { lock mutex gp *g created bool sleeping bool rescheduling bool sleepUntil int64 waitnote note // timer 列表 t []*timer } ``` 通過上面的 assignBucket 方法可以知道,如果當前機器上的處理器 P 的個數超過了 64,多個處理器上的計時器就可能儲存在同一個桶 timersBucket 中。 每個桶負責管理一堆這樣有序的 timer,同時每個桶都會有一個對應的 timerproc 非同步任務來負責不斷排程這些 timer。t imerproc 會從 timersBucket 不斷取堆頂元素,如果堆頂的 timer 已到期則執行,沒有任務到期則 sleep,所有任務都消耗完了,那麼呼叫 gopark 掛起,直到有新的 timer 被新增到桶中時,才會被重新喚醒。 timerproc 在 sleep 的時候會呼叫 notetsleepg ,繼而引發entersyscallblock呼叫,該方法會主動呼叫 handoffp ,解綁 M 和 P。當下一個定時時間到來時,又會進行 M 和 P 的繫結,處理器 P 和執行緒 M 之間頻繁的上下文切換也是 timer 的首要效能影響因素之一。 ### 1.14 版本後 timer 的變化 在Go 在1.14版本之後,移除了timersBucket,所有的計時器都以最小四叉堆的形式儲存 P 中。 ```go type p struct { ... // 互斥鎖 timersLock mutex // 儲存計時器的最小四叉堆 timers []*timer // 計時器數量 numTimers uint32 // 處於 timerModifiedEarlier 狀態的計時器數量 adjustTimers uint32 // 處於 timerDeleted 狀態的計時器數量 deletedTimers uint32 ... } ``` ![new_timer](Go中定時器實現原理(未完)/new_timer.png) timer 不再使用 timerproc 非同步任務來排程,而是改用排程迴圈或系統監控排程的時候進行觸發執行,減少了執行緒之間上下文切換帶來的效能損失,並且通過使用 netpoll 阻塞喚醒機制可以讓 timer 更加及時的得到執行。 ### timer的使用 `time.Timer`計時器必須通過`time.NewTimer`、`time.AfterFunc`或者 `time.After` 函式建立。 如下`time.NewTimer`: 通過定時器的欄位C,我們可以及時得知定時器到期的這個事件來臨,C是一個chan time.Time型別的緩衝通道,一旦觸及到期時間,定時器就會向自己的C欄位傳送一個time.Time型別的元素值 ```go func main() { //初始化定時器 t := time.NewTimer(2 * time.Second) //當前時間 now := time.Now() fmt.Printf("Now time : %v.\n", now) expire := <- t.C fmt.Printf("Expiration time: %v.\n", expire) } ``` `time.After`一般是配合select來使用: ```go func main() { ch1 := make(chan int, 1) select { case e1 := <-ch1: //如果ch1通道成功讀取資料,則執行該case處理語句 fmt.Printf("1th case is selected. e1=%v",e1) case <- time.After(2 * time.Second): fmt.Println("Timed out") } } ``` `time.Afterfunc`可以在設定時間過後執行一個函式: ```go func main() { f := func(){ fmt.Printf("Expiration time : %v.\n", time.Now()) } time.AfterFunc(1*time.Second, f) time.Sleep(2 * time.Second) } ``` ## 分析 ### 初始化&Timer結構體 我們先看看NewTimer方法是如何建立一個Timer的: ```go type Timer struct { C <-chan Time r runtimeTimer } func NewTimer(d Duration) *Timer { // 初始化一個channel,用於返回 c := make(chan Time, 1) t := &Timer{ C: c, r: runtimeTimer{ when: when(d), f: sendTime, arg: c, }, } // 呼叫runtime.time的startTimer方法 startTimer(&t.r) return t } func startTimer(*runtimeTimer) ``` NewTimer方法主要是初始化一個Timer,然後呼叫startTimer方法,並返回Timer。startTimer方法的真正邏輯並不在time包裡面,我們可以使用到上一節提到的使用dlv除錯彙編程式碼: ``` sleep.go:94 0xd8ea09 e872c7faff call $time.startTimer ``` 得知startTimer實際上呼叫的是`runtime.time.startTimer`方法。也就是說`time.Timer`只是對runtime包中timer的一層wrap。這層自身實現的最核心功能是將底層的超時回撥轉換為傳送channel訊息。 下面我們看看`runtime.startTimer`: ```go func startTimer(t *timer) { ... addtimer(t) } ``` startTimer方法會將傳入的runtimeTimer轉為timer,然後呼叫addtimer方法。 在NewTimer方法中會初始化一個runtimeTimer結構體,這個結構體實際上會被當做`runtime.time`中的timer結構體傳入到startTimer方法中,所以下面我們來看看timer: ```go type timer struct { // 對應處理器P的指標 pp puintptr // 定時器被喚醒的時間 when int64 // 喚醒的間隔時間 period int64 // 喚醒時被呼叫的函式 f func(interface{}, uintptr) // 被呼叫的函式的引數 arg interface{} seq uintptr // 處於timerModifiedXX狀態時用於設定when欄位 nextwhen int64 // 定時器的狀態 status uint32 } ``` 除此之外,timer還有一些標誌位來表示 status 狀態: ```go const ( // 初始化狀態 timerNoStatus = iota // 等待被呼叫 // timer 已在 P 的列表中 timerWaiting // 表示 timer 在執行中 timerRunning // timer 已被刪除 timerDeleted // timer 正在被移除 timerRemoving // timer 已被移除,並停止執行 timerRemoved // timer 被修改了 timerModifying // 被修改到了更早的時間 timerModifiedEarlier // 被修改到了更晚的時間 timerModifiedLater // 已經被修改,並且正在被移動 timerMoving ) ``` ### addtimer 新增 timer **runtime.addtimer** ```go func addtimer(t *timer) { // 定時器被喚醒的時間的時間不能為負數 if t.when < 0 { t.when = maxWhen } // 狀態必須為初始化 if t.status != timerNoStatus { throw("addtimer called with initialized timer") } // 設定為等待排程 t.status = timerWaiting when := t.when // 獲取當前 P pp := getg().m.p.ptr() lock(&pp.timersLock) // 清理 P 的 timer 列表頭中的 timer cleantimers(pp) // 將 timer 加入到 P 的最小堆中 doaddtimer(pp, t) unlock(&pp.timersLock) // 喚醒 netpoller 中休眠的執行緒 wakeNetPoller(when) } ``` 1. addtimer 會對 timer 被喚醒的時間 when 進行校驗,以及校驗 status 必須是新出初始化的 timer; 2. 接著會在加鎖後呼叫 cleantimers 對 P 中對應的 timer 列表的頭節點進行清理工作,清理完後呼叫 doaddtimer 將 timer 加入到 P 的最小堆中,並釋放鎖; 3. 呼叫 wakeNetPoller 喚醒 netpoller 中休眠的執行緒。 下面分別來看看 addtimer 中幾個重要函式的具體實現: **runtime.cleantimers** ```go func cleantimers(pp *p) { gp := getg() for { // 排程器列表為空,直接返回 if len(pp.timers) == 0 { return } // 如果當前 G 被搶佔了,直接返回 if gp.preemptStop { return } // 獲取第一個 timer t := pp.timers[0] if t.pp.ptr() != pp { throw("cleantimers: bad p") } switch s := atomic.Load(&t.status); s { case timerDeleted: // 設定 timer 的狀態 if !atomic.Cas(&t.status, s, timerRemoving) { continue } // 刪除第一個 timer dodeltimer0(pp) // 刪除完畢後重置狀態為 timerRemoved if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { badTimer() } atomic.Xadd(&pp.deletedTimers, -1) // timer 被修改到了更早或更晚的時間 case timerModifiedEarlier, timerModifiedLater: // 將 timer 狀態設定為 timerMoving if !atomic.Cas(&t.status, s, timerMoving) { continue } // 重新設定 when 欄位 t.when = t.nextwhen // 在列表中刪除後重新加入 dodeltimer0(pp) doaddtimer(pp, t) if s == timerModifiedEarlier { atomic.Xadd(&pp.adjustTimers, -1) } // 設定狀態為 timerWaiting if !atomic.Cas(&t.status, timerMoving, timerWaiting) { badTimer() } default: return } } } ``` cleantimers 函式中使用了一個無限迴圈來獲取頭節點。如果頭節點的狀態是 timerDeleted ,那麼需要從 timer 列表中刪除;如果頭節點的狀態是 timerModifiedEarlier 或 timerModifiedLater ,表示頭節點的觸發的時間被修改到了更早或更晚的時間,那麼就先從 timer佇列中刪除再重新新增。 **runtime.doaddtimer** ```go func doaddtimer(pp *p, t *timer) { // Timers 依賴於 netpoller // 所以如果 netpoller 沒有啟動,需要啟動一下 if netpollInited == 0 { netpollGenericInit() } // 校驗是否早已在 timer 列表中 if t.pp != 0 { throw("doaddtimer: P already set in timer") } // 設定 timer 與 P 的關聯 t.pp.set(pp) i := len(pp.timers) // 將 timer 加入到 P 的 timer 列表中 pp.timers = append(pp.timers, t) // 維護 timer 在 最小堆中的位置 siftupTimer(pp.timers, i) // 如果 timer 是列表中頭節點,需要設定一下 timer0When if t == pp.timers[0] { atomic.Store64(&pp.timer0When, uint64(t.when)) } atomic.Xadd(&pp.numTimers, 1) } ``` doaddtimer 函式實際上很簡單,主要是將 timer 與 P 設定關聯關係,並將 timer 加入到 P 的 timer 列表中,並維護 timer 列表最小堆的順序。 **runtime.wakeNetPoller** ```go func wakeNetPoller(when int64) { if atomic.Load64(&sched.lastpoll) == 0 { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) // 如果計時器的觸發時間小於netpoller的下一次輪詢時間 if pollerPollUntil == 0 || pollerPollUntil >
when { // 向netpollBreakWr裡面寫入資料,立即中斷netpoll netpollBreak() } } } func netpollBreak() { if atomic.Cas(&netpollWakeSig, 0, 1) { for { var b byte // 向 netpollBreakWr 裡面寫入資料 n := write(netpollBreakWr, unsafe.Pointer(&b), 1) if n == 1 { break } if n == -_EINTR { continue } if n == -_EAGAIN { return } println("runtime: netpollBreak write failed with", -n) throw("runtime: netpollBreak write failed") } } } ``` wakeNetPoller 主要是將 timer 下次排程的時間和 netpoller 的下一次輪詢時間相比,如果小於的話,呼叫 netpollBreak 向 netpollBreakWr 裡面寫入資料,立即中斷netpoll。具體如何中斷的,我們下面再聊。 ### stopTimer 終止 timer 終止 timer 的邏輯主要是 timer 的狀態的變更: 如果該timer處於 timerWaiting 或 timerModifiedLater 或 timerModifiedEarlier: * timerModifying ->
timerDeleted 如果該timer處於 其他狀態: * 待狀態改變或者直接返回 所以在終止 timer 的過程中不會去刪除 timer,而是標記一個狀態,等待被刪除。 ### modTimer 修改 timer ```go func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool { if when < 0 { when = maxWhen } status := uint32(timerNoStatus) wasRemoved := false var pending bool var mp *m loop: for { // 修改 timer 狀態 switch status = atomic.Load(&t.status); status { ... } t.period = period t.f = f t.arg = arg t.seq = seq // 如果 timer 已被刪除,那麼需要重新新增到 timer 列表中 if wasRemoved { t.when = when pp := getg().m.p.ptr() lock(&pp.timersLock) doaddtimer(pp, t) unlock(&pp.timersLock) if !atomic.Cas(&t.status, timerModifying, timerWaiting) { badTimer() } releasem(mp) wakeNetPoller(when) } else { t.nextwhen = when newStatus := uint32(timerModifiedLater) // 如果修改後的時間小於修改前的時間,將狀態設定為 timerModifiedEarlier if when < t.when { newStatus = timerModifiedEarlier } ... if !atomic.Cas(&t.status, timerModifying, newStatus) { badTimer() } releasem(mp) // 如果修改時間提前,那麼觸發 netpoll 中斷 if newStatus == timerModifiedEarlier { wakeNetPoller(when) } } return pending } ``` modtimer 進入到 for 迴圈後會根據不同的狀態做狀態設定以及必要欄位的處理;如果是 timer 已被刪除,那麼需要重新新增到 timer 列表中;如果 timer 修改後的時間小於修改前的時間,將狀態設定為 timerModifiedEarlier,修改時間提前,還需要觸發 netpoll 中斷。 ### timer 的執行 聊完了如何新增 timer,下面我們來看看 timer 是如何執行的。timer 的執行是交給 `runtime.runtimer`函式執行的,這個函式會檢查 P 上最小堆的最頂上的 timer 的狀態,根據狀態做不同的處理。 ```go func runtimer(pp *p, now int64) int64 { for { // 獲取最小堆的第一個元素 t := pp.timers[0] if t.pp.ptr() != pp { throw("runtimer: bad p") } // 獲取 timer 狀態 switch s := atomic.Load(&t.status); s { // timerWaiting case timerWaiting: // 還沒到時間,返回下次執行時間 if t.when > now { // Not ready to run. return t.when } // 修改狀態為 timerRunning if !atomic.Cas(&t.status, s, timerRunning) { continue } // 執行該 timer runOneTimer(pp, t, now) return 0 // timerDeleted case timerDeleted: if !atomic.Cas(&t.status, s, timerRemoving) { continue } // 刪除最小堆的第一個 timer dodeltimer0(pp) if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { badTimer() } atomic.Xadd(&pp.deletedTimers, -1) if len(pp.timers) == 0 { return -1 } // 需要重新移動位置的 timer case timerModifiedEarlier, timerModifiedLater: if !atomic.Cas(&t.status, s, timerMoving) { continue } t.when = t.nextwhen // 刪除最小堆的第一個 timer dodeltimer0(pp) // 將該 timer 重新新增到最小堆 doaddtimer(pp, t) if s == timerModifiedEarlier { atomic.Xadd(&pp.adjustTimers, -1) } if !atomic.Cas(&t.status, timerMoving, timerWaiting) { badTimer() } case timerModifying: osyield() case timerNoStatus, timerRemoved: badTimer() case timerRunning, timerRemoving, timerMoving: badTimer() default: badTimer() } } } ``` runtimer 裡面會啟動一個 for 迴圈,不停的檢查 P 的 timer 列表的第一個元素的狀態。 * 如果該 timer 處於 timerWaiting,那麼判斷當前的時間大於 timer 執行的時間,則呼叫 runOneTimer 執行; * 如果該 timer 處於 timerDeleted,表示該 timer 是需要被刪除的,那麼呼叫 dodeltimer0 刪除最小堆的第一個 timer ,並修改其狀態; * 如果該 timer 狀態是 timerModifiedEarlier 、timerModifiedLater,那麼表示該 timer 的執行時間被修改過,需要重新調整它在最小堆中的位置,所以先呼叫 dodeltimer0 刪除該 timer,再呼叫 doaddtimer 將該 timer 重新新增到最小堆。 **runtime.runOneTimer** ```go func runOneTimer(pp *p, t *timer, now int64) { ... // 需要被執行的函式 f := t.f // 被執行函式的引數 arg := t.arg seq := t.seq // 表示該 timer 為 ticker,需要再次觸發 if t.period >
0 { // 放入堆中並調整觸發時間 delta := t.when - now t.when += t.period * (1 + -delta/t.period) siftdownTimer(pp.timers, 0) if !atomic.Cas(&t.status, timerRunning, timerWaiting) { badTimer() } updateTimer0When(pp) // 一次性 timer } else { // 刪除該 timer. dodeltimer0(pp) if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { badTimer() } } unlock(&pp.timersLock) // 執行該函式 f(arg, seq) lock(&pp.timersLock) ... } ``` runOneTimer 會根據 period 是否大於0判斷該 timer 是否需要反覆執行,如果是的話需要重新調整 when 下次執行時間後重新調整該 timer 在堆中的位置。一次性 timer 的話會執行 dodeltimer0 刪除該 timer ,最後執行 timer 中的函式; ### timer 的觸發 下面這裡是我覺得比較有意思的地方,timer 的觸發有兩種: * 從排程迴圈中直接觸發; * 另一種是Go語言的後臺系統監控中會定時觸發; #### 排程迴圈觸發 排程迴圈,我在這篇文章 https://www.luozhiyun.com/archives/448 已經講的很清楚了,不明白的同學可以自己再去看看。 整個排程迴圈會有三個地方去檢查是否有可執行的 timer: 1. 呼叫 `runtime.schedule` 執行排程時; 2. 呼叫`runtime.findrunnable`獲取可執行函式時; 3. 呼叫`runtime.findrunnable`執行搶佔時; **runtime.schedule** ```go func schedule() { _g_ := getg() ... top: pp := _g_.m.p.ptr() ... // 檢查是否有可執行 timer 並執行 checkTimers(pp, 0) var gp *g ... if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available } ... execute(gp, inheritTime) } ``` 下面我們看看 checkTimers 做了什麼: **runtime.checkTimers** ```go func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { // 如果沒有需要調整的 timer if atomic.Load(&pp.adjustTimers) == 0 { // 獲取 timer0 的執行時間 next := int64(atomic.Load64(&pp.timer0When)) if next == 0 { return now, 0, false } if now == 0 { now = nanotime() } // 下次執行大於當前時間, if now < next { // 需要刪除的 timer 個數小於 timer列表個數的4分之1,直接返回 if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { return now, next, false } } } lock(&pp.timersLock) // 進行調整 timer adjusttimers(pp) rnow = now if len(pp.timers) > 0 { if rnow == 0 { rnow = nanotime() } for len(pp.timers) > 0 { // 查詢堆中是否存在需要執行的 timer if tw := runtimer(pp, rnow); tw != 0 { if tw > 0 { pollUntil = tw } break } ran = true } } // 如果需要刪除的 timer 超過了 timer 列表數量的四分之一,那麼清理需要刪除的 timer if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 { clearDeletedTimers(pp) } unlock(&pp.timersLock) return rnow, pollUntil, ran } ``` checkTimers 中主要做了這麼幾件事: 1. 檢查是否有需要進行調整的 timer, 如果沒有需要執行的計時器時,直接返回;如果下一個要執行的 timer 沒有到期並且需要刪除的計時器較少(四分之一)時也會直接返回; 2. 呼叫 adjusttimers 進行 timer 列表的調整,主要是維護 timer 列表的最小堆的順序; 3. 呼叫 `runtime.runtimer`查詢堆中是否存在需要執行的timer, `runtime.runtimer`上面已經講過了,這裡不再贅述; 4. 如果當前 Goroutine 的 P 和傳入的 P 相同,並且需要刪除的 timer 超過了 timer 列表數量的四分之一,那麼呼叫 clearDeletedTimers 清理需要刪除的 timer; **runtime.findrunnable** ```go func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() ... // 檢查 P 中可執行的 timer now, pollUntil, _ := checkTimers(_p_, 0) ... // 如果 netpoll 已被初始化,並且 Waiters 大於零,並且 lastpoll 不為0 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { // 嘗試從netpoller獲取Glist if list := netpoll(0); !list.empty() { // 無阻塞 gp := list.pop() //將其餘佇列放入 P 的可執行G佇列 injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } ... // 開始竊取 for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } // 如果 i>2 表示如果其他 P 執行佇列中沒有 G ,將要從其他佇列的 runnext 中獲取 stealRunNextG := i > 2 // first look for ready queues with more than 1 g // 隨機獲取一個 P p2 := allp[enum.position()] if _p_ == p2 { continue } // 從其他 P 的執行佇列中獲取一般的 G 到當前佇列中 if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil { return gp, false } // 如果執行佇列中沒有 G,那麼從 timers 中獲取可執行的 timer if i > 2 || (i > 1 && shouldStealTimers(p2)) { // ran 為 true 表示有執行過 timer tnow, w, ran := checkTimers(p2, now) now = tnow if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w } if ran { // 因為已經執行過 timer 了,說不定已經有準備就緒的 G 了 // 再次檢查本地佇列嘗試獲取 G if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } ranTimer = true } } } } if ranTimer { // 執行完一個 timer 後可能存在已經就緒的 G goto top } stop: ... delta := int64(-1) if pollUntil != 0 { // checkTimers ensures that polluntil > now. delta = pollUntil - now } ... // poll network // 休眠前再次檢查 poll 網路 if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { ... list := netpoll(delta) // 阻塞呼叫 lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ == nil { injectglist(&list) } else { acquirep(_p_) if !list.empty() { gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } goto top } } else if pollUntil != 0 && netpollinited() { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil > pollUntil { netpollBreak() } } // 休眠當前 M stopm() goto top } ``` findrunnable 我在這篇文章 https://www.luozhiyun.com/archives/448 已經講的很清楚了,這裡提取 timer 相關的程式碼分析一下: 1. findrunnable 在竊取前先會呼叫 checkTimers 檢查 P 中可執行的 timer; 2. 如果 netpoll 中有等待的 waiter,那麼會呼叫 netpoll 嘗試無阻塞的從netpoller獲取Glist; 3. 如果獲取不到可執行的 G,那麼就會開始執行竊取。竊取的時候會呼叫 checkTimers 隨機從其他的 P 中獲取 timer; 4. 竊取完畢後也沒有可執行的 timer,那麼會繼續往下,休眠前再次檢查 netpoll 網路,呼叫 netpoll(delta) 函式進行阻塞呼叫。 #### 系統監控觸發 系統監控其實就是 Go 語言的守護程序,它們能夠在後臺監控系統的執行狀態,在出現意外情況時及時響應。它會每隔一段時間檢查 Go 語言執行時狀態,確保沒有異常發生。我們這裡不主要去講系統監控,只抽離出其中的和 timer 相關的程式碼進行講解。 **runtime.sysmon** ```go func sysmon() { ... for { ... now := nanotime() // 返回下次需要排程 timer 到期時間 next, _ := timeSleepUntil() ... // 如果超過 10ms 沒有 poll,則 poll 一下網路 lastpoll := int64(atomic.Load64(&sched.lastpoll)) if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) list := netpoll(0) // 非阻塞,返回 G 列表 // G 列表不為空 if !list.empty() { incidlelocked(-1) // 將獲取到的 G 列表插入到空閒的 P 中或全域性列表中 injectglist(&list) incidlelocked(1) } } // 如果有 timer 到期 if next < now { // 啟動新的 M 處理 timer startm(nil, false) } ... } } ``` 1. sysmon 會通過 timeSleepUntil 遍歷所有的 P 的 timer 列表,找到下一個需要執行的 timer; 2. 如果超過 10ms 沒有 poll,則 poll 一下網路; 3. 如果有 timer 到期,這個時候直接啟動新的 M 處理 timer; ### netpoll 的作用 我們從一開始呼叫 `runtime.addtimer` 新增 timer 的時候,就會 `runtime.wakeNetPoller`來中斷 netpoll ,那麼它是如何做到的?我們下面先來看一個官方的例子: ```go func TestNetpollBreak(t *testing.T) { if runtime.GOMAXPROCS(0) == 1 { t.Skip("skipping: GOMAXPROCS=1") } // 初始化 netpoll runtime.NetpollGenericInit() start := time.Now() c := make(chan bool, 2) go func() { c <- true // netpoll 等待時間 runtime.Netpoll(10 * time.Second.Nanoseconds()) c <- true }() <-c loop: for { runtime.Usleep(100) // 中斷netpoll 等待 runtime.NetpollBreak() runtime.NetpollBreak() select { case <-c: break loop default: } } if dur := time.Since(start); dur > 5*time.Second { t.Errorf("netpollBreak did not interrupt netpoll: slept for: %v", dur) } } ``` 在上面這個例子中,首先會呼叫 `runtime.Netpoll`進行阻塞等待,然後迴圈排程 `runtime.NetpollBreak`進行中斷阻塞。 **runtime.netpoll** ```go func netpoll(delay int64) gList { if epfd == -1 { return gList{} } var waitms int32 // 因為傳入delay單位是納秒,下面將納秒轉換成毫秒 if delay < 0 { waitms = -1 } else if delay == 0 { waitms = 0 } else if delay < 1e6 { waitms = 1 } else if delay < 1e15 { waitms = int32(delay / 1e6) } else { waitms = 1e9 } var events [128]epollevent retry: // 等待檔案描述符轉換成可讀或者可寫 n := epollwait(epfd, &events[0], int32(len(events)), waitms) // 返回負值,那麼重新呼叫epollwait進行等待 if n < 0 { ... goto retry } var toRun gList for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } // 如果是 NetpollBreak 中斷的,那麼執行 continue 跳過 if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd { if ev.events != _EPOLLIN { println("runtime: netpoll: break fd ready for", ev.events) throw("runtime: netpoll: break fd ready for something unexpected") } if delay != 0 { var tmp [16]byte read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp))) atomic.Store(&netpollWakeSig, 0) } continue } ... } return toRun } ``` 在 呼叫`runtime.findrunnable`執行搶佔時,最後會傳入一個時間,超時阻塞呼叫 netpoll,如果沒有事件中斷,那麼迴圈排程會一直等待直到 netpoll 超時後才往下進行: ```go func findrunnable() (gp *g, inheritTime bool) { ... delta := int64(-1) if pollUntil != 0 { // checkTimers ensures that polluntil > now. delta = pollUntil - now } ... // poll network // 休眠前再次檢查 poll 網路 if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { ... // 阻塞呼叫 list := netpoll(delta) } ... // 休眠當前 M stopm() goto top } ``` 所以在呼叫 `runtime.addtimer` 新增 timer 的時候進行 netpoll 的中斷操作可以更加靈敏的響應 timer 這類時間敏感的任務。 ## 總結 我們通過 timer 的 1.13版本以及1.14版本後的對比可以發現,即使是一個定時器 go 語言都做了相當多的優化工作。從原來的需要維護 64 個桶,然後每個桶裡面跑非同步任務,到現在的將 timer列表直接掛到了 P 上面,這不僅減少了上下文切換帶來的效能損耗,也減少了在鎖之間的爭搶問題,通過這些優化後有了可以媲美時間輪的效能表現。 ## Reference go1.14基於netpoll優化timer定時器實現原理 http://xiaorui.cc/archives/6483 https://github.com/golang/go/commit/6becb033341602f2df9d7c55cc23e64b925bbee2 https://github.com/golang/go/commit/76f4fd8a5251b4f63ea14a3c1e2fe2e78eb74f81 計時器 https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/timer/ 《Golang》Netpoll解析 https://www.pefish.club/2020/05/04/Golang/1011Netpoll%E8%A7%A3%E6%9E%90/ time.Timer 原始碼分析 https://docs.google.com/presentation/d/1c2mRWA-FiihgpbGsE4uducou7X5d4WoiiLVab-ewsT8/edit#slide=id.g6d01ce596e_0_77 ![luozhiyun很酷](https://img.luozhiyun.com/20210221183