golang系列之-sync.Mutex

Mutex(MUTualEx)-互斥锁是一种可以保证每次只有一个goroutine访问贡献资源的方法。这个资源可以是一段程序代码、一个整数、一个map、一个struct、一个channel或其他任何东西。通过观察Mutex的源代码实现,可以将Mutex看作是一个队列(FIFO/LIFO),具体看后面的详细描述

当前go版本:1.24

快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"sync"
)

type Container struct {
mu sync.Mutex
counters map[string]int
}

func (c *Container) inc(name string) {
c.mu.Lock() // 互斥锁,获取失败等待挂起
defer c.mu.Unlock() // 解锁
c.counters[name]++ // 共享资源
}

func main() {
c := Container{
counters: map[string]int{"a": 0, "b": 0},
}

var wg sync.WaitGroup

doIncrement := func(name string, n int) {
for i := 0; i < n; i++ {
c.inc(name)
}
wg.Done()
}

wg.Add(3)
go doIncrement("a", 10000)
go doIncrement("a", 10000)
go doIncrement("b", 10000)

wg.Wait()
fmt.Println(c.counters)
}

上述代码运行结果如下

1
2
3
4
go run ./main.go

# 输出如下
# map[a:20000 b:10000]

数据结构

todo:文章图片待补充

sync.Mutex提供的数据结构跟方法只是一层封装,实际的代码放在src/internal/sync/mutex.go

Mutex的数据结构由state、sema字段组成,其中state是由三个状态位和一个29位的waiter计数器组成,sema使waiter陷入等待

1
2
3
4
5
6
7
8
9
// src/internal/sync/mutex.go
type Mutex struct {
// 由29位的waiter_counter和3个状态位组成,结构如下
// | <- 29bit -> | 1bit | 1bit | 1bit |
// | waiter_counter | mutexStarving | mutexWoken | mutexLocked |
state int32
// semaphore
sema uint32
}

模式

mutex分为Normal-普通模式和Starvation-饥饿模式,两个模式的区别如下

普通模式

  1. 新的goroutine不会进入队列排队等待,而是会尝试通过自旋抢占锁,排队中的goroutine会陷入更长时间的等待
  2. 当goroutine释放锁时,因为支持锁被抢占(避免CPU上下文切换)的缘故,不一定会唤醒其他goroutine

饥饿模式

  1. 禁止自旋抢占,在goroutine释放锁时,一定会唤醒队列中的其他goroutine

注意

只有被唤醒woken的goroutine才能等待锁释放,其他都要挂起排队等待被唤醒

状态位

mutex的三个状态位说明如下

mutexLocked

第1位,为0表示未锁定,为1表示已锁定

mutexWoken

第2位,只在普通模式使用,有woken位才能等待锁释放,该位可被新goroutine抢占。注意:goroutine在Unlock时如果判断有该标志位则不能唤醒队列中的goroutine

mutexStarving

第3位,goroutine等待超过1ms时设置该位,如果当前G的等待时间小于1ms或者是队列中最后一个排队的G则取消该位。该位被设置时的具体行为见上面饥饿模式的介绍

核心方法

Mutex的方法有Lock、Unlock、TryLock,具体如下

Lock

大概逻辑如下

  1. CAS获取锁,成功返回
  2. 满足条件则进入自旋等待锁释放
  3. 更新state
    • 获取锁成功,返回(普通模式)
    • 失败则挂起等待唤醒
    • 唤醒后
      • 如果不是饥饿模式,回到第2步重试
      • 如果是饥饿模式,直接获取锁返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
func (m *Mutex) Lock() {
// 设置locked位成功,返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64 // 用于计算等待耗时
starving := false // 排队等待超过1ms,进入饥饿模式
awoke := false // 普通模式自旋抢占锁
iter := 0 // 自旋计数器,最多4次,被唤醒后重置
old := m.state //

for {
// 1. 普通模式,进入自旋等待锁的释放
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 尝试抢占woken位,让Unlock方法不要唤醒其他goroutine
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 进入自旋等待
runtime_doSpin()
iter++
old = m.state
continue
}

// 2. 其他情况:饥饿模式 or 锁可用 or 自旋次数过多

new := old
// 普通模式
if old&mutexStarving == 0 {
new |= mutexLocked // 饥饿模式下不能动这个位
}

// 饥饿模式 or 锁被占用
if old&(mutexLocked|mutexStarving) != 0 {
// 只要没拿到锁就等待,waiter_counter++
new += 1 << mutexWaiterShift
}

// 当前G陷入饥饿 and 锁被占用
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 已经是饥饿模式的话不用再设置
}

