golang系列之-定时器Timer和Ticker

Timer-一次性定时器,Ticker-周期性定时器。从1.23版本开始,将异步实现改为同步实现,但你仍然可以使用AfterFunc创建异步定时器,或者通过改变asynctimerchan变量启用异步实现

asynctimerchan变量可选项如下

asynctimerchan description
0 同步实现,从1.23版本开始启用
1 旧版异步实现
2 同1,异步实现,但修复了1的问题,debug用

定时器的精确度因系统不同而不同,具体如下

OS resolution
Unix ~1ms
>= Windows 1803 ~0.5ms
< Windows 1803 ~16ms

快速上手

深入了解源代码前,先了解其功能如何使用

Timer-一次性定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"time"
)

func main() {
// 定时器1
timer1 := time.NewTimer(2 * time.Second)

<-timer1.C
fmt.Println("Timer 1 fired")

// 定时器2
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 fired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}

// 定时器3
time.Sleep(2 * time.Second)
}

上述示例代码运行效果如下

1
2
3
4
go run main.go

# Timer 1 fired
# Timer 2 stopped

Ticker-周期性定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"time"
)

func main() {
// 每500ms执行一次
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)

go func() {
for {
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("Tick at", t)
}
}
}()

// 挂起休眠1600ms
time.Sleep(1600 * time.Millisecond)
// 停止定时器
ticker.Stop()
done <- true
fmt.Println("Ticker stopped")
}

上述示例代码运行效果如下

1
2
3
4
5
6
go run main.go

# Tick at 2025-02-27 10:41:07.875146141 +0800 CST m=+0.500099485
# Tick at 2025-02-27 10:41:08.37515345 +0800 CST m=+1.000100767
# Tick at 2025-02-27 10:41:08.875159521 +0800 CST m=+1.500100789
# Ticker stopped

数据结构

time.Timer以及time.Ticker数据结构同源,在实际运行时都会转换成runtime的timeTimer,数据结构的字段释义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// src/time/sleep.go
// time.Timer结构,可以忽略,实际使用会转换成runtime的timeTimer
type Timer struct {
C <-chan Time
initTimer bool
}

// src/time/tick.go
// time.Ticker,可以忽略,实际使用会转换成runtime的timeTimer
type Ticker struct {
C <-chan Time
initTicker bool
}

// src/runtime/time.go
// 下面几个数据结构才是实际使用的
type timeTimer struct {
c unsafe.Pointer // channel
init bool // 是否已经初始化
timer // 具体看下方
}

type timer struct {
mu mutex // 锁,保护以下所有字段
astate atomic.Uint8 // state字段的复制,modify以及unlock时复制
state uint8 // 状态位,具体见后面的列表
isChan bool // 同步还是异步(go1.23开始设置为true,除非自己设置asynctimerchan)
isFake bool // 测试用
blocked uint32 // 阻塞在channel中的G数量
when int64 // 目标过期时刻
period int64 // 时长-周期性定时器使用

// 函数f内部不能出现阻塞,默认为sendTime,也可以是goroutineReady或用户自定义函数
f func(arg any, seq uintptr, delay int64)
// 当与netpoll一起使用时
// arg是*pollDesc
// seq是版本计数器,类似时间戳,确保过期的消息不会被处理,stop以及modify时自增
// 当作为time包时
// arg是一个channel(After/NewTicker)或一个函数(AfterFunc)
// seq同上,但没有使用
arg any // 见上方解释
seq uintptr // 版本计数器

ts *timers // 判断timer属于哪个P

sendLock mutex // 用于channel加锁(go1.23开始启用)

// 用于处理运行定时器和停止/重置定时器的冲突检测
// 只用于同步定时器(isChan == true),周期性定时器不可用
// 发送数据到channel时isSending++,发送后isSending--
// t.mu加锁 => isSending可自增
// t.sendLock加锁 => isSending可自减
// t.mu、t.sendLock加锁 => 数据只读
isSending atomic.Int32 // 见上方解释
}

// 最小堆及其元数据,该结构放在p数据结构里(每个P一个timers)
type timers struct {
mu mutex // 锁,保护下面字段,因为其他P的G可以访问当前P的timers
heap []timerWhen // 最小堆,用于存储timer,按heap[i].when排序
len atomic.Uint32 // 最小堆中的元素数量 => len(heap)
zombies atomic.Int32 // 最小堆中,timer.state设置了timerZombie位的定时器数量
raceCtx uintptr // 冲突检测用,忽略
minWhenHeap atomic.Int64 // 最小堆中的最小when(=heap[0].when),为0时表示heap为空
minWhenModified atomic.Int64 // 最小堆中的最小when,但仅限timer.state设置了timerModified位的定时器
syncGroup *synctestGroup // 测试用
}

// 定时器及其过期时刻
type timerWhen struct {
timer *timer // 定时器指针
when int64 // 目标过期时刻
}

timer的state状态总共占用3个位,如下所示

state_name state_value description
timerHeaped 1 定时器已经放在某个P的timers最小堆中
timerModified 2 t.when被修改但还没更新heap[i].when,如果定时器不在heap,忽略
timerZombie 4 定时器被停止,但还放在heap里,可以跟timerModified位共存。定时器为zombie时可以发送数据到channel,因为数据不会被读取

注意:

  1. timerModified和timerZombie的前提都是timerHeaped
  2. 无法直接把timer移除出timers.heap,因为别的P可能已经拿到了这个timer
  3. timer有这几个状态位意味着最小堆还没有重新调整,timer还放在之前的位置上(不满足最小堆)

创建定时器

NewTimer & NewTicker

