golang系列之-channel

channel-管道,是go语言中一种常见的goroutine的通信方式

当前go版本:1.24

快速上手

示例1. 两个goroutine之间使用channel传递数据

1
2
3
4
5
6
7
8
9
10
11
message := make(chan string)

// 新goroutine
go func() {
time.Sleep(time.Second * 2)
message <- "Hello from goroutine!"
}()

// 当前goroutine
msg := <-message
fmt.Println(msg)

示例2. 使用select同时监听多个goroutine的响应数据,实际上,业务代码中一般都是跟定时器搭配使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
time.Sleep(time.Second * 1)
ch1 <- 1
}()

go func() {
time.Sleep(time.Second * 2)
ch2 <- 2
}()

for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("Received from ch2:", msg2)
}
}

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type hchan struct {
qcount uint // len,元素个数
dataqsiz uint // cap,循环队列的长度
buf unsafe.Pointer // 指针,指向一个循环队列
elemsize uint16 // 元素大小
synctest bool // true if created in a synctest bubble
closed uint32 // 是否已关闭
timer *timer // 定时器,双向绑定timer
elemtype *_type // 元素类型
sendx uint // 写索引
recvx uint // 读索引
recvq waitq // 读队列
sendq waitq // 写队列
lock mutex // 锁
}

// 双向队列,sudog内部有prev和next指针
type waitq struct {
first *sudog // head
last *sudog // tail
}

创建channel

创建channel,具体逻辑如下

  1. guard,让错误尽早返回
  2. 计算创建channel所需的内存大小(header+buf)
  3. 创建channel、初始化字段数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}

return makechan(t, int(size))
}

func makechan(t *chantype, size int) *hchan {
elem := t.Elem

// 确保元素大小没有超过2^16=64KB
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}

// 如果channel有buf缓冲区
// 计算buf总大小=type_size*size
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

// 创建channel
var c *hchan
switch {
case mem == 0: // size=0或者type_size=0,无buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case !elem.Pointers(): // 元素非指针类型,一次性为header和buf申请内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 指针类型,分别为header和buf申请内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
// synctest,忽略
if getg().syncGroup != nil {
c.synctest = true
}
// 锁优先级设置,见src/runtime/lockrank.go
lockInit(&c.lock, lockRankHchan)

// debug,忽略
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}

// 获取buf可写入的地址
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// buf是否已满
func full(c *hchan) bool {
// 容量为0
if c.dataqsiz == 0 {
// 有G在等待读数据
return c.recvq.first == nil
}
// 容量不为0,确认队列是否已满
return c.qcount == c.dataqsiz
}

// buf是否为空
func empty(c *hchan) bool {
// 容量为0
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
// 异步timer(<=go1.22)
if c.timer != nil {
// 判断是否需要更新timer状态、执行函数f
c.timer.maybeRunChan()
}
// 容量不为0,确认队列是否为空
return atomic.Loaduint(&c.qcount) == 0
}

发送数据

如何发送数据到channel?当使用代码c <- x时,系统将编译为对chansend1的调用;当使用select发送数据时,编译为对selectnbsend的调用;而这两个函数最终会调用chansend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// `c <- x`
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, sys.GetCallerPC())
}

// select代码块
//
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// 编译为如下代码
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, sys.GetCallerPC())
}

chansend代码如下,具体逻辑为

  1. select场景,非阻塞
    • 如果channel 已关闭,异常
    • 如果channel 未初始化或buf已满,发送失败,返回
  2. c <- v场景,阻塞
    • 如果channel 未初始化或已关闭,异常
  3. 共同逻辑
    • 加锁double-check,如果channel 已关闭,异常
    • 如果已经有读G在等待,说明buf为空,把数据给队列的第一个读G并唤醒,返回
    • buf未满,写入下一个空位置,更新索引、计数器,返回
    • buf已满,非阻塞返回写入失败,阻塞则把当前G封装到sudog放进写队列,挂起等待
    • 被唤醒后
      • 如果是因为channel 被关闭导致的唤醒,异常
      • 数据已被读G拿走,清理收尾
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// 传递数据到channel
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 1. guard
// channel未初始化
if c == nil {
// 非阻塞-select
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}

