golang系列之-netpoll

netpoll是golang用来处理网络I/O事件的底层机制,主要通过操作系统的I/O多路复用机制如Linux的epoll、BSD的kqueue、Windows的IOCP等来实现

数据结构

核心的数据结构是pollDesc,用于存储与文件描述符相关的事件数据,一般被放入如epoll的epoll_event.data来传递信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type pollDesc struct {
_ sys.NotInHeap // 放置转换成interface{}时申请heap内存
link *pollDesc // next指针,用于pollcache链表
fd uintptr // 文件描述符
fdseq atomic.Uintptr // 计数器,类似时间戳,确保过期的消息不会被处理,只在获取/放回cache时改变
atomicInfo atomic.Uint32 // 5个状态位+fdseq(这两个数据有位交叉冲突,没搞懂)
rg atomic.Uintptr // 读状态,读G的地址也作为一种状态
wg atomic.Uintptr // 写状态,写G的地址也作为一种状态
lock mutex // 锁,保护下列字段
closing bool // 是否被移除出netpoll
rrun bool // rt-读定时器是否在运行
wrun bool // wt-写定时器是否在运行
user uint32 // cookie,linux/bsd应该没用到
rseq uintptr // 读计数器,类似fdseq,只有获取/放回cache以及设置deadline时改变
rt timer // 读定时器
rd int64 // 读过期时刻,-1为已过期
wseq uintptr // 写计数器,类似fdseq,只有获取/放回cache以及设置deadline时改变
wt timer // 写定时器
wd int64 // 写过期时刻,-1为已过期
self *pollDesc // 当前实例指针
}

// pollDesc缓存,重复使用,避免反复申请内存
type pollCache struct {
lock mutex // 锁
first *pollDesc // 链表头部指针,pollDesc指针都从头部写入 new -> old -> ...
}

pollDesc部份字段讲解如下

  1. atomicInfo是一个无符号32位整型数,每位用途如下
16bit 11bit 1bit 1bit 1bit 1bit 1bit
fdseq unused pollFDSeq pollExpiredWriteDeadline pollExpiredReadDeadline pollEventErr pollClosing

注意:fdseq占据20位数据,但在atomicInfo里,fdseq要向左移位16位,看起来是数据丢失了,没搞明白。同样有问题的还有taggedPointerPack

  1. rgwg的状态列表如下
state_name state_val description
pdNil 0 默认值
pdReady 1 io可读,下一个状态是pdNil
pdWait 2 准备挂起,下一个状态是G pointer-挂起,pdReady-io可读,pdNil-超时/关闭
G pointer 0xabc goroutine指针-挂起,下一个状态是pdReady-io可读,pdNil-超时/关闭

netpoll初始化

初始化与netpoll有关的底层资源,如epoll实例、eventfd实例等,用sync.Once限制只执行一次。逻辑如下

  1. 通用/平台无关
    • 初始化锁,包括netpollInitLock、pollcache.lock
    • 如果netpollInited为0,执行平台相关初始化,最后netpollInited设为1
  2. 平台相关(linux-epoll)
    • 生成epoll实例
    • 生成eventfd实例、封装epoll事件数据
    • 将文件描述符eventfd和事件数据添加到epoll实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// 通用/平台无关
// src/runtime/netpoll.go
var (
netpollInitLock mutex // netpoll初始化锁
netpollInited atomic.Uint32 // 判断netpoll是否已初始化

pollcache pollCache // pollDesc链表
netpollWaiters atomic.Uint32 // 挂起的goroutine数量
)

// 编译器链接为internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}

// netpoll初始化
func netpollGenericInit() {
if netpollInited.Load() == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lockInit(&pollcache.lock, lockRankPollCache)
lock(&netpollInitLock)
// 加锁后double-check
if netpollInited.Load() == 0 {
// 调用平台相关实现,如linux-epoll
netpollinit()
// 0 -> 1(已初始化)
netpollInited.Store(1)
}
unlock(&netpollInitLock)
}
}

// 平台相关-linux/epoll
// src/runtime/netpoll_epoll.go
var (
epfd int32 = -1 // epoll实例
netpollEventFd uintptr // eventfd实例
netpollWakeSig atomic.Uint32 // 标志,防止重复调用netpollBreak
)