创建/获取定时器,过期时刻为当前时刻加目标时长

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 一次性定时器
func NewTimer(d Duration) *Timer {
// channel,有缓冲
c := make(chan Time, 1)
// 创建定时器
t := (*Timer)(newTimer(when(d), 0, sendTime, c, syncTimer(c)))
// 定时器绑定channel
t.C = c
return t
}

// 周期性定时器
func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic("non-positive interval for NewTicker")
}
// channel,有缓冲
c := make(chan Time, 1)
// 创建定时器
t := (*Ticker)(unsafe.Pointer(newTimer(when(d), int64(d), sendTime, c, syncTimer(c))))
// 定时器绑定channel
t.C = c
return t
}

// 创建定时器time.Timer或time.Ticker
func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg any, c *hchan) *timeTimer {
t := new(timeTimer)
// mu/f/arg参数初始化,这里传nil,后面用t.modify修改
t.timer.init(nil, nil)
// debug用,忽略
t.trace("new")
// 同步定时器(go1.23开始默认启用),初始化sendLock、isChan
if c != nil {
lockInit(&t.sendLock, lockRankTimerSend)
t.isChan = true
// 双向绑定
c.timer = &t.timer
if c.dataqsiz == 0 {
throw("invalid timer channel: no capacity")
}
}
// 测试,忽略
if gr := getg().syncGroup; gr != nil {
t.isFake = true
}
// 更新timer状态、添加到最小堆、中断网络轮询
t.modify(when, period, f, arg, 0)
t.init = true
return t
}

为了避免代码过长影响阅读,其他依赖方法列在下方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
// 获取定时器触发时刻
func when(d Duration) int64 {
// 时长不能为负
if d <= 0 {
return runtimeNano()
}
// 单调时钟
t := runtimeNano() + int64(d)
// 溢出时,设置为最大值math.MaxInt64
if t < 0 {
t = 1<<63 - 1
}
return t
}

// 把目标触发时刻发送给channel,定时器过期时调用
// 目前有maybeRunAsync、unlockAndRun、maybeRunChan在使用
func sendTime(c any, seq uintptr, delta int64) {
// delta => 实际触发时刻-目标触发时刻
select {
case c.(chan Time) <- Now().Add(Duration(-delta)):
default:
}
}

// 获取同步定时器
func syncTimer(c chan Time) unsafe.Pointer {
// async,go1.22及之前版本
if asynctimerchan.Value() == "1" {
asynctimerchan.IncNonDefault()
return nil
}

// sync,go1.23及以后,返回channel的指针
// asynctimerchan=2作用同asynctimerchan=1,修复了旧版问题,用于debug
return *(*unsafe.Pointer)(unsafe.Pointer(&c))
}

// 初始化timer的mu/f/arg字段
func (t *timer) init(f func(arg any, seq uintptr, delay int64), arg any) {
lockInit(&t.mu, lockRankTimer)
t.f = f
t.arg = arg
}

// 更新timer状态、添加到最小堆、中断网络轮询
// 一般是netpoll、time.Ticker.Reset、time.Timer.Reset调用。该方法修改的是timer的when/period/f/arg/seq字段
func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay int64), arg any, seq uintptr) bool {
// 1. guard
if when <= 0 {
throw("timer when must be positive")
}
if period < 0 {
throw("timer period must be non-negative")
}

// 判断是异步还是同步定时器(go1.23开始默认是同步定时器)
async := debug.asynctimerchan.Load() != 0

// 同步定时器
if !async && t.isChan {
// 防止重复发送数据到channel
lock(&t.sendLock)
}

t.lock()
// 异步定时器
if async {
// 判断定时器是否需要触发执行函数f
t.maybeRunAsync()
}
// debug用,忽略
t.trace("modify")

// 2. 更新timer字段

oldPeriod := t.period
t.period = period
if f != nil {
t.f = f
t.arg = arg
t.seq = seq
}

wake := false
pending := t.when > 0
t.when = when
// timer已经在最小堆中
if t.state&timerHeaped != 0 {
t.state |= timerModified // 已修改
// timer已标记删除
if t.state&timerZombie != 0 {
t.ts.zombies.Add(-1) // 计数器更新
t.state &^= timerZombie // 移除标志位
}
// minWhen更新
// 最小堆为空 or when比minWhen小
if min := t.ts.minWhenModified.Load(); min == 0 || when < min {
// 需要中断网络轮询
wake = true
// 纪录state
t.astate.Store(t.state)
// 是否要用when替换minWhenModified
t.ts.updateMinWhenModified(when)
}
}

// 3. 判断是否需要把timer添加到最小堆、中断网络轮询

// 是否需要把定时器放到timers.heap
add := t.needsAdd()

// 同步定时器
if !async && t.isChan {
// 版本计数器更新
t.seq++

// 正在发送数据到channel
if oldPeriod == 0 && t.isSending.Load() > 0 {
pending = true
}
}
// 用state字段数据更新astate,然后再解锁
t.unlock()
// 同步定时器
if !async && t.isChan {
// 清空channel(t.arg)的buf缓冲区
if timerchandrain(t.hchan()) {
pending = true
}
unlock(&t.sendLock)
}

// 如果需要把定时器放到timers.heap
if add {
// 更新timer状态、添加到最小堆、中断网络轮询(跟modify函数有些相似)
t.maybeAdd()
}

// 中断netpoll轮询
if wake {
// 如果正在轮询netpoll且有更早过期的事件,中断netpoll轮询。如果没有轮询netpoll,则从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakeNetPoller(when)
}

return pending
}

// 判断定时器是否需要更新timer状态,执行函数f(异步定时器执行)
// timer为异步定时器调用(go1.23开始应该用不到了),目前只有modify和stop函数在使用
func (t *timer) maybeRunAsync() {
// mu加锁
assertLockHeld(&t.mu)
// 1. timer不在最小堆
// 2. 同步定时器(用来判断同步定时器???)
// 3. 过期时刻大于0
if t.state&timerHeaped == 0 && t.isChan && t.when > 0 {
// 获取当前时刻,如果已过期
if now := nanotime(); t.when <= now {
systemstack(func() {
// 更新timer状态、执行函数f
t.unlockAndRun(now)
})
// 重新上锁
t.lock()
}
}
}