// debug,忽略
if debugChan {
print("chansend: chan=", c, "\n")
}

// 测试,忽略
if c.synctest && getg().syncGroup == nil {
panic(plainError("send on synctest channel from outside bubble"))
}

// 非阻塞-select and channel未关闭 and buf已满
if !block && c.closed == 0 && full(c) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

// 2. 写入
// 加锁
lock(&c.lock)

// 已关闭,异常
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// 已经有读G在等待,说明buf为空,把数据给队列的第一个G并唤醒
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 以下场景:0 <= buf_len <= cap

// buf未满
if c.qcount < c.dataqsiz {
// 获取buf下一个可写入的地址
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
// 写索引
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 元素个数
c.qcount++
unlock(&c.lock)
return true
}

// buf已满
// 非阻塞
if !block {
// 加上上面的channel未关闭、buf已满,可以当作是double-check
unlock(&c.lock)
return false
}

// 阻塞,挂起等待
// Ps. 下面这交叉写看的好乱
gp := getg() // 当前G
mysg := acquireSudog() // sudog
mysg.releasetime = 0 // 纪录G在channel上阻塞的耗时
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // x元素指针(c <- x)
mysg.waitlink = nil // ?
mysg.g = gp // 把当前G放到sudog的g,双向绑定
mysg.isSelect = false // 非select操作
mysg.c = c // channel
gp.waiting = mysg // 把sudog放到当前G的waiting,双向绑定
gp.param = nil // 重置sudog指针
c.sendq.enqueue(mysg) // 把sudog放进队列
gp.parkingOnChan.Store(true) // 是否阻塞在channel
reason := waitReasonChanSend
if c.synctest {
reason = waitReasonSynctestChanSend
}

// 挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
// 被唤醒

// 确保x元素还活着
KeepAlive(ep)

// 双向绑定异常
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil // 当前G移除sudog绑定
gp.activeStackChans = false // 是否在等待channel操作
closed := !mysg.success // 关闭channel时设置为false,其他情况为true
gp.param = nil
if mysg.releasetime > 0 {
// 纪录阻塞事件
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil // 移除channel绑定
releaseSudog(mysg) // 删除sudog
// 因为channel被关闭而导致的唤醒
if closed {、
// 状态不匹配
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 已关闭channel不可写入
panic(plainError("send on closed channel"))
}
return true
}

// 发送元素value给读G并将其唤醒,buf为空才会走到这里
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// synctest,忽略
if c.synctest && sg.g.syncGroup != getg().syncGroup {
unlockf()
panic(plainError("send on synctest channel from outside bubble"))
}
// sg => 从recvq拿到的,ep => x元素指针(c <- x)
// y元素指针(y := <- c),如果是丢弃数值则不处理 => <- c
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}

// 读G
gp := sg.g
unlockf()
// 纪录读sudog写到param
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒读G
// 将g放到本地队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// sg => 从recvq获取的
// 元素指针
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
// 复制数据
memmove(dst, src, t.Size_)
}

// goroutine被挂起休眠之前调用
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
gp.activeStackChans = true // 是否在等待channel操作
gp.parkingOnChan.Store(false) // 是否阻塞在channel
unlock((*mutex)(chanLock)) // 解锁
return true
}

// 把sudog放进队列
func (q *waitq) enqueue(sgp *sudog) {
sgp.next = nil
x := q.last
// 队列为空
if x == nil {
sgp.prev = nil
q.first = sgp
q.last = sgp
return
}
sgp.prev = x
x.next = sgp
q.last = sgp
}

