golang系列之-sync.RWMutex

RWMutex-读写锁,该锁可以被任意多个reader持有,或被一个writer持有。通过观察RWMutex的源代码实现,可以将RWMutex看作是FIFO队列,具体看后面的详细描述

当前go版本:1.24

快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"sync"
)

func main() {
var size = 100
var count int
var wg sync.WaitGroup
var rwm sync.RWMutex

queue := make([]int, size)

for i := 0; i < size; i++ {
go func() {
wg.Add(1)
defer wg.Done()

rwm.Lock() // 写锁
count++ // 更新资源
rwm.Unlock()
}()
}

for i := 0; i < size; i++ {
indx := i
go func() {
wg.Add(1)
defer wg.Done()

rwm.RLock() // 读锁
queue[indx] = count // 只读
rwm.RUnlock()
}()
}

wg.Wait()

fmt.Println(count)
fmt.Println(queue)
}

数据结构

todo:文章图片待补充

RWMutex结构如下所示

1
2
3
4
5
6
7
8
// src/sync/rwmutex.go
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount atomic.Int32
readerWait atomic.Int32
}

各字段说明如下

字段 说明
w 写锁,所有的writer都会阻塞在这里
writerSem writer队列,获取w锁成功后,如果readerCount不为0,挂起等待reader释放锁
readerSem reader队列,readerCount为负时,新的reader都在这里挂起等待
readerCount 读锁/reader总数,max=2^30=1GB,为负数时表示有writer等待
readerWait 待释放锁reader总数,获取w锁成功后纪录当前reader数量,等待reader释放读锁

核心方法

RLock

读加锁,具体逻辑如下

  1. readerCount++,如果返回的结果小于0,说明有writer在等待或执行,放进readerSem等待唤醒
1
2
3
4
5
6
7
8
func (rw *RWMutex) RLock() {
// readerCount++
// readerCount < 0?有writer等待
if rw.readerCount.Add(1) < 0 {
// 放到队列里等待
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}

TryRLock

尝试获取读锁,具体逻辑如下

  1. readerCount如果小于0,说明有writer在等待或执行,返回false
  2. CAS尝试更新readerCount,成功则返回,失败则回到第一步重试,永久尝试
1
2
3
4
5
6
7
8
9
10
11
12
13
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
// readerCount < 0?有writer等待
if c < 0 {
return false
}
// CAS加锁,成功则返回,失败则再次尝试
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}

RUnlock

读解锁,具体逻辑如下

  1. readerCount减1,如果结果小于0,说明有writer在等待
  2. readerWait减1,如果结果等于0,唤醒writer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (rw *RWMutex) RUnlock() {
// readerCount < 0?有writer等待
if r := rw.readerCount.Add(-1); r < 0 {
//
rw.rUnlockSlow(r)
}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
// 重复释放锁,异常
// org_r == 0 => 无任何goroutine访问
// org_r == -rwmutexMaxReaders => 有writer无reader
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}

// writer在等待,新reader肯定不会unlock,能unlock的只有readerWait
if rw.readerWait.Add(-1) == 0 {
// readerWait归零,唤醒writer
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

Lock

写加锁,具体逻辑如下

  1. w加锁,使其它writer排队等待
  2. readerCount-=rwmutexMaxReaders,通知新的reader有writer在等待
  3. 原readerCount不为0,说明有reader在运行,纪录到readerWait
  4. 将writer放到writerSem队列,等待唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (rw *RWMutex) Lock() {
// w加锁,使其它writer排队等待
rw.w.Lock()

// 通知新的reader有writer在等待
// r = readerCount => 理想情况下为0
// readerCount -= 2^30
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

// 纪录在运行的reader的数量到readerWait
if r != 0 && rw.readerWait.Add(r) != 0 {
// 将当前goroutine放进writerSem队列,等待唤醒
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}

TryLock

尝试换取写锁,具体逻辑如下

  1. w尝试加锁,失败返回false
  2. CAS尝试更新readerCount-=rwmutexMaxReaders,成功则返回true,失败则解锁w并返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (rw *RWMutex) TryLock() bool {
// w尝试加锁
if !rw.w.TryLock() {
return false
}

// 通知新的reader有writer在等待写入
// readerCount=-rwmutexMaxReaders => 期望当前reader的数量为0
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
// 失败解锁w
rw.w.Unlock()
return false
}
return true
}

Unlock

写解锁,具体逻辑如下

  1. readerCount+=rwmutexMaxReaders,使其为正数,让新的reader知道锁可用
  2. 如果readerCount不为0,说明有reader在排队,唤醒全部reader
  3. w解锁,唤醒其他排队的writer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (rw *RWMutex) Unlock() {
// 更新readerCount,让新的reader知道锁可用
r := rw.readerCount.Add(rwmutexMaxReaders)

// 重复释放锁,异常
// org_r == 0 => 无任何goroutine访问
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}

// 将readerSem队列里等待的reader全部唤醒
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}

// w解锁,唤醒其他排队的writer
rw.w.Unlock()
}