// 更新timer状态、执行函数f
// 该函数不管是异步定时器还是同步定时器都会使用
func (t *timer) unlockAndRun(now int64) {
// 1. guard

// debug用,忽略
t.trace("unlockAndRun")
// mu加锁
assertLockHeld(&t.mu)
// 如果timer已经放到了ts最小堆上,ts也要加锁
if t.ts != nil {
assertLockHeld(&t.ts.mu)
}

// timer已经被修改或被停止,异常
if t.state&(timerModified|timerZombie) != 0 {
badTimer()
}

// 2. 更新timer状态

// 准备被调用的函数数据
f := t.f
arg := t.arg
seq := t.seq

var next int64
// 超过过期时刻的纳秒数
delay := now - t.when
// 周期性定时器
if t.period > 0 {
// 计算下一个过期时刻,这公式没明白原理
next = t.when + t.period*(1+delay/t.period)
// 溢出了,设置为最大值2^63-1
if next < 0 {
next = maxWhen
}
} else {
// 一次性定时器
next = 0
}
ts := t.ts
t.when = next
// timer已经在最小堆中
if t.state&timerHeaped != 0 {
// 设置timerModified标志位
t.state |= timerModified
// 如果是一次性定时器,标记删除
if next == 0 {
// 标记为删除
t.state |= timerZombie
// 计数器zombies+=1
t.ts.zombies.Add(1)
}
// 根据timer状态更新最小堆,定时器已停止则删除,定时器已修改则同步
t.updateHeap()
}

// 3. 执行函数f

// 判断是异步还是同步定时器(go1.23开始默认是同步定时器)
async := debug.asynctimerchan.Load() != 0
// 同步定时器 and 一次性
if !async && t.isChan && t.period == 0 {
// 发送中,计数器加1(用于并发检测,后面会执行减1操作)
if t.isSending.Add(1) < 0 {
throw("too many concurrent timer firings")
}
}

// 用state字段数据更新astate,然后再解锁
t.unlock()

// 当前定时器已经放到了最小堆
if ts != nil {
// 计算heap元素数量并更新len字段,最后解锁
ts.unlock()
}

// 测试,忽略
if ts != nil && ts.syncGroup != nil {
// Temporarily use the timer's synctest group for the G running this timer.
gp := getg()
if gp.syncGroup != nil {
throw("unexpected syncgroup set")
}
gp.syncGroup = ts.syncGroup
ts.syncGroup.changegstatus(gp, _Gdead, _Grunning)
}

// 同步定时器
if !async && t.isChan {
lock(&t.sendLock)

// 一次性定时器
if t.period == 0 {
// 发送完毕,计数器减1
if t.isSending.Add(-1) < 0 {
throw("mismatched isSending updates")
}
}

// 版本不一样,不执行。加锁后double-check
if t.seq != seq {
f = func(any, uintptr, int64) {}
}
}

// 执行函数f
f(arg, seq, delay)

// 同步定时器
if !async && t.isChan {
unlock(&t.sendLock)
}

// 测试,忽略
if ts != nil && ts.syncGroup != nil {
gp := getg()
ts.syncGroup.changegstatus(gp, _Grunning, _Gdead)
gp.syncGroup = nil
}

// 当前定时器已经放到了最小堆
if ts != nil {
ts.lock()
}
}

// 是否要用when替换minWhenModified
func (ts *timers) updateMinWhenModified(when int64) {
// old == 0 => 第一次访问,替换
// old < when => 忽略
// old > when => 替换
for {
old := ts.minWhenModified.Load()
if old != 0 && old < when {
return
}
if ts.minWhenModified.CompareAndSwap(old, when) {
return
}
}
}

// 判断是否需要把定时器放到timers.heap
func (t *timer) needsAdd() bool {
// t.mu加锁
assertLockHeld(&t.mu)
// 1. timer不在最小堆
// 2. 过期时刻大于0
// 3. 异步定时器 or 在测试 or 有G阻塞在channel中
need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0)

// 下面两个trace都是debug用的,忽略
if need {
t.trace("needsAdd+")
} else {
t.trace("needsAdd-")
}

return need
}

// 清空channel(t.arg)的buf缓冲区
func timerchandrain(c *hchan) bool {
// channel数据为0
if atomic.Loaduint(&c.qcount) == 0 {
return false
}
lock(&c.lock)
any := false
// buf有数据?清空
for c.qcount > 0 {
any = true
typedmemclr(c.elemtype, chanbuf(c, c.recvx))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
}
unlock(&c.lock)
return any
}

// 更新timer状态、添加到最小堆、中断网络轮询(跟modify函数有些相似)
func (t *timer) maybeAdd() {
// 1. 获取当前P的timers最小堆

// 确保拿的是当前P
mp := acquirem()
var ts *timers
// 测试,忽略
if t.isFake {
sg := getg().syncGroup
if sg == nil {
throw("invalid timer: fake time but no syncgroup")
}
ts = &sg.timers
} else {
// 其他情况
ts = &mp.p.ptr().timers
}
ts.lock()

// 2. 清理最小堆
// 确保最小堆首尾都没有过期的timer
ts.cleanHead()

t.lock()
// debug用,忽略
t.trace("maybeAdd")

// 3. 更新当前timer状态、添加到最小堆、中断网络轮询

when := int64(0)
wake := false
// double-check
if t.needsAdd() {
// 设置timerHeaped标志位
t.state |= timerHeaped
when = t.when
// 获取最小堆的最小when,用于判断是否需要唤醒netpoller
wakeTime := ts.wakeTime()
// 最小堆为空 or 当前timer过期时刻比最小堆的还小
wake = wakeTime == 0 || when < wakeTime
// 把timer定时器加入到timers.heap最小堆中
ts.addHeap(t)
}
// 用state字段数据更新astate,然后再解锁
t.unlock()
// 计算heap元素数量并更新len字段,最后解锁
ts.unlock()
releasem(mp)

// 中断netpoll轮询
if wake {
// 如果正在轮询netpoll且有更早过期的事件,中断netpoll轮询。如果没有轮询netpoll,则从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakeNetPoller(when)
}
}