// 把sudog移出队列
func (q *waitq) dequeue() *sudog {
for {
// 从头部开始
sgp := q.first
// 队列为空
if sgp == nil {
return nil
}
// next
y := sgp.next
// 队列只有一个数据
if y == nil {
q.first = nil
q.last = nil
} else {
// 队列有多个数据
y.prev = nil
q.first = y
sgp.next = nil // mark as removed (see dequeueSudoG)
}

// 找到了一个sudog,但还有一个问题需要判断
// G被select唤醒时,可能还未被移出队列
if sgp.isSelect {
if !sgp.g.selectDone.CompareAndSwap(0, 1) {
// We lost the race to wake this goroutine.
continue
}
}

return sgp
}
}

接收数据

如何从channel接收数据?当使用代码<- c时,系统将根据返回值编译为对chanrecv1或chanrecv2的调用;当使用select接收数据时,编译为对selectnbrecv的调用;而这三个函数最终会调用chanrecv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// `<- c` 或 `y := <- c`
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

// `y, ok := <- c`
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

// select代码块
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// 编译为如下代码
//
// if selected, ok = selectnbrecv(&v, c); selected {
// ... foo
// } else {
// ... bar
// }
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}

chanrecv代码如下,具体逻辑为

  1. select场景,非阻塞
    • 如果 channel 未初始化或已关闭或buf为空,无数据,返回
  2. c <- v场景,阻塞
    • 如果channel 未初始化,异常
  3. 共同逻辑
    • 加锁double-check
    • 如果channel 已关闭且buf为空,无数据,返回
    • 如果已经有写G在等待,说明buf已满,读取队列的第一个写G并唤醒,返回
    • buf不为空,读循环队列,更新索引、计数器,返回
    • buf为空,非阻塞则返回读取失败,阻塞则把当前G封装到sudog放进读队列,挂起等待
      • 数据已从写G读到,清理收尾
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1. guard
// debug,忽略
if debugChan {
print("chanrecv: chan=", c, "\n")
}

// channel未初始化
if c == nil {
// 非阻塞
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}

// 测试,忽略
if c.synctest && getg().syncGroup == nil {
panic(plainError("receive on synctest channel from outside bubble"))
}

// 异步timer(<=go1.22)
if c.timer != nil {
// 判断是否需要更新timer状态、执行函数f
c.timer.maybeRunChan()
}