// 普通模式下被唤醒,清除woken位
if awoke {
// 异常
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除woken位
new &^= mutexWoken
}

if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 普通模式 获取锁成功
if old&(mutexLocked|mutexStarving) == 0 {
break
}

// 队列进出策略是LIFO还是FIFO
// LIFO:如果被挂起等待好几次了,放队列前面
// FIFO:新的goroutine,放队列后面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}

// 尝试获取m.sema,失败则挂起等待
runtime_SemacquireMutex(&m.sema, queueLifo, 2)
// 被唤醒

// 计算耗时,如果当前goroutine等待超过1ms,进入饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

// 唤醒后再一次读取state
old = m.state

// 饥饿模式
if old&mutexStarving != 0 {
// Unlock已经将mutexLocked位设置为0
// 饥饿模式下,Unlock唤醒goroutine时不会设置mutexWoken位,也不会更新waiter数量,由当前goroutine处理
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}

// delta = 1-2^3 => -7 (0b00000111)
delta := int32(mutexLocked - 1<<mutexWaiterShift)

// 当前G运行不超过1ms or 当前G是最后一个waiter
if !starving || old>>mutexWaiterShift == 1 {
// delta -= 4 (0b00001011)
delta -= mutexStarving
}

// 初始状态-delta -> 结果状态
//
// 示例1:饥饿模式+两个排队的goroutine
// 10 100 - 00 111(7) -> 01 101
//
// 示例2:饥饿模式+1个排队的goroutine
// 01 100 - 01 011(11) -> 00 001
atomic.AddInt32(&m.state, delta)
// 获取锁成功
break
}

// 以下两个字段影响普通模式,对饥饿模式没影响
awoke = true // 唤醒后设置为true,因为woken标志位在goroutine唤醒时已设置,见Unlock
iter = 0 // 可以重新进入自旋
} else {
// 更新失败,再一次读取state
old = m.state
}
}
}

//go:linkname internal_sync_runtime_canSpin internal/sync.runtime_canSpin
//go:nosplit
func internal_sync_runtime_canSpin(i int) bool {
// 1. 自旋次数不能超过4次
// 2. cpu只有一个的话不能自旋
// 3. 不能有大量的空闲、自旋的goroutine
if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 {
return false
}
// 当前P的runq不为空,不应该自旋
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

//go:linkname internal_sync_runtime_SemacquireMutex internal/sync.runtime_SemacquireMutex
func internal_sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncMutexLock)
}

TryLock

大概逻辑如下

  1. 获取state判断,如果锁被其他goroutine获取则返回false
  2. 锁可以被获取则使用CAS尝试获得锁,失败返回false,成功返回true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (m *Mutex) TryLock() bool {
old := m.state
// 锁被其他goroutine获取 or 饥饿模式
if old&(mutexLocked|mutexStarving) != 0 {
return false
}

// 设置locked位
// 直接return不行?
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}

return true
}

Unlock

大概逻辑如下

  1. state-1,非常理想的情况下,如没有排队的goroutine,则state为0
  2. 如果unlock了一个unlocked的mutex,异常,不允许这么做
  3. 普通模式下因为支持抢占的原因,需要根据woken标记判断是否要唤醒队列中的goroutine
  4. 饥饿模式下直接唤醒一个goroutine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (m *Mutex) Unlock() {
// locked置0
new := atomic.AddInt32(&m.state, -mutexLocked)
// 有waiter or 饥饿模式
if new != 0 {
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
// 异常,不能unlock已经unlocked的mutex
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}

// 普通模式
if new&mutexStarving == 0 {
old := new
for {
// 判断是否要去唤醒goroutine
// 1. waiter数量为0,不用唤醒
// 2. 锁已经被其他goroutine获取
// 3. 如果有goroutine被唤醒,那么不再唤醒其他
// 4. 饥饿模式不走这个逻辑
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}

// waiter数量减一 并 设置woken位
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 唤醒一个goroutine
runtime_Semrelease(&m.sema, false, 2)
return
}
// 失败重试
old = m.state
}
} else {
// 饥饿模式,没有减少counter,也没有设置woken位
// 唤醒一个goroutine
runtime_Semrelease(&m.sema, true, 2)
}
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}

参考文档

Go sync.Mutex: Normal and Starvation Mode