// 确保最小堆首尾都没有过期的timer
func (ts *timers) cleanHead() {
// debug,忽略
ts.trace("cleanHead")
// ts.mu加锁
assertLockHeld(&ts.mu)
// 当前goroutine
gp := getg()

// 从最小堆的最后一个元素开始向前扫描,如果timer已过期,则移除出最小堆
for {
// 1. guard

// 最小堆数据为空
if len(ts.heap) == 0 {
return
}

// for循环会运行一段时间,因为持有锁,不会被抢占
// 如果其他G要抢占,返回,稍后再清理
if gp.preemptStop {
return
}

n := len(ts.heap)

// 2. 尾部处理
// 从最小堆的最后一个元素开始向前扫描,如果timer已过期,则移除出最小堆
if t := ts.heap[n-1].timer; t.astate.Load()&timerZombie != 0 {
t.lock()
// double-check
if t.state&timerZombie != 0 {
// 移除出heap前把状态清空
t.state &^= timerHeaped | timerZombie | timerModified
// 表示从heap上移除
t.ts = nil
// 计数器zombies-=1
ts.zombies.Add(-1)
// 先用空的结构替换
ts.heap[n-1] = timerWhen{}
// 移除最后一个数据
ts.heap = ts.heap[:n-1]
}
// 用state字段数据更新astate,然后再解锁
t.unlock()
continue
}

// 3. 头部处理

// 最小堆中的最小timer
t := ts.heap[0].timer
// 不在同一个P上
if t.ts != ts {
throw("bad ts")
}

// 没有被修改也没有标记删除,无须调整
if t.astate.Load()&(timerModified|timerZombie) == 0 {
return
}

t.lock()
// 根据timer状态更新最小堆,定时器已停止则删除,定时器已修改则同步
updated := t.updateHeap()
// 用state字段数据更新astate,然后再解锁
t.unlock()
if !updated {
return
}
}
}

// 更新minWhenHeap字段
// minWhenHeap = heap[0].when
func (ts *timers) updateMinWhenHeap() {
// mu加锁、STW
assertWorldStoppedOrLockHeld(&ts.mu)
// heap为空
if len(ts.heap) == 0 {
ts.minWhenHeap.Store(0)
} else {
// 最小堆中的最小when纪录到minWhenHeap字段
ts.minWhenHeap.Store(ts.heap[0].when)
}
}

// 获取最小堆的最小when,用于判断是否需要唤醒netpoller
// => min(minWhenHeap, minWhenModified)
func (ts *timers) wakeTime() int64 {
// 字段读取的顺序很关键,跟adjust函数存在并发竞态

// 设置了timerModified位的最小when
nextWhen := ts.minWhenModified.Load()
// 最小when
when := ts.minWhenHeap.Load()
// 最小堆为空 or nextWhen比最小when还要小
if when == 0 || (nextWhen != 0 && nextWhen < when) {
when = nextWhen
}
return when
}

// 如果正在轮询netpoll且有更早过期的事件,中断netpoll轮询。如果没有轮询netpoll,则从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
func wakeNetPoller(when int64) {
// 正在轮询netpoll中
if sched.lastpoll.Load() == 0 {
pollerPollUntil := sched.pollUntil.Load()
// 轮询完毕 or 有更早过期的事件
if pollerPollUntil == 0 || pollerPollUntil > when {
// 中断
netpollBreak()
}
} else {
// 非plan9
if GOOS != "plan9" {
// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakep()
}
}
}

AfterFunc

创建/获取一次性定时器,与NewTimer的区别是使用了用户自定义函数,此外,该定时器是异步的

1
2
3
4
5
6
7
8
func AfterFunc(d Duration, f func()) *Timer {
// 异步定时器
return (*Timer)(newTimer(when(d), 0, goFunc, f, nil))
}

func goFunc(arg any, seq uintptr, delta int64) {
go arg.(func())()
}

Sleep

创建定时器或重用当前G的定时器,把当前goroutine挂起休眠至少ns纳秒时间

  1. 初始化g.timer或重用当前G的timer
  2. 计算过期时刻when,纪录到g.sleepWhen
  3. 调用modify更新定时器,将当前goroutine挂起等待唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func timeSleep(ns int64) {
// 时长不能为负
if ns <= 0 {
return
}

// 1. g.timer设置
gp := getg()
t := gp.timer
// 确保当前goroutine的timer定时器不为nil
if t == nil {
t = new(timer)
// 纪录f/arg
t.init(goroutineReady, gp)
// 测试,忽略
if gp.syncGroup != nil {
t.isFake = true
}
// 绑定
gp.timer = t
}

// 2. g.sleepWhen设置

// 当前时刻-单调时钟
var now int64
if sg := gp.syncGroup; sg != nil {
now = sg.now
} else {
now = nanotime()
}
// 目标过期时刻
when := now + ns
if when < 0 { // 溢出
when = maxWhen // 设置为最大值2^63-1
}
gp.sleepWhen = when

// 3. 挂起goroutine
// 测试,忽略
if t.isFake {
resetForSleep(gp, nil)
// 挂起休眠
gopark(nil, nil, waitReasonSleep, traceBlockSleep, 1)
// 被唤醒
} else {
// 挂起休眠,挂起前调用resetForSleep
gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1)
// 被唤醒
}
}