func netpollinit() {
var errno uintptr
// 1. 创建epoll实例
// EPOLL_CLOEXEC => 安全设置,fork()创建子进程或exec()执行新程序时,关闭epoll实例
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if errno != 0 {
println("runtime: epollcreate failed with", errno)
throw("runtime: netpollinit failed")
}

// 2. 创建eventfd实例
// 2.1. 创建eventfd文件描述符
// 0 => 计数器初始值,如果为0,epollWait时会阻塞住
// EPOLL_CLOEXEC => 执行exec()时,关闭efd
// EFD_NONBLOCK => 非阻塞
efd, errno := syscall.Eventfd(0, syscall.EFD_CLOEXEC|syscall.EFD_NONBLOCK)
if errno != 0 {
println("runtime: eventfd failed with", -errno)
throw("runtime: eventfd failed")
}

// 2.2. 创建epoll事件
// EPOLLIN => 可读时通知
// 默认水平触发
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
// 事件元数据绑定
// efd指针存储在netpollEventFd,最后再存储于Data
*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollEventFd

// 3. 将eventfd文件描述符efd加入epoll实例epfd
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, efd, &ev)
if errno != 0 {
println("runtime: epollctl failed with", errno)
throw("runtime: epollctl failed")
}
netpollEventFd = uintptr(efd)
}

netpoll添加文件描述符

将一个文件描述符(FD)或网络连接添加到I/O事件多路复用系统中,使其能够被监听,以便在该文件描述符上发生事件时被唤醒并进行相应处理。逻辑如下

  1. 生成/初始化事件元数据pd
    • 确保rg/wg重置为pdNil
    • fdseq默认为1
    • 更新atomicInfo错误标志
    • rd/wd过期时刻设置为0,rseq/wseq计数器更新
    • 绑定self
    • 重新更新atomicInfo
  2. 将文件描述符、事件数据添加到epoll实例(平台相关)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 通用/平台无关
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// 1. 生成/初始化事件元数据
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
if pd.fdseq.Load() == 0 {
// 修改fdseq,0-特殊用途,不能使用
pd.fdseq.Store(1)
}
pd.closing = false
// 更新atomicInfo错误标志
pd.setEventErr(false, 0)
pd.rseq++
pd.rg.Store(pdNil)
pd.rd = 0
pd.wseq++
pd.wg.Store(pdNil)
pd.wd = 0
pd.self = pd
// 更新atomicInfo字段
pd.publishInfo()
unlock(&pd.lock)

// 2. 将fd加入epoll实例(平台相关)
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
// 返回事件元数据、错误代码
return pd, 0
}

// 平台相关
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
// 1. 创建epoll事件
// 1.1. 参数解读如下
// EPOLLIN => 可读
// EPOLLOUT => 可写
// EPOLLRDHUP => 连接被对端关闭(tcp服务器)
// EPOLLET => 边缘触发模式
var ev syscall.EpollEvent
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
// 1.2. 将事件元数据pd组装成带标签的指针
// pd指针放高48位,fdseq放低19位,这里交叉的3个位没搞懂
tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
// 1.3. 绑定
*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp

// 2. 将文件描述符fd加入epfd实例
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

为了避免代码过长影响阅读,其他依赖方法列在下方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// 获取一个可用的事件元数据pollDesc
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
// 链表为空
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
// n=4096/280=14
n := pollBlockSize / pdSize
if n == 0 {
n = 1 // 最低为1
}
// 一次性申请14个pollDesc的内存
// 注意:这里必须位于非GC内存区域,epoll/queue内部使用
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
lockInit(&pd.lock, lockRankPollDesc)
pd.rt.init(nil, nil) // 读定时器初始化
pd.wt.init(nil, nil) // 写定时器初始化
pd.link = c.first // 放到cache链表
c.first = pd
}
}
// 拿链表头部第一个pollDesc
pd := c.first
c.first = pd.link
unlock(&c.lock)
return pd
}

// 把事件元数据pollDesc放到pollCache,留待后续使用
func (c *pollCache) free(pd *pollDesc) {
// pd不能被共享
lock(&pd.lock)

// fdseq++,确保pd状态不会被设置为ready
fdseq := pd.fdseq.Load()
fdseq = (fdseq + 1) & (1<<taggedPointerBits - 1) // fdseq++,保留低19位
pd.fdseq.Store(fdseq)

// 更新atomicInfo字段
pd.publishInfo()

unlock(&pd.lock)

// 把pollDesc放回cache链表的头部
lock(&c.lock)
pd.link = c.first
c.first = pd
unlock(&c.lock)
}