// 非阻塞 and buf为空
if !block && empty(c) {
// channel未关闭
if atomic.Load(&c.closed) == 0 {
return
}
// 已关闭
// double-check
if empty(c) {
// `y := <- c` => ep不为nil,`<- c` => ep为nil
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

// 2. 读取
// 加锁
lock(&c.lock)

// 已关闭
if c.closed != 0 {
// buf为空
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// channel已关闭但buf还有数据 0 < buf_len <= cap
} else {
// 已经有写G在等待,说明buf已满,读取buf第一个数据,队列的第一个写G数据补上buf然后唤醒
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// buf未满 0 <= buf_len < cap
}

// 以下场景:0 <= buf_len <= cap

// buf不为空
if c.qcount > 0 {
// 获取buf下一个可读取的地址
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 读索引
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 元素个数
c.qcount--
unlock(&c.lock)
return true, true
}

// buf为空
// 非阻塞
if !block {
// 可以理解为加锁后的double-check
unlock(&c.lock)
return false, false
}

// 阻塞,挂起等待
gp := getg() // 当前G
mysg := acquireSudog() // 获取sudog
mysg.releasetime = 0 // 纪录G在channel上阻塞的耗时
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // y元素指针(y := <- c)
mysg.waitlink = nil // ?
gp.waiting = mysg // 把sudog放到当前G的waiting,双向绑定
mysg.g = gp // 把当前G放到sudog的g,双向绑定
mysg.isSelect = false // 非select操作
mysg.c = c // channel
gp.param = nil // 重置sudog指针
c.recvq.enqueue(mysg) // 把sudog放进队列
// 同步定时器
if c.timer != nil {
// G挂起休眠之前,将定时器标记为阻塞并加入最小堆
blockTimerChan(c)
}

gp.parkingOnChan.Store(true) // 是否阻塞在channel
reason := waitReasonChanReceive
if c.synctest {
reason = waitReasonSynctestChanReceive
}

// 挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)
// 被唤醒

// 双向绑定异常
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 同步定时器
if c.timer != nil {
// G唤醒后,将定时器标记为非阻塞、删除状态
unblockTimerChan(c)
}
gp.waiting = nil // 当前G移除sudog绑定
gp.activeStackChans = false // 是否在等待channel操作
if mysg.releasetime > 0 {
// 纪录阻塞事件
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success // 关闭channel时设置为false,其他情况为true
gp.param = nil // 重置sudog指针
mysg.c = nil // 移除channel绑定
releaseSudog(mysg) // 释放sudog
return true, success
}

// 从写G读取元素value并将其唤醒,buf已满才会走到这里
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// synctest,忽略
if c.synctest && sg.g.syncGroup != getg().syncGroup {
unlockf()
panic(plainError("receive on synctest channel from outside bubble"))
}
// sg => 从sendq拿到的,ep => y元素指针(y := <- c)
// 无缓冲channel
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
// 有缓冲channel
// buf已满
qp := chanbuf(c, c.recvx)
// 把数组第一个可读元素复制给读G
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 把写G的数据复制到这个位置上,补充数组元素
typedmemmove(c.elemtype, qp, sg.elem)
// 读索引
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 写索引,既然buf已满,读跟写的位置自然一样
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// 写G的数据已经复制到buf上了
sg.elem = nil
// 写G
gp := sg.g
unlockf()
// 写G的sudog放到param
gp.param = unsafe.Pointer(sg)
// 关闭channel时设置为false,其他情况为true
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒写G
// 将g放到本地队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// sg => 从sendq获取的
// 元素指针
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
memmove(dst, src, t.Size_)
}

关闭channel

关闭channel ,具体逻辑如下

  1. 如果channel 未初始化或已关闭,异常
  2. channel 设为已关闭closed=1
  3. 收集并唤醒所有在读写队列的G(写G会抛出异常)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func closechan(c *hchan) {
// channel未初始化
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)
// 已关闭,异常
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

// 关闭channel
c.closed = 1

// 收集所有在读写队列里的G
var glist gList

// 所有读G
for {
sg := c.recvq.dequeue()
// 没数据,退出
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}

// 所有写G
for {
sg := c.sendq.dequeue()
// 没数据,退出
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
unlock(&c.lock)

// 唤醒所有G
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 将g放到本地队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
goready(gp, 3)
}
}

获取channel 数据量

当调用len(c)时,系统调用chanlen实现,具体逻辑如下

  1. 未初始化的channel 数据量为0
  2. 如果是异步timer,返回qcount的数值
  3. 如果是同步timer,返回0
  4. 其他情况一律返回channel 字段qcount的数值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func chanlen(c *hchan) int {
// channel未初始化
if c == nil {
return 0
}
// asynctimerchan=0 => 如果是go1.23版本及以后
// asynctimerchan=1 => 如果是go1.22版本及以前
async := debug.asynctimerchan.Load() != 0
// 异步timer(<=go1.22)
if c.timer != nil && async {
// 判断是否需要更新timer状态、执行函数f
c.timer.maybeRunChan()
}
// 同步timer(>=go1.23),dirty hack,让timer的len一直为0
if c.timer != nil && !async {
return 0
}
return int(c.qcount)
}

获取channel 容量

当调用cap(c)时,系统调用chancap实现,具体逻辑如下

  1. 未初始化的channel 容量为0
  2. 如果是异步timer,返回dataqsiz的数值
  3. 如果是同步timer,返回0
  4. 其他情况一律返回channel 字段dataqsiz的数值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func chancap(c *hchan) int {
// channel未初始化
if c == nil {
return 0
}
if c.timer != nil {
// asynctimerchan=0 => 如果是go1.23版本及以后
// asynctimerchan=1 => 如果是go1.22版本及以前
async := debug.asynctimerchan.Load() != 0
if async {
return int(c.dataqsiz)
}
// 同步timer(>=go1.23),dirty hack,让timer的cap一直为0
return 0
}
return int(c.dataqsiz)
}

