Golang series: sync.WaitGroup

In concurrent code, when you need to wait for a group of goroutines to finish their work, use sync.WaitGroup.

Go version used: 1.24

Quick start

Let's start with a simple example that shows how WaitGroup is used.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int) {
	fmt.Printf("Worker %d starting\n", id)

	time.Sleep(time.Second)
	fmt.Printf("Worker %d done\n", id)
}

func main() {
	var wg sync.WaitGroup

	for i := 1; i <= 5; i++ {
		// counter++
		wg.Add(1)

		go func() {
			// counter--
			defer wg.Done()
			// Since Go 1.22 each iteration has its own i, so capturing it is safe.
			worker(i)
		}()
	}

	// wait until counter == 0
	wg.Wait()
}
```

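Running the code above prints the following. The program exits only after all 5 goroutines have finished; the order of the lines varies from run to run.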

```shell
go run ./main.go

# Output (goroutine order differs between runs):
# Worker 5 starting
# Worker 2 starting
# Worker 1 starting
# Worker 3 starting
# Worker 4 starting
# Worker 4 done
# Worker 5 done
# Worker 2 done
# Worker 3 done
# Worker 1 done
```

Data structure

TODO: figures for this article still to be added

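WaitGroup has three fields: noCopy, state, and sema. state packs two counters into one 64-bit word (the task counter and the waiter count), and sema is the semaphore on which waiters block. noCopy is an empty marker type that lets go vet's copylocks check report code that copies a WaitGroup.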

```go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32        // semaphore
}
```
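Because of the noCopy field (and because state and sema must be shared), a WaitGroup has to be passed by pointer. If a function took sync.WaitGroup by value, Done would decrement a copy and the caller's Wait would block forever; go vet flags such copies. A minimal sketch of the correct pattern, where process and collect are hypothetical helpers:

```go
package main

import (
	"fmt"
	"sync"
)

// process takes *sync.WaitGroup, so Done acts on the caller's WaitGroup,
// not on a copy.
func process(id int, wg *sync.WaitGroup, out chan<- int) {
	defer wg.Done()
	out <- id
}

// collect starts n workers and sums the ids they send back.
func collect(n int) int {
	var wg sync.WaitGroup
	out := make(chan int, n) // buffered so senders never block

	for i := 0; i < n; i++ {
		wg.Add(1)
		go process(i, &wg, out)
	}
	wg.Wait()
	close(out) // safe: all senders have finished

	sum := 0
	for v := range out {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(collect(4)) // 6 (0+1+2+3)
}
```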

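High 32 bits of state: the counter, i.e. the number of outstanding tasks (normally the goroutines still running)
Low 32 bits of state: the waiter count, i.e. the number of goroutines blocked in Wait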
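To make the layout concrete, here is a small sketch of the arithmetic. pack and unpack are hypothetical helpers, not part of the sync package, but unpack uses the same conversions as Add and Wait, and the delta shift mirrors the first line of Add. Note how adding uint64(-1)<<32 wraps around and subtracts exactly 1 from the high half.

```go
package main

import "fmt"

// pack builds a state word in WaitGroup's layout:
// counter in the high 32 bits, waiter count in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// unpack mirrors the extraction lines in Add and Wait.
func unpack(state uint64) (v int32, w uint32) {
	return int32(state >> 32), uint32(state)
}

func main() {
	state := pack(3, 2) // 3 outstanding tasks, 2 goroutines in Wait

	// Done() is Add(-1): uint64(-1)<<32 == 0xFFFFFFFF00000000, and adding it
	// modulo 2^64 subtracts 1 from the counter, leaving the low half alone.
	delta := -1
	state += uint64(delta) << 32
	v, w := unpack(state)
	fmt.Println(v, w) // 2 2

	// A waiter registering itself (the CAS in Wait) is simply state+1.
	v, w = unpack(state + 1)
	fmt.Println(v, w) // 2 3

	// Decrementing a zero counter yields -1: the "negative counter" case.
	v, _ = unpack(pack(0, 0) + uint64(delta)<<32)
	fmt.Println(v) // -1
}
```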

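Core methods

In Go 1.24, WaitGroup provides only three methods: Add, Done, and Wait.

Add

Add works roughly as follows:

  1. counter += delta
  2. If counter is negative, panic (invalid counter value)
  3. If Add is detected running concurrently with Wait, panic (misuse)
  4. If tasks are still outstanding (counter > 0) or nobody is waiting, return (success)
  5. Otherwise the counter has just reached 0 with waiters registered: reset state and wake all waiting goroutines

The source (race-detector code omitted) is: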

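```go
func (wg *WaitGroup) Add(delta int) {
	// Shifting delta into the high 32 bits adjusts only the counter.
	state := wg.state.Add(uint64(delta) << 32)

	v := int32(state >> 32) // counter: outstanding tasks (goroutines not yet Done)
	w := uint32(state)      // waiter count: goroutines blocked in Wait

	// The counter must never go negative.
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}

	// Waiters are registered, yet this Add just raised the counter from 0 to
	// delta: an Add starting from zero raced with Wait.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// Tasks still outstanding, or nobody waiting: nothing more to do.
	if v > 0 || w == 0 {
		return
	}

	// Here the counter has just reached 0 and w goroutines are waiting.

	// Sanity check: state must not have changed since our Add.
	if wg.state.Load() != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// Reset the whole state word (counter and waiter count) in one store.
	wg.state.Store(0)

	// Wake every waiting goroutine: one semaphore release per waiter.
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false, 0)
	}
}

// In runtime/sema.go, linked to sync.runtime_Semrelease:
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}
```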
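Step 5 (wake all waiters) means several goroutines may call Wait on the same WaitGroup, and a single Add that brings the counter to 0 releases all of them. A sketch demonstrating this; wakeAll is a hypothetical helper, and a second WaitGroup is used only to know when the waiter goroutines themselves have returned.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// wakeAll starts several goroutines that all Wait on the same WaitGroup,
// then releases them with a single Done.
func wakeAll(waiters int) int32 {
	var wg sync.WaitGroup
	var all sync.WaitGroup // tracks the waiter goroutines themselves
	var woken atomic.Int32

	wg.Add(1) // counter = 1
	for i := 0; i < waiters; i++ {
		all.Add(1)
		go func() {
			defer all.Done()
			// Either registers as a waiter and blocks, or, if it runs after
			// Done below, sees counter == 0 and returns immediately.
			wg.Wait()
			woken.Add(1)
		}()
	}

	wg.Done()  // counter 1 -> 0: Add resets state and releases every waiter
	all.Wait() // wait until every goroutine has returned from wg.Wait
	return woken.Load()
}

func main() {
	fmt.Println(wakeAll(3)) // 3
}
```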

Done

Done is just a thin wrapper around Add(-1); see Add for the details.

```go
// counter--
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
```
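Since Done is Add(-1), calling Done more times than the counter allows trips the v < 0 check in Add. A sketch that recovers the panic to show the message; extraDone is a hypothetical helper, and the message string is the one from the Add source above.

```go
package main

import (
	"fmt"
	"sync"
)

// extraDone calls Done once more than Add allowed and returns the panic message.
func extraDone() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done() // counter 1 -> 0
	wg.Done() // counter 0 -> -1: Add panics
	return "no panic"
}

func main() {
	fmt.Println(extraDone()) // sync: negative WaitGroup counter
}
```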

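Wait

Wait is a CAS retry loop: on each pass it either returns immediately or registers the caller as a waiter and then blocks on sema until Add releases it.

  1. If counter == 0, there is nothing to wait for: return
  2. Otherwise increment the waiter count with a CAS (retrying the loop if the CAS fails), then park the current goroutine on sema
  3. Once woken, check that state was reset to 0, then return

The source (race-detector code omitted) is: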

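```go
// Wait blocks until the [WaitGroup] counter is zero.
func (wg *WaitGroup) Wait() {
	for {
		state := wg.state.Load()

		v := int32(state >> 32) // counter: outstanding tasks
		w := uint32(state)      // waiter count (used only by the omitted race-detector code)

		// Nothing outstanding: no need to wait.
		if v == 0 {
			return
		}

		// Register as a waiter (waiter count++). If state changed between
		// Load and CAS, the CAS fails and the loop retries.
		if wg.state.CompareAndSwap(state, state+1) {
			// Block on wg.sema until Add releases it.
			runtime_SemacquireWaitGroup(&wg.sema)
			// Woken up: Add reset state to 0 before releasing us.

			// A nonzero state means a new round was started with Add before
			// this Wait returned.
			if wg.state.Load() != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}

// In runtime/sema.go, linked to sync.runtime_SemacquireWaitGroup:
//go:linkname sync_runtime_SemacquireWaitGroup sync.runtime_SemacquireWaitGroup
func sync_runtime_SemacquireWaitGroup(addr *uint32) {
	semacquire1(addr, false, semaBlockProfile, 0, waitReasonSyncWaitGroupWait)
}
```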
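To tie Add, Done, and Wait together, here is a hypothetical toy re-implementation of the same algorithm (packed state word, CAS to register waiters, one release per waiter), for illustration only. It is a sketch under simplifying assumptions: it omits the misuse and reuse checks and the race detector, and an unbuffered channel stands in for the runtime semaphore. Unlike runtime_Semrelease, a send on that channel blocks until a waiter receives, which is safe here because every goroutine that wins its CAS goes on to receive.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toyWaitGroup is a hypothetical, simplified model of sync.WaitGroup.
type toyWaitGroup struct {
	state atomic.Uint64 // high 32 bits: counter, low 32 bits: waiter count
	sema  chan struct{} // stand-in for the runtime semaphore
}

func newToyWaitGroup() *toyWaitGroup {
	return &toyWaitGroup{sema: make(chan struct{})}
}

func (wg *toyWaitGroup) Add(delta int) {
	state := wg.state.Add(uint64(delta) << 32)
	v := int32(state >> 32)
	w := uint32(state)
	if v < 0 {
		panic("toy: negative counter")
	}
	if v > 0 || w == 0 {
		return // tasks outstanding, or nobody waiting
	}
	// Counter reached 0 with w waiters registered: reset, then wake each one.
	wg.state.Store(0)
	for ; w != 0; w-- {
		wg.sema <- struct{}{} // blocks until one registered waiter receives
	}
}

func (wg *toyWaitGroup) Done() { wg.Add(-1) }

func (wg *toyWaitGroup) Wait() {
	for {
		state := wg.state.Load()
		if int32(state>>32) == 0 {
			return // nothing outstanding
		}
		// Register as a waiter; if state changed since Load, retry.
		if wg.state.CompareAndSwap(state, state+1) {
			<-wg.sema // park until Add releases us
			return
		}
	}
}

// runToy runs n goroutines under the toy WaitGroup and reports how many
// had finished by the time Wait returned.
func runToy(n int) int32 {
	wg := newToyWaitGroup()
	var finished atomic.Int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			finished.Add(1)
		}()
	}
	wg.Wait()
	return finished.Load()
}

func main() {
	fmt.Println(runToy(5)) // 5
}
```

The real implementation uses a runtime semaphore instead of a channel so that releasing waiters never blocks the goroutine calling Add.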