// 更新atomicInfo错误标志
func (pd *pollDesc) setEventErr(b bool, seq uintptr) {
// fdseq1 => 参数值,只保留低20位
mSeq := uint32(seq & pollFDSeqMask)
// 获取atomicInfo
x := pd.atomicInfo.Load()
// fdseq2 => 事件元数据存储,atomicInfo丢弃低16位后再保留低20位
xSeq := (x >> pollFDSeq) & pollFDSeqMask
// 两者不相等
if seq != 0 && xSeq != mSeq {
return
}
// 更新错误标志失败时重试
// 1. atomicInfo的错误标志跟b不同,表示错误标志变更
// 2. 取反错误标志失败
for (x&pollEventErr != 0) != b && !pd.atomicInfo.CompareAndSwap(x, x^pollEventErr) {
// 逻辑同上,这里应为double-check
x = pd.atomicInfo.Load()
xSeq := (x >> pollFDSeq) & pollFDSeqMask
if seq != 0 && xSeq != mSeq {
return
}
}
}

// 更新atomicInfo字段
func (pd *pollDesc) publishInfo() {
var info uint32
if pd.closing { // fd被移除出netpoll
info |= pollClosing
}
if pd.rd < 0 { // 读过期时刻,-1为已过期
info |= pollExpiredReadDeadline
}
if pd.wd < 0 { // 写过期时刻,-1为已过期
info |= pollExpiredWriteDeadline
}
// fdseq低20位放到info高16位,没搞懂
info |= uint32(pd.fdseq.Load()&pollFDSeqMask) << pollFDSeq

// 旧info
x := pd.atomicInfo.Load()
// 旧info的错误标志位纪录到新的info,然后替换
for !pd.atomicInfo.CompareAndSwap(x, (x&pollEventErr)|info) {
x = pd.atomicInfo.Load()
}
}

netpoll移除文件描述符

将不再需要监听的文件描述符或网络连接从事件轮询中移除,以释放资源并停止对该描述符的轮询。逻辑如下

  1. guard,确认无异常情况
  2. 调用平台相关实现,如epoll实例,删除目标文件描述符fd
  3. 事件元数据清理
    • fdseq++,确保pd状态不会被设置为ready
    • 更新atomicInfo字段
    • 把pd事件元数据放到cache链表的头部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 通用/平台无关
// 彻底删除
func poll_runtime_pollClose(pd *pollDesc) {
// 需要先调用poll_runtime_pollUnblock
if !pd.closing {
throw("runtime: close polldesc w/o unblock")
}
// 写G/读G状态异常
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on closing polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on closing polldesc")
}
// epoll删除fd
netpollclose(pd.fd)
// 把pollDesc放回cache链表
pollcache.free(pd)
}

// 平台相关
func netpollclose(fd uintptr) uintptr {
var ev syscall.EpollEvent
// EPOLL_CTL_DEL => 删除
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev)
}

netpoll轮询事件

通常由调度器触发,轮询所有的I/O事件并唤醒相应的goroutine。与netpollBreak搭配使用,目前用在findRunnable、startTheWorldWithSema、pollWork、sysmon函数。具体逻辑如下

  1. 计算等待事件waitms
  2. 调用syscall.EpollWait来等待事件
    • 异常情况中断轮询
    • 被中断,重新执行
  3. 遍历所有发生的事件
    • 如果Events为0,跳过
    • 如果是eventfd事件,检查类型、读取事件数据并重置事件标志
    • 其他事件
      • 检查事件类型是读/写
      • 通过fdseq和tag判断是否已处理
      • 调用netpollready处理事件

参数delay的说明如下

