1. 程式人生 > >golang 定時器

golang 定時器

time.After

golang實現計時器:

1.time.After
otherTimeChan = time.After(refreshActiviryInterval * time.Second)
case <-otherTimeChan:
    an.checkAndRefreshAcitivity()
    otherTimeChan = time.After(refreshActiviryInterval * time.Second)

2.time.NewTimer
 timer := time.NewTimer(3 * time.Second)
 <-timer.C

3.time.NewTicker
ticker := time.NewTicker(1 * time.Second)
select {
    case <-ticker.C:
        c.SSEvent("stats", Stats())
}

4.time.AfterFunc
time.AfterFunc(5 * time.Minute, func() {
    fmt.Printf("expired")
})

內部如何實現計時:

其底層結構是runtimeTimer正常情況下,時間到了之後會呼叫觸發函式。NewTimer中建立了一個channel,size = 1。在Timer中則會呼叫sendTime函式向channel中發訊息,用select保證不會堵塞。

func After(d Duration) <-chan Time {
    return NewTimer(d).C
}
//到了指定時間now_t+d呼叫f(arg, seq)
func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        r: runtimeTimer{
            when: when(d),  //定時時間
            f:    sendTime, //觸發函式
            arg:  c,        //觸發函式引數
        },
    }
    startTimer(&t.r)
    return t
}
//到時間後呼叫sendTime函式,向Channel傳訊息
func sendTime(c interface{}, seq uintptr) {
    select {
    case c.(chan Time) <- Now():
    default:
    }
}

//到了指定時間now_t+d呼叫f(arg, seq),並計算下一次的時間
func NewTicker(d Duration) *Ticker {
    if d <= 0 {
        panic(errors.New("non-positive interval for NewTicker"))
    }
    c := make(chan Time, 1)
    t := &Ticker{
        C: c,
        r: runtimeTimer{
            when:   when(d),
            period: int64(d),
            f:      sendTime,
            arg:    c,
        },
    }
    startTimer(&t.r)
    return t
}
// 時間d後執行f context包就是藉此實現計時
func AfterFunc(d Duration, f func()) *Timer {
    t := &Timer{
        r: runtimeTimer{
            when: when(d),
            f:    goFunc,
            arg:  f,
        },
    }
    startTimer(&t.r)
    return t
}

底層資料結構(多個計時器時誰先觸發):

  • 能夠快速維護距離當前時間最近的Timer
  • 數量龐大的Timer節點

啟動一個單獨的goroutine,維護了一個”最小堆”,go timerproc;如果到了觸發時間,讀取堆頂的timer,執行timer對應的f函式,並移除該timer element。建立一個Timer實則就是在這個最小堆中新增一個element,Stop一個timer,則是從堆中刪除對應的element。

  • 沒有新增節點:

    計算堆頂節點觸發的時間,睡眠到對應的時間。

  • 新增節點:

    如果插入節點經過平衡是堆頂節點,會利用notewakeup(&timers.waitnote)

    喚醒timerproc

// Timerproc runs the time-driven events.
// It sleeps until the next event in the timers heap.
// If addtimer inserts a new earlier event, it wakes timerproc early.
func timerproc() {
    timers.gp = getg()
    for {
        lock(&timers.lock)
        timers.sleeping = false
        now := nanotime()
        delta := int64(-1)
        for {
            if len(timers.t) == 0 {
                delta = -1
                break
            }
            t := timers.t[0]
            //獲取堆頂的節點,計算最快觸發事件的時間
            delta = t.when - now
            if delta > 0 {
                break
            }
            if t.period > 0 {
                //計算下一次的觸發時間,並維護最小堆
                t.when += t.period * (1 + -delta/t.period)
                siftdownTimer(0)
            } else {
                // 從最小堆中刪除
                last := len(timers.t) - 1
                if last > 0 {
                    timers.t[0] = timers.t[last]
                    timers.t[0].i = 0
                }
                timers.t[last] = nil
                timers.t = timers.t[:last]
                if last > 0 {
                    siftdownTimer(0)
                }
                t.i = -1 // mark as removed
            }
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&timers.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            //呼叫觸發函式 sendTime
            f(arg, seq)
            lock(&timers.lock)
        }
        if delta < 0 || faketime > 0 {
            // 如果最小堆中沒有timer
            timers.rescheduling = true
            goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
            continue
        }
        // 睡眠delta同時監聽timers.waitnote
        timers.sleeping = true
        noteclear(&timers.waitnote)
        unlock(&timers.lock)
        notetsleepg(&timers.waitnote, delta)
    }
}

可能遇見的問題:

  • 到時間才釋放對應的記憶體(如果沒有主動關閉,退出函式時不會立即釋放)
func () {
    select {
        case b := <-c:
            balabala
            return
        case <-timer.C:
            continue
    }
}
  • 頻繁的建立銷燬

    每次呼叫NewTime或者After都會重新建立Timer,建立一個新的channel。

Timer.Stop
/*
    Stop prevents the Timer from firing.
    It returns true if the call stops the timer, false if the timer has already
    expired or been stopped.
    Stop does not close the channel
    1.定時器已經到期
    2.定時器未到期
*/
if !t.Stop() {
    <-t.C
}

參考連結

func main() {
    timer := time.NewTimer(3 * time.Second)

    go func() {
        <-timer.C
        fmt.Println("Timer has expired.")
    }()

    timer.Stop()
    time.Sleep(60 * time.Second)
}

執行timer.Stop()之後,只是將節點從最小堆裡面刪除,等待返回的channel並沒有關閉也沒有呼叫sendtime函式。

Timer.Reset

Reset主要是重新設定倒計時時間。

在使用Reset函式時主要考慮Timer.C這個channel中是否是空的,將裡面冗餘的值取出來(如果能確保沒值則不考慮),避免出現邏輯錯誤。文件中對Reset的使用描述:如果明確timer已經失效,並且t.C已經被取空,那麼可以直接使用Reset。

  • 呼叫stop函式,發現Timer還生效,再呼叫Reset
  • 呼叫stop函式,發現Timer已經失效,這時Timer.C中可能有值;需要先判斷Timer.C裡面的值是否被取出,然後呼叫Reset
//  if !t.Stop() {
//      利用stop判斷是否需要清理channel
//      <-t.C
//  }
//  t.Reset(d)
func (t *Timer) Reset(d Duration) bool {
    if t.r.f == nil {
        panic("time: Reset called on uninitialized Timer")
    }
    w := when(d)
    active := stopTimer(&t.r)
    t.r.when = w
    startTimer(&t.r)
    return active
}
// 每隔xxx秒xxx
func () {
    timer := time.NewTimer(timeoutDuration)
    for {
        case <-timer.C:
            xxxxx
            timer.Reset(timeoutDuration)
    }
}