golang系列之-sync.Cond

sync.Cond经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。底层实现基于信号量semaphore

当前go版本:1.24

快速上手

以下展示一个sync.Cond的使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync"
)

var shared = make(map[string]interface{})

func main() {
var wg sync.WaitGroup
wg.Add(2)

cond := sync.NewCond(&sync.Mutex{})

// reader 打印shared[key]
reader := func(key string) {
cond.L.Lock()
// 等待,直到shared有数据
for len(shared) == 0 {
cond.Wait() // Wait内部会暂时解锁/加锁
}
fmt.Println(shared[key])
cond.L.Unlock()
wg.Done()
}

// 创建两个goroutine
go reader("rsc1")
go reader("rsc2")

// writer
cond.L.Lock()
// 写入
shared["rsc1"] = "foo"
shared["rsc2"] = "bar"
// 通知所有goroutine
cond.Broadcast()
cond.L.Unlock()

wg.Wait()
}

数据结构

todo:文章图片待补充

Cond的数据结构如下

  • L - 只要实现Locker接口的类型都可以,比如Mutex
  • notify - 计数器也是ticket生成器,可实现等待队列的先进先出-FIFO,由内部sema使用
  • checker - 用来检测是否被复制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// sync.Cond结构体,看notify就行了
type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker // mutex,由外部传入

notify notifyList // 计数器+sudog链表,内部sema使用
checker copyChecker // 检查Cond是否被复制
}

type notifyList struct {
wait atomic.Uint32 // 等待g数量,只增不减

notify uint32 // 唤醒g数量,范围[0,wait],保证waiter先进先出

lock mutex // 锁
// 下面是等待g的链表的头部跟尾部
head *sudog // 指向第一个sudog
tail *sudog // 指向最后一个sudog
}

核心方法

NewCond

创建一个condition variable,没什么好讲的

1
2
3
4
// 没什么好说的
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

Wait

大概逻辑如下

  1. 更新wait计数器,获得ticket,临时解锁cond,让其他goroutine可以获得ticket并挂起
  2. 将当前goroutine和ticket打包进sudog,放进队列,当前goroutine挂起陷入等待
  3. goroutine被唤醒,加锁cond
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 使用示例
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
func (c *Cond) Wait() {
// 上层业务逻辑已加锁 => cond.L.Lock()

// 检查是否被复制
c.checker.check()

// wait计数器更新
// t = (c.notify.wait+=1) - 1
t := runtime_notifyListAdd(&c.notify)

// 解锁,其他g现在可以调用Wait了
c.L.Unlock()

// 将当前g挂起,等待唤醒(函数看下方notifyListWait)
runtime_notifyListWait(&c.notify, t)
// 被唤醒

// 恢复加锁,由上层业务逻辑负责继续解锁 => cond.L.Unlock()
c.L.Lock()
}

// runtime_notifyListWait函数
// sudog纪录g放入链表,g让出CPU挂起等待唤醒
func notifyListWait(l *notifyList, t uint32) {
// 加锁
lockWithRank(&l.lock, lockRankNotifyList)

// t是等待计数器wait,每次唤醒时notify计数器+1,任何wait<notify的id说明该g已被/应被唤醒
if less(t, l.notify) {
unlock(&l.lock)
return
}

// 获取sudog
s := acquireSudog()
// 封装当前g
s.g = getg()
// ticket可以纪录一个uint32的数
s.ticket = t
// 纪录g阻塞耗时
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
// 把sudog加到链表
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 当前g让出CPU,g0执行调度运行其他g
// 在内部g、m解除绑定后会解锁l.lock
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
if t0 != 0 {
// 纪录阻塞事件
blockevent(s.releasetime-t0, 2)
}
// 释放sudog
releaseSudog(s)
}

Signal

大概逻辑如下

  1. 如果waiter数量为0(wait==notify),不处理
  2. 根据notify计算出待唤醒的waiter_id,并更新notify
  3. 在sudog列表查找ticket==waiter_id的goroutine,将其唤醒
  4. 如果goroutine在wait时就接到信号,那么它肯定不在sudog列表,不处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 唤醒一个g
func (c *Cond) Signal() {
c.checker.check()
// 按顺序唤醒一个g(看下方notifyListNotifyOne)
runtime_notifyListNotifyOne(&c.notify)
}

// runtime_notifyListNotifyOne函数
func notifyListNotifyOne(l *notifyList) {
// 每次唤醒时notify计数器+1,任何wait<notify的id说明该g已被/应被唤醒
if l.wait.Load() == atomic.Load(&l.notify) {
return
}

// 加锁
lockWithRank(&l.lock, lockRankNotifyList)

// 应被唤醒的g的id
t := l.notify
// wait计数器==notify计数器,g已被唤醒
if t == l.wait.Load() {
unlock(&l.lock)
return
}

// notify计数器+=1
atomic.Store(&l.notify, t+1)

// 遍历sudog链表
// prev,curr = nil, l.head
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
// ticket相同,找到了
if s.ticket == t {
// 下一个sudog
n := s.next
// prev不为nil,将其与next相连接
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
// 解锁
unlock(&l.lock)
// 当前sudog的一些字段都要重置
s.next = nil
// 测试,忽略
if s.g.syncGroup != nil && getg().syncGroup != s.g.syncGroup {
println("semaphore wake of synctest goroutine", s.g.goid, "from outside bubble")
panic("semaphore wake of synctest goroutine from outside bubble")
}
// 纪录releasetime,调用goready唤醒g
// 将g放到本地队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
readyWithTime(s, 4)
return
}
}
// 没找到sudog
unlock(&l.lock)
}

Broadcast

大概逻辑如下

  1. 如果waiter数量为0(wait==notify),不处理
  2. 更新notify,使其等于wait
  3. 扫描sudog列表,逐个唤醒goroutine
  4. 如果goroutine在wait时就接到信号,那么它肯定不在sudog列表,不处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 
func (c *Cond) Broadcast() {
c.checker.check()
// 唤醒所有goroutine(看下方notifyListNotifyAll)
runtime_notifyListNotifyAll(&c.notify)
}

//
func notifyListNotifyAll(l *notifyList) {
// 每次唤醒时notify计数器+1,任何wait<notify的id说明该g已被/应被唤醒
if l.wait.Load() == atomic.Load(&l.notify) {
return
}

// 加锁
lockWithRank(&l.lock, lockRankNotifyList)
// 获取整个sudog链表
s := l.head
// 重置head和tail
l.head = nil
l.tail = nil

// 使两个计数器数值相等
atomic.Store(&l.notify, l.wait.Load())
// 解锁
unlock(&l.lock)

// 遍历sudog链表
for s != nil {
next := s.next
// 重置
s.next = nil
// 测试
if s.g.syncGroup != nil && getg().syncGroup != s.g.syncGroup {
println("semaphore wake of synctest goroutine", s.g.goid, "from outside bubble")
panic("semaphore wake of synctest goroutine from outside bubble")
}
// 纪录releasetime,调用goready唤醒g
// 将g放到本地队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
readyWithTime(s, 4)
s = next
}
}

参考文档

Go sync.Cond, the Most Overlooked Sync Mechanism