delay description
<0 永久阻塞
=0 非阻塞,轮训
>0 阻塞delay时长,单位ms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// 平台相关,不通用
func netpoll(delay int64) (gList, int32) {
// 确保epoll已初始化
if epfd == -1 {
return gList{}, 0
}
// 计算等待时间(epoll需要毫秒)
var waitms int32
if delay < 0 { // 无限等待
waitms = -1
} else if delay == 0 { // 立刻返回,非阻塞
waitms = 0
} else if delay < 1e6 { // 等待 1 毫秒
waitms = 1
} else if delay < 1e15 { // 转换为毫秒
waitms = int32(delay / 1e6)
} else { // 超过最大值1e15,设置1e9,约为11.5天
waitms = 1e9
}
// 128个事件
var events [128]syscall.EpollEvent
retry:
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
if errno != 0 {
// 异常
if errno != _EINTR {
println("runtime: epollwait on fd", epfd, "failed with", errno)
throw("runtime: netpoll failed")
}
// 被中断,重新执行
if waitms > 0 {
return gList{}, 0
}
goto retry
}
// 待执行的 goroutines 列表
var toRun gList
// 累计处理事件
delta := int32(0)
// 遍历所有发生的事件
for i := int32(0); i < n; i++ {
ev := events[i]
// 跳过,一般不为0
if ev.Events == 0 {
continue
}

// eventfd
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollEventFd {
// 是否可读
if ev.Events != syscall.EPOLLIN {
println("runtime: netpoll: eventfd ready for", ev.Events)
throw("runtime: netpoll: eventfd ready for something unexpected")
}
if delay != 0 {
// 读取事件数据
var one uint64
read(int32(netpollEventFd), noescape(unsafe.Pointer(&one)), int32(unsafe.Sizeof(one)))
// 重置
netpollWakeSig.Store(0)
}
continue
}

// 其他fd
var mode int32
// 可读
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r'
}
// 可写
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 获取pollDesc以及fdseq
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
pd := (*pollDesc)(tp.pointer()) // *pollDesc
tag := tp.tag() // fdseq
// 只在获取/放回cache时改变
// 如果fdseq不同,那么可能是过期的或重复使用的*pollDesc
if pd.fdseq.Load() == tag {
// 更新atomicInfo错误标志
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
// 调用netpollready处理事件
delta += netpollready(&toRun, pd, mode)
}
}
}
return toRun, delta
}

为了避免代码过长影响阅读,其他依赖方法列在下方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 唤醒goroutine,数据已经可读/可写
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
delta := int32(0)
var rg, wg *g
// 1. 唤醒goroutine

// rg/wg状态修改
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true, &delta)
}

// 2. 将唤醒的goroutine放到toRun列表
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
// 累计唤醒的goroutine数量
return delta
}

// rg/wg状态修改
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
// 读G/写G
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// pdReady/G pointer -> pdReady
for {
old := gpp.Load()
// 已经是pdReady状态
if old == pdReady {
return nil
}
// 判断io是否可读/写,超时/取消由runtime_pollWait负责判断
if old == pdNil && !ioready {
return nil
}
new := pdNil // 解除等待
if ioready {
new = pdReady // 读/写就绪
}
// 修改状态
if gpp.CompareAndSwap(old, new) {
if old == pdWait { // pdWait
old = pdNil
} else if old != pdNil { // G pointer
*delta -= 1 // 挂起数量减一
}
return (*g)(unsafe.Pointer(old))
}
}
}

打破当前I/O轮询循环