// 不能直接在timeSleep内部调用reset,G挂起前会有一个小的时间间隔,有竟态
func resetForSleep(gp *g, _ unsafe.Pointer) bool {
// 调用modify函数更新when和period
gp.timer.reset(gp.sleepWhen, 0)
return true
}

After & Tick

创建定时器,返回定时器的channel,属于NewTimer/NewTicker函数的封装。go1.22及之前的版本中,如果在for循环使用After会申请大量内存,加剧GC压力

1
2
3
4
5
6
7
8
9
10
11
12
// 一次性定时器
func After(d Duration) <-chan Time {
return NewTimer(d).C
}

// 周期性定时器
func Tick(d Duration) <-chan Time {
if d <= 0 {
return nil
}
return NewTicker(d).C
}

定时器相关

停止定时器

停止定时器,因为定时器可能被其他P持有,只修改状态。具体逻辑如下

  1. 如果是异步定时器,判断定时器是否需要触发执行函数f
  2. 定时器字段更新
    • 更新定时器状态state的timerZombie位
    • 重置when
    • 更新版本计数器
    • 将状态state复制到astate上(到这里就解锁了)
    • 如果是同步定时器,清空channel(t.arg)的buf缓冲区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// 一次性定时器
func (t *Timer) Stop() bool {
// 未初始化
if !t.initTimer {
panic("time: Stop called on uninitialized Timer")
}
return stopTimer(t)
}

// 周期性定时器
func (t *Ticker) Stop() {
if !t.initTicker {
return
}
stopTimer((*Timer)(unsafe.Pointer(t)))
}

// 停止定时器
func stopTimer(t *timeTimer) bool {
// 测试,忽略
if t.isFake && getg().syncGroup == nil {
panic("stop of synctest timer from outside bubble")
}
return t.stop()
}

// 停止定时器
func (t *timer) stop() bool {
// 判断是异步还是同步定时器(go1.23开始默认是同步定时器)
async := debug.asynctimerchan.Load() != 0
// 同步定时器
if !async && t.isChan {
lock(&t.sendLock)
}

t.lock()
// debug用,忽略
t.trace("stop")
// 如果是异步定时器(go1.22及以前)
if async {
// 判断定时器是否需要更新timer状态,执行函数f
t.maybeRunAsync()
}
// timer已经在最小堆中
if t.state&timerHeaped != 0 {
// 设置timerModified标志位
t.state |= timerModified
// timer无删除标记
if t.state&timerZombie == 0 {
// 标记为删除
t.state |= timerZombie
// 计数器zombies+=1
t.ts.zombies.Add(1)
}
}
pending := t.when > 0
// 重置when
t.when = 0

// 同步定时器
if !async && t.isChan {
// 版本计数器更新
t.seq++
// 一次性定时器 and 已经在调用函数f了
if t.period == 0 && t.isSending.Load() > 0 {
pending = true
}
}
// 用state字段数据更新astate,然后再解锁
t.unlock()
// 同步定时器
if !async && t.isChan {
unlock(&t.sendLock)
// 清空channel(t.arg)的buf缓冲区
if timerchandrain(t.hchan()) {
pending = true
}
}

return pending
}

重置定时器

