并发情况下,如果需要等待所有的goroutine完成任务,需要使用Waitgroup等待
当前go版本:1.24
快速上手
先简单列举一个使用案例,了解Waitgroup的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package main
import ( "fmt" "sync" "time" )
func worker(id int) { fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second) fmt.Printf("Worker %d done\n", id) }
func main() { var wg sync.WaitGroup
for i := 1; i <= 5; i++ { wg.Add(1)
go func() { defer wg.Done() worker(i) }() }
wg.Wait() }
|
上述代码执行后,系统输出如下,可以看到系统等待5个goroutine完成任务后才退出
1 2 3 4 5 6 7 8 9 10 11 12 13
| go run ./main.go
|
数据结构
todo:文章图片待补充
Waitgroup的数据结构由noCopy、state、sema三个字段组成,其中state是goroutine、waiter的计数器,sema使waiter陷入等待
1 2 3 4 5 6
| type WaitGroup struct { noCopy noCopy
state atomic.Uint64 sema uint32 }
|
state的高32位:正在运行中的goroutine数量
state低低32为:等待中的goroutine数量
核心方法
WaitGroup只提供了Add、Done、Wait三个方法
Add
Add方法的大概逻辑如下
- counter+=delta
- counter小于0,数值异常
- 发现goroutine出现并发调用Add、Wait,异常
- 有正在运行的goroutine或者没有等待的goroutine,返回(成功)
- 唤醒所有等待中的goroutine
具体源代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| func (wg *WaitGroup) Add(delta int) { state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32) w := uint32(state)
if v < 0 { panic("sync: negative WaitGroup counter") }
if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }
if v > 0 || w == 0 { return }
if wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }
wg.state.Store(0)
for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) } }
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) }
|
Done
Done方法时add方法的一种封装,不多介绍,见Add
1 2 3 4
| func (wg *WaitGroup) Done() { wg.Add(-1) }
|
Wait
Wait本质是一个无限循环,一直等待sema释放
- counter==0,不需要等待,返回
- waiter_counter++,当前goroutine挂起等待
- 被唤醒后直接返回
具体源代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| func (wg *WaitGroup) Wait() { for { state := wg.state.Load()
v := int32(state >> 32) w := uint32(state)
if v == 0 { return }
if wg.state.CompareAndSwap(state, state+1) { runtime_SemacquireWaitGroup(&wg.sema)
if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
func sync_runtime_SemacquireWaitGroup(addr *uint32) { semacquire1(addr, false, semaBlockProfile, 0, waitReasonSyncWaitGroupWait) }
|
参考文档
Go sync.WaitGroup and The Alignment Problem