打破当前的I/O轮询循环,使得正在等待I/O事件的goroutine能够被唤醒。与netpoll方法搭配使用,目前用在findRunnable和wakeNetPoller函数。具体逻辑如下

  1. guard,放置重复调用netpollBreak
  2. 中断信号准备
  3. 更新eventfd计数器,此时计数器的值不为0,epollWait被中断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func netpollBreak() {
// 1. 防重验证
// 标志为1,表示有其他G在执行当前函数
if !netpollWakeSig.CompareAndSwap(0, 1) {
return
}

// 2. 数据准备
// 中断信号数据
var one uint64 = 1
oneSize := int32(unsafe.Sizeof(one))

// 3. 发出信号,中断netpoll方法
for {
// 更新eventfd计数器,此时计数器的值不为0,epollWait被中断
n := write(netpollEventFd, noescape(unsafe.Pointer(&one)), oneSize)
// 成功
if n == oneSize {
break
}
// 被中断
if n == -_EINTR {
continue
}
// 资源暂时不可用,如写缓冲区满
if n == -_EAGAIN {
return
}
// 其他异常
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}

为将要挂起的G设置超时

通常在调用poll_runtime_pollWait之前使用,设置一个具体的超时时间,确保挂起的I/O操作能够在指定的时间内完成。具体逻辑如下

  1. guard,已被移除出netpoll不处理
  2. 计算读/写定时器新过期时刻、更新atomicInfo状态
  3. 修改读/写定时器
  4. rg/wg状态修改,如果有读G/写G需要唤醒,则唤醒并更新netpollWaiters计数器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
lock(&pd.lock)
// 已被移除出netpoll
if pd.closing {
unlock(&pd.lock)
return
}

// 获取读G/写G旧过期时刻
rd0, wd0 := pd.rd, pd.wd
// 是否读写都设置同一个过期时刻
combo0 := rd0 > 0 && rd0 == wd0
if d > 0 {
d += nanotime() // 获取过期时刻-单调时钟
if d <= 0 {
d = 1<<63 - 1 // 溢出时设为最大值
}
}
// 设置读G/写G新过期时刻
if mode == 'r' || mode == 'r'+'w' {
pd.rd = d
}
if mode == 'w' || mode == 'r'+'w' {
pd.wd = d
}

// 更新atomicInfo字段
pd.publishInfo()

combo := pd.rd > 0 && pd.rd == pd.wd
// 选择定时器过期时触发的函数
rtf := netpollReadDeadline
if combo {
rtf = netpollDeadline
}
// 读/写过期定时器是否在运行
if !pd.rrun {
// 没有运行rt
if pd.rd > 0 {
// 需要更新定时器过期时间
pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
pd.rrun = true
}
} else if pd.rd != rd0 || combo != combo0 {
// 有运行rt,但现在过期时刻有修改
pd.rseq++ // 版本更新
if pd.rd > 0 {
// 需要更新定时器过期时间
pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
} else {
// <=0,已过期
pd.rt.stop()
pd.rrun = false
}
}
if !pd.wrun {
// 没有运行wt
if pd.wd > 0 && !combo {
// 需要更新定时器过期时间,且读/写过期时刻不同
pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
pd.wrun = true
}
} else if pd.wd != wd0 || combo != combo0 {
// 有运行wt,但现在过期时刻有修改
pd.wseq++ // 版本更新
if pd.wd > 0 && !combo {
// 需要更新定时器过期时间,且读/写过期时刻不同
pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
} else {
// <=0,已过期
pd.wt.stop()
pd.wrun = false
}
}

// 累计唤醒的G数量
delta := int32(0)
var rg, wg *g
// rg/wg状态修改,ioready为false,不一定有G被唤醒
if pd.rd < 0 {
rg = netpollunblock(pd, 'r', false, &delta)
}
if pd.wd < 0 {
wg = netpollunblock(pd, 'w', false, &delta)
}
unlock(&pd.lock)
// 唤醒读G/写G
if rg != nil {
netpollgoready(rg, 3)
}
if wg != nil {
netpollgoready(wg, 3)
}
// 计数器+=delta => netpollWaiters+=delta
netpollAdjustWaiters(delta)
}

// 读写定时器同时刻过期时调用
func netpollDeadline(arg any, seq uintptr, delta int64) {
netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

// 读定时器过期时调用
func netpollReadDeadline(arg any, seq uintptr, delta int64) {
netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

// 写定时器过期时调用
func netpollWriteDeadline(arg any, seq uintptr, delta int64) {
netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

// 读/写定时器过期时调用
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
lock(&pd.lock)
// 获取pollDesc当前版本的seq
currentSeq := pd.rseq
if !read {
// 读写放rseq,只写放wseq
currentSeq = pd.wseq
}
// 新旧版本号不一致,pd可能被重用或定时器reset
if seq != currentSeq {
unlock(&pd.lock)
return
}

// 累计唤醒的G数量
delta := int32(0)
var rg *g
// 读或读写
if read {
if pd.rd <= 0 || !pd.rrun {
throw("runtime: inconsistent read deadline")
}
// -1为已过期
pd.rd = -1
// 更新atomicInfo字段
pd.publishInfo()
// rg状态修改,ioready为false,不一定有goroutine被唤醒
rg = netpollunblock(pd, 'r', false, &delta)
}
var wg *g
if write {
if pd.wd <= 0 || !pd.wrun && !read {
throw("runtime: inconsistent write deadline")
}
// -1为已过期
pd.wd = -1
// 更新atomicInfo字段
pd.publishInfo()
// wg状态修改,ioready为false,不一定有goroutine被唤醒
wg = netpollunblock(pd, 'w', false, &delta)
}
unlock(&pd.lock)
// 唤醒读G/写G
if rg != nil {
netpollgoready(rg, 0)
}
if wg != nil {
netpollgoready(wg, 0)
}
// 计数器+=delta => netpollWaiters+=delta
netpollAdjustWaiters(delta)
}

为了避免代码过长影响阅读,其他依赖方法列在下方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 唤醒goroutine
func netpollgoready(gp *g, traceskip int) {
// 将g放到本地队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
goready(gp, traceskip+1)
}

// 计数器+=delta => netpollWaiters+=delta
func netpollAdjustWaiters(delta int32) {
if delta != 0 {
netpollWaiters.Add(delta)
}
}

func (pd *pollDesc) makeArg() (i any) {
x := (*eface)(unsafe.Pointer(&i)) // 转换成interface{}
x._type = pdType // 类型为pollDesc
x.data = unsafe.Pointer(&pd.self) // 指向pollDesc实例
return
}

重置当前G的状态

不管是因为何种原因,如果当前G需要重新进入队列进行新的轮询,就需要调用该函数进行状态重置。在执行poll_runtime_pollWait之前设置。具体逻辑如下

  1. 检测atomicInfo标志位,查看是否有异常
  2. 读G/写G状态重置为pdNil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
// 查看atomicInfo是否有错误标志
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// 读G/写G状态重置
if mode == 'r' {
pd.rg.Store(pdNil)
} else if mode == 'w' {
pd.wg.Store(pdNil)
}
return pollNoError
}

为了避免代码过长影响阅读,其他依赖方法列在下方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 查看atomicInfo是否有错误标志
func netpollcheckerr(pd *pollDesc, mode int32) int {
// atomicInfo
info := pd.info()
// pollClosing标志位是否已设置
if info.closing() {
return pollErrClosing
}
// 读过期或写过期
if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
return pollErrTimeout
}
// 读发生错误,写错误在写时处理,有更详细的错误信息
if mode == 'r' && info.eventErr() {
return pollErrNotPollable
}
return pollNoError
}

