Mutex(MUTualEx)-互斥锁是一种可以保证每次只有一个goroutine访问贡献资源的方法。这个资源可以是一段程序代码、一个整数、一个map、一个struct、一个channel或其他任何东西。通过观察Mutex的源代码实现,可以将Mutex看作是一个队列(FIFO/LIFO),具体看后面的详细描述
当前go版本:1.24
快速上手
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package main
import ( "fmt" "sync" )
type Container struct { mu sync.Mutex counters map[string]int }
func (c *Container) inc(name string) { c.mu.Lock() defer c.mu.Unlock() c.counters[name]++ }
func main() { c := Container{ counters: map[string]int{"a": 0, "b": 0}, }
var wg sync.WaitGroup
doIncrement := func(name string, n int) { for i := 0; i < n; i++ { c.inc(name) } wg.Done() }
wg.Add(3) go doIncrement("a", 10000) go doIncrement("a", 10000) go doIncrement("b", 10000)
wg.Wait() fmt.Println(c.counters) }
|
上述代码运行结果如下
数据结构
todo:文章图片待补充
sync.Mutex提供的数据结构跟方法只是一层封装,实际的代码放在src/internal/sync/mutex.go
Mutex的数据结构由state、sema字段组成,其中state是由三个状态位和一个29位的waiter计数器组成,sema使waiter陷入等待
1 2 3 4 5 6 7 8 9
| type Mutex struct { state int32 sema uint32 }
|
模式
mutex分为Normal-普通模式和Starvation-饥饿模式,两个模式的区别如下
普通模式
- 新的goroutine不会进入队列排队等待,而是会尝试通过自旋抢占锁,排队中的goroutine会陷入更长时间的等待
- 当goroutine释放锁时,因为支持锁被抢占(避免CPU上下文切换)的缘故,不一定会唤醒其他goroutine
饥饿模式
- 禁止自旋抢占,在goroutine释放锁时,一定会唤醒队列中的其他goroutine
注意
只有被唤醒woken的goroutine才能等待锁释放,其他都要挂起排队等待被唤醒
状态位
mutex的三个状态位说明如下
mutexLocked
第1位,为0表示未锁定,为1表示已锁定
mutexWoken
第2位,只在普通模式使用,有woken位才能等待锁释放,该位可被新goroutine抢占。注意:goroutine在Unlock时如果判断有该标志位则不能唤醒队列中的goroutine
mutexStarving
第3位,goroutine等待超过1ms时设置该位,如果当前G的等待时间小于1ms或者是队列中最后一个排队的G则取消该位。该位被设置时的具体行为见上面饥饿模式的介绍
核心方法
Mutex的方法有Lock、Unlock、TryLock,具体如下
Lock
大概逻辑如下
- CAS获取锁,成功返回
- 满足条件则进入自旋等待锁释放
- 更新state
- 获取锁成功,返回(普通模式)
- 失败则挂起等待唤醒
- 唤醒后
- 如果不是饥饿模式,回到第2步重试
- 如果是饥饿模式,直接获取锁返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
| func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } m.lockSlow() }
func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state
for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
new := old if old&mutexStarving == 0 { new |= mutexLocked }
if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift }
if starving && old&mutexLocked != 0 { new |= mutexStarving }
if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break }
queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }
runtime_SemacquireMutex(&m.sema, queueLifo, 2)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") }
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving }
atomic.AddInt32(&m.state, delta) break }
awoke = true iter = 0 } else { old = m.state } } }
func internal_sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
func internal_sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncMutexLock) }
|
TryLock
大概逻辑如下
- 获取state判断,如果锁被其他goroutine获取则返回false
- 锁可以被获取则使用CAS尝试获得锁,失败返回false,成功返回true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (m *Mutex) TryLock() bool { old := m.state if old&(mutexLocked|mutexStarving) != 0 { return false }
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) { return false }
return true }
|
Unlock
大概逻辑如下
- state-1,非常理想的情况下,如没有排队的goroutine,则state为0
- 如果unlock了一个unlocked的mutex,异常,不允许这么做
- 普通模式下因为支持抢占的原因,需要根据woken标记判断是否要唤醒队列中的goroutine
- 饥饿模式下直接唤醒一个goroutine
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } }
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }
if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }
new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 2) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 2) } }
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) }
|
参考文档
Go sync.Mutex: Normal and Starvation Mode