sync.Cond经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。底层实现基于信号量semaphore
当前go版本:1.24
快速上手 以下展示一个sync.Cond的使用案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package mainimport ( "fmt" "sync" ) var shared = make (map [string ]interface {})func main () { var wg sync.WaitGroup wg.Add(2 ) cond := sync.NewCond(&sync.Mutex{}) reader := func (key string ) { cond.L.Lock() for len (shared) == 0 { cond.Wait() } fmt.Println(shared[key]) cond.L.Unlock() wg.Done() } go reader("rsc1" ) go reader("rsc2" ) cond.L.Lock() shared["rsc1" ] = "foo" shared["rsc2" ] = "bar" cond.Broadcast() cond.L.Unlock() wg.Wait() }
数据结构 todo:文章图片待补充
Cond的数据结构如下
L
- 只要实现Locker接口的类型都可以,比如Mutex
notify
- 计数器也是ticket生成器,可实现等待队列的先进先出-FIFO,由内部sema使用
checker
- 用来检测是否被复制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker } type notifyList struct { wait atomic.Uint32 notify uint32 lock mutex head *sudog tail *sudog }
核心方法 NewCond 创建一个condition variable,没什么好讲的
1 2 3 4 func NewCond (l Locker) *Cond { return &Cond{L: l} }
Wait 大概逻辑如下
更新wait计数器,获得ticket,临时解锁cond,让其他goroutine可以获得ticket并挂起
将当前goroutine和ticket打包进sudog,放进队列,当前goroutine挂起陷入等待
goroutine被唤醒,加锁cond
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() } func notifyListWait (l *notifyList, t uint32 ) { lockWithRank(&l.lock, lockRankNotifyList) if less(t, l.notify) { unlock(&l.lock) return } s := acquireSudog() s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64 (0 ) if blockprofilerate > 0 { t0 = cputicks() s.releasetime = -1 } if l.tail == nil { l.head = s } else { l.tail.next = s } l.tail = s goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3 ) if t0 != 0 { blockevent(s.releasetime-t0, 2 ) } releaseSudog(s) }
Signal 大概逻辑如下
如果waiter数量为0(wait==notify),不处理
根据notify计算出待唤醒的waiter_id,并更新notify
在sudog列表查找ticket==waiter_id的goroutine,将其唤醒
如果goroutine在wait时就接到信号,那么它肯定不在sudog列表,不处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) } func notifyListNotifyOne (l *notifyList) { if l.wait.Load() == atomic.Load(&l.notify) { return } lockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return } atomic.Store(&l.notify, t+1 ) for p, s := (*sudog)(nil ), l.head; s != nil ; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil if s.g.syncGroup != nil && getg().syncGroup != s.g.syncGroup { println ("semaphore wake of synctest goroutine" , s.g.goid, "from outside bubble" ) panic ("semaphore wake of synctest goroutine from outside bubble" ) } readyWithTime(s, 4 ) return } } unlock(&l.lock) }
Broadcast 大概逻辑如下
如果waiter数量为0(wait==notify),不处理
更新notify,使其等于wait
扫描sudog列表,逐个唤醒goroutine
如果goroutine在wait时就接到信号,那么它肯定不在sudog列表,不处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) } func notifyListNotifyAll (l *notifyList) { if l.wait.Load() == atomic.Load(&l.notify) { return } lockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock) for s != nil { next := s.next s.next = nil if s.g.syncGroup != nil && getg().syncGroup != s.g.syncGroup { println ("semaphore wake of synctest goroutine" , s.g.goid, "from outside bubble" ) panic ("semaphore wake of synctest goroutine from outside bubble" ) } readyWithTime(s, 4 ) s = next } }
参考文档 Go sync.Cond, the Most Overlooked Sync Mechanism