当前G挂起等待事件发生

将当前goroutine挂起,并等待事件通知,与poll_runtime_pollUnblock搭配使用。具体逻辑如下

  1. 检测atomicInfo标志位,查看是否有异常
  2. 尝试将rg或wg状态改为pdWait,准备挂起
  3. 调用gopark将当前goroutine挂起等待,挂起前将rg或wg状态改为goroutine指针
  4. 被唤醒后,把当前状态(已准备好还是超时)告知上层函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 通用/平台无关
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// 查看atomicInfo是否有错误标志
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// 需要使用水平触发的系统
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
netpollarm(pd, mode)
}

// 当前goroutine挂起休眠,唤醒后,rg/wg可读/写返回true
for !netpollblock(pd, int32(mode), false) {
// double-check
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
}
return pollNoError
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// 根据mode选择读G/写G
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// 设置为pdWait状态
for {
// 已经ready,返回
// pdReady -> pdNil
if gpp.CompareAndSwap(pdReady, pdNil) {
return true
}

// 进入等待
// pdNil -> pdWait
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}

// 异常情况
if v := gpp.Load(); v != pdReady && v != pdNil {
throw("runtime: double wait")
}
}

// pdWait or atomicInfo没有错误标志
if waitio || netpollcheckerr(pd, mode) == pollNoError {
// 挂起等待,pdWait -> goroutine指针,netpollWaiters++
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
// 被唤醒
}

// 状态设为pdNil
old := gpp.Swap(pdNil)
// 如果old是goroutine的指针
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

// 挂起前处理
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// pdWait -> goroutine指针
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
// 成功
if r {
// 计数器+1 => netpollWaiters++
netpollAdjustWaiters(1)
}
return r
}

事件发生唤醒挂起的G

事件发生时,唤醒被挂起等待的goroutine,与poll_runtime_pollWait搭配使用。具体逻辑如下

  1. guard,确保不会反复unblock已被移除出netpoll的pollDesc
  2. 事件元数据pollDesc更新
    • 计数器、atomicInfo、rg/wg、读/写定时器状态更新
  3. 唤醒读G/写G,并更新netpollWaiters计数器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func poll_runtime_pollUnblock(pd *pollDesc) {
lock(&pd.lock)
// 已被移除出netpoll
if pd.closing {
throw("runtime: unblock on closing polldesc")
}
pd.closing = true
pd.rseq++
pd.wseq++
var rg, wg *g
// 更新atomicInfo字段
pd.publishInfo()
// 累计唤醒的G数量
delta := int32(0)
// rg/wg状态修改
rg = netpollunblock(pd, 'r', false, &delta)
wg = netpollunblock(pd, 'w', false, &delta)
// 停止读/写定时器
if pd.rrun {
pd.rt.stop()
pd.rrun = false
}
if pd.wrun {
pd.wt.stop()
pd.wrun = false
}
unlock(&pd.lock)
// 唤醒读G/写G
if rg != nil {
netpollgoready(rg, 3)
}
if wg != nil {
netpollgoready(wg, 3)
}
// 计数器+=delta => netpollWaiters+=delta
netpollAdjustWaiters(delta)
}

参考文档

The method to epoll’s madness
golang netpoll Explained
Linux fd 系列 — eventfd 是什么?