定时器相关

maybeRunChan

判断是否需要更新timer状态、执行函数f。具体逻辑如下

  1. 不满足条件则返回
    • timer已经放在最小堆上,那么过期后自动发送到channel
    • timer从未执行过
    • timer还未到触发时刻
  2. 满足条件则更新timer状态、执行函数f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 判断是否需要更新timer状态、执行函数f
func (t *timer) maybeRunChan() {
// 测试,忽略
if t.isFake {
t.lock()
var timerGroup *synctestGroup
if t.ts != nil {
timerGroup = t.ts.syncGroup
}
// 用state字段数据更新astate,然后再解锁
t.unlock()
sg := getg().syncGroup
if sg == nil {
panic(plainError("synctest timer accessed from outside bubble"))
}
if timerGroup != nil && sg != timerGroup {
panic(plainError("timer moved between synctest bubbles"))
}
return
}
// 如果timer已经在最小堆中,过期后自动发送到channel
if t.astate.Load()&timerHeaped != 0 {
return
}

t.lock()
// 当前时刻,单调时钟
now := nanotime()
// 加锁后double-check
// 1. timer已经在最小堆中
// 2. timer从未执行过
// 3. timer还未到触发时刻
if t.state&timerHeaped != 0 || t.when == 0 || t.when > now {
// debug用,忽略
t.trace("maybeRunChan-")
// 用state字段数据更新astate,然后再解锁
t.unlock()
return
}
// debug用,忽略
t.trace("maybeRunChan+")

systemstack(func() {
// 更新timer状态、执行函数f
t.unlockAndRun(now)
})
}

blockTimerChan & unblockTimerChan

1
2
3
4
5
// 创建一个10ms过期的定时器
t := time.NewTimer(10 * time.Millisecond)

// 等待定时器过期信号
<-t.C

上述示例代码,如果这个channel是属于一个定时器的,那么在G挂起前、唤醒后,需要修改定时器的state-状态、blocked-标记等。函数详细注释如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// G挂起休眠之前,将定时器标记为阻塞并加入最小堆
func blockTimerChan(c *hchan) {
t := c.timer
// 测试,忽略
if t.isFake {
return
}
t.lock()
// debug,忽略
t.trace("blockTimerChan")
// 异步定时器,异常
if !t.isChan {
badTimer()
}

// 阻塞的G数量
t.blocked++

// 1. timer已经在最小堆中
// 2. timer已标记删除
// 3. 过期时刻大于0
if t.state&timerHeaped != 0 && t.state&timerZombie != 0 && t.when > 0 {
t.state &^= timerZombie // 移除标志位
t.ts.zombies.Add(-1) // 计数器zombies-=1
}

// 是否需要把定时器放到timers.heap
add := t.needsAdd()
t.unlock()
if add {
// 更新timer状态、添加到最小堆、中断网络轮询(跟modify函数有些相似)
t.maybeAdd()
}
}

// G唤醒后,将定时器标记为非阻塞、删除状态
func unblockTimerChan(c *hchan) {
t := c.timer
// 测试,忽略
if t.isFake {
return
}
t.lock()
// debug,忽略
t.trace("unblockTimerChan")
// 异步定时器 or 计数器为0,异常
if !t.isChan || t.blocked == 0 {
badTimer()
}

// 阻塞的G数量
t.blocked--

// 计数器为0 and timer已经在最小堆中 and timer无删除标记
if t.blocked == 0 && t.state&timerHeaped != 0 && t.state&timerZombie == 0 {
// 标记为删除
t.state |= timerZombie
// 计数器zombies+=1
t.ts.zombies.Add(1)
}
t.unlock()
}

参考文档

6.4 Channel
GopherCon 2017: Kavya Joshi - Understanding Channels
Diving Deep Into The Golang Channels.