簡介
工作中經常有定時執行某些代碼塊的需求,如果是php代碼,一般寫個腳本,然后用Cron實現。
Go里提供了兩種定時器:Timer(到達指定時間觸發且只觸發一次)和 Ticker(間隔特定時間觸發)。
Timer和Ticker的實現幾乎一樣,Ticker相對復雜一些,這里主要講述一下Ticker是如何實現的。
讓我們先來看一下如何使用Ticker
//創建Ticker,設置多長時間觸發一次
ticker := time.NewTicker(time.Second * 10)
go func() {
for range ticker.C { //遍歷ticker.C,如果有值,則會執行do someting,否則阻塞
//do someting
}
}()
代碼很簡潔,給開發者提供了巨大的便利。那GoLang是如何實現這個功能的呢?
原理
NewTicker
time/tick.go的NewTicker函數:
調用NewTicker可以生成Ticker,關于這個函數有四點需要說明
- NewTicker主要作用之一是初始化
- NewTicker中的時間是以納秒為單位的,when返回的從當前時間+d的納秒值,d必須為正值
- Ticker結構體中包含channel,sendTime是個function,邏輯為用select等待c被賦值
- 神秘的startTimer函數,揭示channel、sendTime是如何關聯的
// NewTicker returns a new Ticker containing a channel that will send the
// time with a period specified by the duration argument.
// It adjusts the intervals or drops ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will panic.
// Stop the ticker to release associated resources.
func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
// Give the channel a 1-element time buffer.
// If the client falls behind while reading, we drop ticks
// on the floor until the client catches up.
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
time/tick.go的Ticker數據結構
// A Ticker holds a channel that delivers `ticks' of a clock
// at intervals.
type Ticker struct {
C <-chan Time // The channel on which the ticks are delivered.
r runtimeTimer
}
time/sleep.go的runtimeTimer
// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
tb uintptr
i int
when int64
period int64
f func(interface{}, uintptr) // NOTE: must not be closure
arg interface{}
seq uintptr
}
time/sleep.go的sendTime
func sendTime(c interface{}, seq uintptr) {
// Non-blocking send of time on c.
// Used in NewTimer, it cannot block anyway (buffer).
// Used in NewTicker, dropping sends on the floor is
// the desired behavior when the reader gets behind,
// because the sends are periodic.
select {
case c.(chan Time) <- Now():
default:
}
}
time/sleep.go的startTimer
func startTimer(*runtimeTimer)
func stopTimer(*runtimeTimer) bool
startTimer
看完上面的代碼,大家內心是不是能夠猜出是怎么實現的?
有一個機制保證時間到了時,sendTime被調用,此時channel會被賦值,調用ticker.C的位置解除阻塞,執行指定的邏輯。
讓我們看一下GoLang是不是這樣實現的。
追蹤代碼的時候我們發現在time包里的startTimer,只是一個聲明,那真正的實現在哪里?
runtime/time.go的startTimer
此處使用go的隱藏技能go:linkname引導編譯器將當前(私有)方法或者變量在編譯時鏈接到指定的位置的方法或者變量。另外timer和runtimeTimer的結構是一致的,所以程序運行正常。
//startTimer將new的timer對象加入timer的堆數據結構中
//startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}
runtime/time.go的addtimer
func addtimer(t *timer) {
tb := t.assignBucket()
lock(&tb.lock)
ok := tb.addtimerLocked(t)
unlock(&tb.lock)
if !ok {
badTimer()
}
}
runtime/time.go的addtimerLocked
// Add a timer to the heap and start or kick timerproc if the new timer is
// earlier than any of the others.
// Timers are locked.
// Returns whether all is well: false if the data structure is corrupt
// due to user-level races.
func (tb *timersBucket) addtimerLocked(t *timer) bool {
// when must never be negative; otherwise timerproc will overflow
// during its delta calculation and never expire other runtime timers.
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(tb.t)
tb.t = Append(tb.t, t)
if !siftupTimer(tb.t, t.i) {
return false
}
if t.i == 0 {
// siftup moved to top: new earliest deadline.
if tb.sleeping && tb.sleepUntil > t.when {
tb.sleeping = false
notewakeup(&tb.waitnote)
}
if tb.rescheduling {
tb.rescheduling = false
goready(tb.gp, 0)
}
if !tb.created {
tb.created = true
go timerproc(tb)
}
}
return true
}
runtime/time.go的timerproc
func timerproc(tb *timersBucket) {
tb.gp = getg()
for {
lock(&tb.lock)
tb.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(tb.t) == 0 { //無timer的情況
delta = -1
break
}
t := tb.t[0] //拿到堆頂的timer
delta = t.when - now
if delta > 0 { // 所有timer的時間都沒有到期
break
}
if t.period > 0 { // t[0] 是ticker類型,調整其到期時間并調整timer堆結構
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(tb.t, 0)
} else {
//Timer類型的定時器是單次的,所以這里需要將其從堆里面刪除
// remove from heap
last := len(tb.t) - 1
if last > 0 {
tb.t[0] = tb.t[last]
tb.t[0].i = 0
}
tb.t[last] = nil
tb.t = tb.t[:last]
if last > 0 {
siftdownTimer(tb.t, 0)
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock)
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq) //sendTimer被調用的位置 ---------------------------------------
lock(&tb.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
tb.rescheduling = true
goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
tb.sleeping = true
tb.sleepUntil = now + delta
noteclear(&tb.waitnote)
unlock(&tb.lock)
notetsleepg(&tb.waitnote, delta)
}
}
追蹤了一圈,最終追蹤到timerproc,發現了sendTimer被調用位置f(arg, seq) ,而且可以看到將channel c傳到了sendTimer中。
上面的這堆代碼邏輯是什么意思呢?
- 所有timer統一使用一個最小堆結構去維護,按照timer的when(到期時間)比較大小;
- for循環過程中,如果delta = t.when - now的時間大于0,則break,直到有到時間的timer才進行操作;
- timer處理線程從堆頂開始處理每個timer,對于到期的timer,如果其period>0,則表明該timer 屬于Ticker類型,調整其下次到期時間并調整其在堆中的位置,否則從堆中移除該timer;
- 調用該timer的處理函數以及其他相關工作;
總結
讀完這篇文章,有沒有奇怪的知識又增加了一些的感覺。寫這些源碼的大神們,對Go的理解很深刻,編碼的功能也很深厚。
本質上GoLang用channel和堆實現了定時器功能,讓我們來mock一下,偽代碼如下:
func cronMock() {
for {
//從堆中獲取時間最近的定時器
t := getNearestTime()
//如果時間還沒到,則continue
t.delta > 0 {
continue
}else{
//時間到了,將當前的定時器再加一個鐘
t.when += t.duration
//將堆重新排序
siftdownTimer()
//執行當前定時器指定的函數,即sendTimer
t.sendTimer()
}
}
}
資料
- golang進階(八)——隱藏技能go:linkname https://blog.csdn.net/lastsweetop/article/details/78830772
- 從99.9%CPU淺談Golang的定時器實現原理https://www.jianshu.com/p/c9e8aaa13415
最后
大家如果喜歡我的文章,可以關注我的公眾號(程序員麻辣燙)