重置定时器,本质是modify函数调用,更新timer状态、添加到最小堆、中断网络轮询。需要先调用Stop才能确保安全调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 一次性定时器
func (t *Timer) Reset(d Duration) bool {
if !t.initTimer {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
return resetTimer(t, w, 0)
}

// 周期性定时器
func (t *Ticker) Reset(d Duration) {
if d <= 0 {
panic("non-positive interval for Ticker.Reset")
}
if !t.initTicker {
panic("time: Reset called on uninitialized Ticker")
}
resetTimer((*Timer)(unsafe.Pointer(t)), when(d), int64(d))
}

// 重置定时器
func resetTimer(t *timeTimer, when, period int64) bool {
// 测试,忽略
if t.isFake && getg().syncGroup == nil {
panic("reset of synctest timer from outside bubble")
}
return t.reset(when, period)
}

func (t *timer) reset(when, period int64) bool {
// 更新timer状态、添加到最小堆、中断网络轮询
return t.modify(when, period, nil, nil, 0)
}

最小堆相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// 根据timer状态更新最小堆,定时器已停止则删除,定时器已修改则同步
func (t *timer) updateHeap() (updated bool) {
// mu加锁、STW
assertWorldStoppedOrLockHeld(&t.mu)
// debug,忽略
t.trace("updateHeap")
// 最小堆
ts := t.ts
// 最小堆为nil(没有初始化) or 当前定时器不是最小堆中最小的,异常
if ts == nil || t != ts.heap[0].timer {
badTimer()
}
// ts.mu加锁
assertLockHeld(&ts.mu)
// timer已标记删除
if t.state&timerZombie != 0 {
// 移除标志位
t.state &^= timerHeaped | timerZombie | timerModified
// 计数器zombies-=1
ts.zombies.Add(-1)
// 移除最小的timer
ts.deleteMin()
return true
}

// 当前定时器已被修改
if t.state&timerModified != 0 {
// 移除标志位
t.state &^= timerModified
// 更新when
ts.heap[0].when = t.when
// 调整最小堆,从根节点开始,一直向下交换最小的子节点
ts.siftDown(0)
// 更新minWhenHeap字段
ts.updateMinWhenHeap()
return true
}

return false
}

// 最小堆重新排序
func (ts *timers) initHeap() {
// 0或1个元素,不用排序
if len(ts.heap) <= 1 {
return
}
// 父节点idx => (len(heap)-1-1)/4
for i := int(uint(len(ts.heap)-1-1) / timerHeapN); i >= 0; i-- {
ts.siftDown(i)
}
}

// 把timer定时器加入到timers.heap最小堆中
func (ts *timers) addHeap(t *timer) {
// mu加锁、STW
assertWorldStoppedOrLockHeld(&ts.mu)
// 确保netpoll已初始化(依赖)
if netpollInited.Load() == 0 {
netpollGenericInit()
}

// timer已经放到了P的timers上
if t.ts != nil {
throw("ts set in timer")
}
// 纪录P的timers指针
t.ts = ts
// 放到最小堆末尾
ts.heap = append(ts.heap, timerWhen{t, t.when})
// 重新调整最小堆,写入timer时触发
ts.siftUp(len(ts.heap) - 1)
// 是同一个定时器
if t == ts.heap[0].timer {
// 更新minWhenHeap字段
ts.updateMinWhenHeap()
}
}

// 重新调整最小堆,写入timer时触发
func (ts *timers) siftUp(i int) {
heap := ts.heap
// 指针超过数组长度,异常
if i >= len(heap) {
badTimer()
}
// 暂存当前节点
tw := heap[i]
when := tw.when
// 异常值
if when <= 0 {
badTimer()
}
// 如果父节点的值比当前节点的值要大,交换两者
for i > 0 {
// 找父节点idx => p = (i-1) / 4
p := int(uint(i-1) / timerHeapN)
// 如果父节点的when大于等于当前节点的when,调整完毕,退出循环
if when >= heap[p].when {
break
}
// 交换父节点和当前节点
heap[i] = heap[p]
i = p
}
if heap[i].timer != tw.timer {
// 一般排序完都要执行这步才对
heap[i] = tw
}
}

// 重新调整最小堆,移除timer时触发
func (ts *timers) siftDown(i int) {
heap := ts.heap
// 指针超过数组长度,异常
n := len(heap)
if i >= n {
badTimer()
}
// 子节点idx超过数组长度
// i*4+1 >= n
if i*timerHeapN+1 >= n {
return
}
// 暂存当前节点
tw := heap[i]
when := tw.when
// 异常值
if when <= 0 {
badTimer()
}
// 如果子节点的值比当前节点的值要小,交换两者
for {
// 子节点left_idx => i*4+1
leftChild := i*timerHeapN + 1
if leftChild >= n {
break
}
w := when // 当前节点when
c := -1 // 节点索引
// 找到子节点中最小的when
// 每个节点存储最多4个数据
for j, tw := range heap[leftChild:min(leftChild+timerHeapN, n)] {
if tw.when < w {
w = tw.when
c = leftChild + j
}
}
// 没找到更小的when,退出
if c < 0 {
break
}
// 找到了,交换父子节点
heap[i] = heap[c]
i = c
}
if heap[i].timer != tw.timer {
// 一般排序完都要执行这步
heap[i] = tw
}
}

// 移除最小的timer
func (ts *timers) deleteMin() {
// ts.mu加锁
assertLockHeld(&ts.mu)
// 最小堆第一个
t := ts.heap[0].timer
// 不在同一个P上
if t.ts != ts {
throw("wrong timers")
}
// 解除绑定
t.ts = nil
// 跟最小堆最后一个元素交换
last := len(ts.heap) - 1
if last > 0 {
ts.heap[0] = ts.heap[last]
}
ts.heap[last] = timerWhen{}
// 删除最后一个元素
ts.heap = ts.heap[:last]
// 重新调整最小堆
if last > 0 {
ts.siftDown(0)
}
// 更新minWhenHeap字段
ts.updateMinWhenHeap()
// 最小堆为空
if last == 0 {
// 将minWhen设置为0
ts.minWhenModified.Store(0)
}
}

goroutine调度相关

take

销毁P时,把最小堆里的timer全部迁移走

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 把一个timers最小堆的timer搬到当前timers最小堆中。P销毁时调用
func (ts *timers) take(src *timers) {
// debug,忽略
ts.trace("take")
// STW
assertWorldStopped()
// src最小堆不为空
if len(src.heap) > 0 {
// STW 忽略顺序
for _, tw := range src.heap {
t := tw.timer
// 解除P绑定
t.ts = nil
// timer已标记删除
if t.state&timerZombie != 0 {
// 清理状态位(为什么要去修改它???)
t.state &^= timerHeaped | timerZombie | timerModified
} else {
// 清理timerModified位
t.state &^= timerModified
// 把timer定时器加入到timers.heap最小堆中
ts.addHeap(t)
}
}
// 把整个最小堆清空了
src.heap = nil
// 下面的统计数据也全部重置
src.zombies.Store(0)
src.minWhenHeap.Store(0)
src.minWhenModified.Store(0)
src.len.Store(0)
ts.len.Store(uint32(len(ts.heap)))
}
}

check

清理最小堆,把所有标记删除的timer都移除出最小堆,如果最小的timer到期,则执行回调函数f运行。调度时,寻找可运行的G时调用(findRunnable或stealWork)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 清理最小堆,把所有标记删除的timer都移除出最小堆,如果最小的timer到期,则执行回调函数f运行
func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
// debug,忽略
ts.trace("check")
// 获取最小堆的最小when
next := ts.wakeTime()
// 最小堆为空
if next == 0 {
return now, 0, false
}

if now == 0 {
now = nanotime()
}

// 标记删除的timer数量
zombies := ts.zombies.Load()
if zombies < 0 {
badTimer()
}
// 当前P and 标记删除的timer数量超过总量的1/4
force := ts == &getg().m.p.ptr().timers && int(zombies) > int(ts.len.Load())/4

// now小于最小when(下一个定时器还没准备好运行) and 非强制
if now < next && !force {
return now, next, false
}

// 下面的场景是:now大于等于最小when or 强制

ts.lock()
if len(ts.heap) > 0 {
// 清理最小堆,并重新排序(非强制)
// 那么这里需要处理的场景就是now大于等于最小when(看了adjust代码感觉跟强制也没什么区别)
ts.adjust(now, false)
// 清理后,heap不为空
for len(ts.heap) > 0 {
// 运行最小的timer,如果还没到时间就不执行
if tw := ts.run(now); tw != 0 {
// -1或0表示最小堆为空,其他表示最小timer的运行时刻
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}

// 重新检测
force = ts == &getg().m.p.ptr().timers && int(ts.zombies.Load()) > int(ts.len.Load())/4
if force {
// 清理最小堆,并重新排序(强制)
ts.adjust(now, true)
}
}
ts.unlock()

return now, pollUntil, ran
}

其他依赖方法列在下方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// 清理最小堆,并重新排序
func (ts *timers) adjust(now int64, force bool) {
// debug,忽略
ts.trace("adjust")
// ts.mu加锁
assertLockHeld(&ts.mu)
// 非强制
if !force {
// 设置了timerModified位的最小when
first := ts.minWhenModified.Load()
// 最小堆为空 or now小于最小when(下一个定时器还没准备好运行)
if first == 0 || first > now {
// debug用,忽略
if verifyTimers {
ts.verify()
}
return
}
}

// 获取最小堆的最小when
// minWhenHeap = min(minWhenHeap, minWhenModified)
ts.minWhenHeap.Store(ts.wakeTime())
// minWhenModified = 0
ts.minWhenModified.Store(0)

// 扫描整个最小堆
changed := false
for i := 0; i < len(ts.heap); i++ {
tw := &ts.heap[i]
t := tw.timer
// 不在同一个P上
if t.ts != ts {
throw("bad ts")
}

// 没有被修改也没有标记删除,无须调整
if t.astate.Load()&(timerModified|timerZombie) == 0 {
continue
}

// 1. 加锁
t.lock()
switch {
case t.state&timerHeaped == 0: // 异常状态
badTimer()

case t.state&timerZombie != 0: // timer已标记删除
// 计数器zombies-=1
ts.zombies.Add(-1)
// 移除标志位
t.state &^= timerHeaped | timerZombie | timerModified
// 总数量
n := len(ts.heap)
// 当前timer跟最小堆的最后一个交换
ts.heap[i] = ts.heap[n-1]
// 先用空的结构替换
ts.heap[n-1] = timerWhen{}
// 移除最后一个数据
ts.heap = ts.heap[:n-1]
// 表示从heap上移除
t.ts = nil
// 重新扫描当前位置的timer
i--
changed = true

case t.state&timerModified != 0: // 已修改
// 更新when
tw.when = t.when
// 移除标志位
t.state &^= timerModified
changed = true
}
t.unlock()
}

// 有调整过最小堆
if changed {
// 最小堆重新排序
ts.initHeap()
}

// 更新minWhenHeap字段
// minWhenHeap = heap[0].when
ts.updateMinWhenHeap()

// debug用,忽略
if verifyTimers {
ts.verify()
}
}

// 验证timers是最小堆
func (ts *timers) verify() {
// ts.mu加锁
assertLockHeld(&ts.mu)
// 扫描最小堆
for i, tw := range ts.heap {
// 第一个timer,没有父节点
if i == 0 {
continue
}

// 父节点,p=(i-1)/4
p := int(uint(i-1) / timerHeapN)
// 子节点比父节点的值小,异常
if tw.when < ts.heap[p].when {
print("bad timer heap at ", i, ": ", p, ": ", ts.heap[p].when, ", ", i, ": ", tw.when, "\n")
throw("bad timer heap")
}
}
// len字段验证
if n := int(ts.len.Load()); len(ts.heap) != n {
println("timer heap len", len(ts.heap), "!= atomic len", n)
throw("bad timer heap len")
}
}

// 运行最小的timer,如果还没到时间就不执行
func (ts *timers) run(now int64) int64 {
// debug,忽略
ts.trace("run")
// ts.mu加锁
assertLockHeld(&ts.mu)
Redo:
// 最小堆为空
if len(ts.heap) == 0 {
return -1
}
// 第一个/最小timer
tw := ts.heap[0]
t := tw.timer
// 不在同一个P上,异常
if t.ts != ts {
throw("bad ts")
}

// timer没有修改/标记删除 and when大于now(还没准备运行)
if t.astate.Load()&(timerModified|timerZombie) == 0 && tw.when > now {
return tw.when
}

t.lock()
// 根据timer状态更新最小堆,定时器已停止则删除,定时器已修改则同步
if t.updateHeap() {
t.unlock()
goto Redo
}

// timer不在最小堆 or 当前定时器已被修改
if t.state&timerHeaped == 0 || t.state&timerModified != 0 {
badTimer()
}

// when大于now(还没准备运行)
if t.when > now {
t.unlock()
return t.when
}

// 更新timer状态、执行函数f
t.unlockAndRun(now)
// ts重新加锁
assertLockHeld(&ts.mu)
return 0
}

timeSleepUntil

遍历所有P,找到全局最小的when。由sysmon、checkdead函数调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 遍历所有P,找到全局最小的when
func timeSleepUntil() int64 {
// 最大值2^63-1
next := int64(maxWhen)

// 确保allp切片不会被更改
lock(&allpLock)
// 遍历所有P,获取在所有P中最小的when
for _, pp := range allp {
if pp == nil {
// 可能在扩容,还没有创建好新的P
continue
}

// 获取最小堆的最小when
if w := pp.timers.wakeTime(); w != 0 {
next = min(next, w)
}
}
unlock(&allpLock)

return next
}

channel相关

虽说是跟channel相关,实际上,在执行<-t.C等待超时时,就会使用到下面的方法

maybeRunChan

判断是否需要更新timer状态、执行函数f。具体逻辑如下

  1. 不满足条件则返回
    • timer已经放在最小堆上,那么过期后自动发送到channel
    • timer从未执行过
    • timer还未到触发时刻
  2. 满足条件则更新timer状态、执行函数f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 判断是否需要更新timer状态、执行函数f
// 目前只有channel、select相关的代码在使用
func (t *timer) maybeRunChan() {
// 测试,忽略
if t.isFake {
t.lock()
var timerGroup *synctestGroup
if t.ts != nil {
timerGroup = t.ts.syncGroup
}
// 用state字段数据更新astate,然后再解锁
t.unlock()
sg := getg().syncGroup
if sg == nil {
panic(plainError("synctest timer accessed from outside bubble"))
}
if timerGroup != nil && sg != timerGroup {
panic(plainError("timer moved between synctest bubbles"))
}
return
}
// 如果timer已经在最小堆中,过期后自动发送到channel
if t.astate.Load()&timerHeaped != 0 {
return
}

t.lock()
// 当前时刻,单调时钟
now := nanotime()
// 加锁后double-check
// 1. timer已经在最小堆中
// 2. timer从未执行过
// 3. timer还未到触发时刻
if t.state&timerHeaped != 0 || t.when == 0 || t.when > now {
// debug用,忽略
t.trace("maybeRunChan-")
// 用state字段数据更新astate,然后再解锁
t.unlock()
return
}
// debug用,忽略
t.trace("maybeRunChan+")

systemstack(func() {
// 更新timer状态、执行函数f
t.unlockAndRun(now)
})
}

blockTimerChan & unblockTimerChan

1
2
3
4
5
// 创建一个10ms过期的定时器
t := time.NewTimer(10 * time.Millisecond)

// 等待定时器过期信号
<-t.C

上述示例代码,如果这个channel是属于一个定时器的,那么在G挂起前、唤醒后,需要修改定时器的state-状态、blocked-标记等。函数详细注释如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// G挂起休眠之前,将定时器标记为阻塞并加入最小堆
func blockTimerChan(c *hchan) {
t := c.timer
// 测试,忽略
if t.isFake {
return
}
t.lock()
// debug,忽略
t.trace("blockTimerChan")
// 异步定时器,异常
if !t.isChan {
badTimer()
}

// 阻塞的G数量
t.blocked++

// 1. timer已经在最小堆中
// 2. timer已标记删除
// 3. 过期时刻大于0
if t.state&timerHeaped != 0 && t.state&timerZombie != 0 && t.when > 0 {
t.state &^= timerZombie // 移除标志位
t.ts.zombies.Add(-1) // 计数器zombies-=1
}

// 是否需要把定时器放到最小堆
add := t.needsAdd()
t.unlock()
if add {
// 更新timer状态、添加到最小堆、中断网络轮询(跟modify函数有些相似)
t.maybeAdd()
}
}

// G唤醒后,将定时器标记为非阻塞、删除状态
func unblockTimerChan(c *hchan) {
t := c.timer
// 测试,忽略
if t.isFake {
return
}
t.lock()
// debug,忽略
t.trace("unblockTimerChan")
// 异步定时器 or 计数器为0,异常
if !t.isChan || t.blocked == 0 {
badTimer()
}

// 阻塞的G数量
t.blocked--

// 计数器为0 and timer已经在最小堆中 and timer无删除标记
if t.blocked == 0 && t.state&timerHeaped != 0 && t.state&timerZombie == 0 {
// 标记为删除
t.state |= timerZombie
// 计数器zombies+=1
t.ts.zombies.Add(1)
}
t.unlock()
}

定时器示例解析

  1. 创建定时器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
t := time.NewTimer(2 * time.Second) // 调用NewTimer(2 * time.Second)

// 第一次创建timer,执行newTimer,此时不会触发任何其他函数操作,包括添加到最小堆等。
// timer数据初始化后,各字段数值如下
//
// |---------------------------------|
// v |
// timer.mu channel.timer
// .f => sendTime ^
// .arg => *channel -----|
// .seq => 1
// .sendLock
// .isChan => true
// .period => 0
// .when => xyz
// .state => 0
// .astate => 0
// .init => true
  1. 当前G挂起等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<-t.C    // 调用chanrecv函数,内部调用blockTimerChan

// 将timer放到最小堆
// timers.heap |---------------------------------|
// | v |
// | timer0 | ... | timerx | <-- timer.mu channel.timer
// .f => sendTime ^
// .arg => *channel -----|
// .seq => 1
// .sendLock
// .isChan => true
// .period => 0
// .when => xyz
// .state => 1
// .astate => 1(timerHeaped)
// .init => true
// .blocked => 1
  1. 定时器到期通知

GMP调度执行findRunnable或stealWork,发现定时器过期,发送信号给channel,唤醒goroutine,流程如下

1
// sched -> findRunnable -> check -> unlockAndRun -> sendTime
  1. 当前G唤醒继续执行

当前G唤醒后,调用unblockTimerChan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 执行unblockTimerChan后,timer的数据状态大概如下
//
// |---------------------------------|
// v |
// timer.mu channel.timer
// .f => sendTime ^
// .arg => *channel -----|
// .seq => 1
// .sendLock
// .isChan => true
// .period => 0
// .when => xyz(<=now)
// .state => 0
// .astate => 5(timerHeaped|timerZombie)
// .init => true
// .blocked => 0

参考文档

Resetting timers in Go
timer 在 Golang 中可以有多精确?
论golang Timer Reset方法使用的正确姿势
#74 time.Timer 源码分析 (Go 1.14) 【 Go 夜读 】