golang系列之-sync.Map

map不支持并发读写,但我们可以转变下思路,将value改为一个指向结构体entry的指针,结构体内部的字段我们是可以随意修改的,如下,将并发读写map改为并发读map,读写转移到entry

1
2
3
4
5
type entry struct {
p any
}

var m map[string]*entry

上面的方法看起来解决了并发读写的问题,但还不够,当有新的key写入时,还是变回了原来的map并发读写。sync.Map提供了一种思路,使用两个map,read负责已有key的并发读写,dirty负责新key的读写,只有当read找不到key,才去找dirty。

现在还剩最后一个问题,read和dirty如何保证数据一致/同步,我们可以改造entry,使其指向value的指针,如此一来,read和dirty的entry可以指向同一个value,如下,这就是sync.Map的大致思路

1
2
3
4
5
6
7
8
9
type entry struct {
p *any
}
// read -> key0|*entry0 entry0.p -> &value
// -> key1|*entry1
//
// dirty -> key0|*entry0
// -> key1|*entry1
// -> key2|*entry2

注意:尽管如此,sync.Map并不完美,以上设计导致我们无法直接计算出哈希表的元素数量,需要遍历进行统计,而且还不一定准确

当前go版本:1.23,1.24版本改为HashTrieMap实现

快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"sync"
)

func main() {
var syncMap sync.Map

// 写
syncMap.Store("blog", "VictoriaMetrics")

// 读
value, ok := syncMap.Load("blog")
fmt.Println(value, ok)

// 删除
syncMap.Delete("blog")
value, ok = syncMap.Load("blog")
fmt.Println(value, ok)
}

上述代码运行结果如下

1
2
3
4
5
go run ./main.go

# 输出如下
# VictoriaMetrics true
# <nil> false

数据结构

todo:文章图片待补充

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// src/sync/map.go
type Map struct {
mu Mutex // dirty锁
read atomic.Pointer[readOnly] // 负责已有key的读写
dirty map[any]*entry // 负责新key的读写,替换read后设置为nil
misses int // 计数器,如果read找不到key时加一,当misses==len(dirty),用dirty替换掉read
}

// map+amended
type readOnly struct {
m map[any]*entry //
amended bool // 修正,当dirty有新的key写入时为true
}

// 指针-运行时生成,表示数据已完全删除
var expunged = new(any)

// value的封装,被read和dirty共享
type entry struct {
p atomic.Pointer[any] // 指针-任意类型
}

在这里值得注意的是entry的p指针,有三个状态

p 说明
&value 正常状态
nil 已删除,可以当作是墓碑来理解
expunged dirty替代read时,从nil改为expunged,在下一轮替换中,移除该key

三种状态转移路线:

  1. &value -> nil -> expunged
  2. expunged -> nil -> &value

核心方法

Load

根据key获取value,具体逻辑如下

  1. 根据key在read中寻找entry,如果找到,返回
  2. 加锁,再找一遍read,如果找到,返回(double-check)
  3. amended为true表示dirty有新的key,在dirty找
  4. 更新misses计数器,如果misses==len(dirty),用dirty替换掉read
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (m *Map) Load(key any) (value any, ok bool) {
// 在read找
read := m.loadReadOnly()
e, ok := read.m[key]
// 没找到但dirty有新key写入
if !ok && read.amended {
m.mu.Lock()
// double-check
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
// 在dirty找
e, ok = m.dirty[key]
// misses++
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}

// 获取m.read
func (m *Map) loadReadOnly() readOnly {
if p := m.read.Load(); p != nil {
return *p
}
return readOnly{}
}

// 读取数据
func (e *entry) load() (value any, ok bool) {
p := e.p.Load()
// 已删除
if p == nil || p == expunged {
return nil, false
}
return *p, true
}

// misses计数器更新
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// misses == len(m.dirty)
// 用dirty替换read
m.read.Store(&readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

Store

存储key/value。本质就是Swap方法,但丢弃其返回值,具体看Swap

1
2
3
func (m *Map) Store(key, value any) {
_, _ = m.Swap(key, value)
}

LoadOrStore

根据key获取value,如果没有该key/value,则改为写入。该方法逻辑与Swap方法十分相似

具体逻辑如下

  1. 根据key在read中寻找entry,如果找到,返回
  2. 加锁,再找一遍read,如果找到,返回(double-check)
  3. 如果在dirty找到,获取/更新key,同时更新misses计数器,返回
  4. 都没找到说明是新key,写入dirty,返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
// 在read找
read := m.loadReadOnly()
// 找到了
if e, ok := read.m[key]; ok {
// e有数值则获取,没有则更新并获取
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}

// 在read中没找到(dirty可能有)

m.mu.Lock()
// double-check
read = m.loadReadOnly()
// 在read找到
if e, ok := read.m[key]; ok {
// e指针从expunged改为nil
if e.unexpungeLocked() {
// 凡是expunged变为nil状态的,read要同步到dirty
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
// 在dirty找到
actual, loaded, _ = e.tryLoadOrStore(value)
// misses++
m.missLocked()
} else {
// dirty也没有 => 全新写入
// 有新的key写入,需要更新amended
if !read.amended {
// 克隆read到dirty(只保留未删除的纪录)
m.dirtyLocked()
// 有新key写入,需要将amended改为true
m.read.Store(&readOnly{m: read.m, amended: true})
}
// 凡是新key,都放在dirty
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()

return actual, loaded
}

// 有数值则获取,没有则更新并获取返回
func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
p := e.p.Load()
// 完全删除
if p == expunged {
return nil, false, false
}
// 不为nil
if p != nil {
return *p, true, true
}

// 为nil
ic := i
for {
// 替换成功
if e.p.CompareAndSwap(nil, &ic) {
return i, false, true
}
// 重复上面的操作
p = e.p.Load()
if p == expunged {
return nil, false, false
}
if p != nil {
return *p, true, true
}
}
}

// e指针从expunged改为nil
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return e.p.CompareAndSwap(expunged, nil)
}

func (m *Map) dirtyLocked() {
// dirty已经存在了,忽略
if m.dirty != nil {
return
}

read := m.loadReadOnly()
// 创建dirty,与read同等大小
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
// 把nil的entry改为expunged
if !e.tryExpungeLocked() {
// 其他非删除数据复制到dirty
m.dirty[k] = e
}
}
}

// e.p指针从nil改为expunged
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := e.p.Load()
for p == nil {
// nil -> expunged
if e.p.CompareAndSwap(nil, expunged) {
return true
}
p = e.p.Load()
}
return p == expunged
}

Delete

删除指定key,具体看LoadAndDelete

1
2
3
4
// Delete deletes the value for a key.
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}

LoadAndDelete

逻辑与Load相似,具体如下

  1. 根据key在read中寻找entry,如果找到,将entry置为nil
  2. 加锁,再找一遍read,如果找到,将entry置为nil(double-check)
  3. amended为true表示dirty有新的key,在dirty找
  4. 更新misses计数器,如果misses==len(dirty),将dirty迁移到read
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
// 在read找
read := m.loadReadOnly()
e, ok := read.m[key]
// 没找到但dirty有新key写入
if !ok && read.amended {
m.mu.Lock()
// double-check
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
// 在dirty找
e, ok = m.dirty[key]
// read没有但dirty有,直接删除
delete(m.dirty, key)
// misses++
m.missLocked()
}
m.mu.Unlock()
}
if ok {
// e.p改为nil
return e.delete()
}
return nil, false
}

// e.p改为nil
func (e *entry) delete() (value any, ok bool) {
for {
p := e.p.Load()
// 已删除
if p == nil || p == expunged {
return nil, false
}
// 置为nil
if e.p.CompareAndSwap(p, nil) {
return *p, true
}
}
}

CompareAndDelete

具体逻辑如下

  1. 根据key在read中寻找entry,如果找到,纪录e
  2. 加锁,再找一遍read,如果找到,纪录e(double-check)
  3. amended为true表示dirty有新的key,在dirty找
  4. 更新misses计数器,如果misses==len(dirty),用dirty替换掉read
  5. 已删除或比对失败返回false,否则将e.p置为nil,返回true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (m *Map) CompareAndDelete(key, old any) (deleted bool) {
// 在read找
read := m.loadReadOnly()
e, ok := read.m[key]
// 没找到但dirty有新key写入
if !ok && read.amended {
m.mu.Lock()
// double-check
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
// 在dirty找
e, ok = m.dirty[key]
// misses++
m.missLocked()
}
m.mu.Unlock()
}
for ok {
p := e.p.Load()
// 已删除或比对失败
if p == nil || p == expunged || *p != old {
return false
}
// 置为nil
if e.p.CompareAndSwap(p, nil) {
return true
}
}
return false
}

Swap

使用value替换key当前存储的数据。具体逻辑如下

  1. 根据key在read中寻找entry,如果找到,替换并返回
  2. 加锁,再找一遍read,如果找到,替换并返回(double-check)
  3. 如果在dirty找到,替换并返回
  4. 都没找到说明是新key,写入dirty,返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
// 在read找
read := m.loadReadOnly()
// 找到了
if e, ok := read.m[key]; ok {
// 修改指针指向新的value
if v, ok := e.trySwap(&value); ok {
// 已删除
if v == nil {
return nil, false
}
// 有值,返回
return *v, true
}
}

// 在read中没找到(dirty可能有)

m.mu.Lock()
// double-check
read = m.loadReadOnly()
// 在read找到
if e, ok := read.m[key]; ok {
// e指针从expunged改为nil
if e.unexpungeLocked() {
// 凡是expunged变为nil状态的,read要同步到dirty
m.dirty[key] = e
}
// 修改指针指向新的value
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else if e, ok := m.dirty[key]; ok {
// 在dirty找到
// 修改指针指向新的value
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else {
// dirty也没有 => 全新写入
// 有新的key写入,需要更新amended
if !read.amended {
// 克隆read到dirty(只保留未删除的纪录)
m.dirtyLocked()
// 有新key写入,需要将amended改为true
m.read.Store(&readOnly{m: read.m, amended: true})
}
// 凡是新key,都放在dirty
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
return previous, loaded
}

func (e *entry) trySwap(i *any) (*any, bool) {
for {
p := e.p.Load()
// 已删除
if p == expunged {
return nil, false
}
// 替换成功
if e.p.CompareAndSwap(p, i) {
return p, true
}
}
}

func (e *entry) swapLocked(i *any) *any {
return e.p.Swap(i)
}

CompareAndSwap

具体逻辑如下

  1. 根据key在read中寻找entry,如果找到,替换并返回
  2. 如果dirty没有新key,到此为止,返回
  3. 加锁,再找一遍read,如果找到,替换并返回(double-check)
  4. 如果在dirty找到,替换并返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (m *Map) CompareAndSwap(key, old, new any) (swapped bool) {
// 在read找
read := m.loadReadOnly()
// 找到了
if e, ok := read.m[key]; ok {
// 尝试修改指针指向新的value
return e.tryCompareAndSwap(old, new)
} else if !read.amended {
// 没找到,dirty也没有新key
return false
}

// 在read中没找到但dirty可能有

m.mu.Lock()
defer m.mu.Unlock()
// double-check
read = m.loadReadOnly()
swapped = false
if e, ok := read.m[key]; ok {
// 同上
swapped = e.tryCompareAndSwap(old, new)
} else if e, ok := m.dirty[key]; ok {
// 在dirty找到
// 尝试修改指针指向新的value
swapped = e.tryCompareAndSwap(old, new)
// misses++
m.missLocked()
}
return swapped
}

func (e *entry) tryCompareAndSwap(old, new any) bool {
p := e.p.Load()
// 已删除或比对失败
if p == nil || p == expunged || *p != old {
return false
}

// 原子替换
nc := new // 优化,具体看原注释
for {
if e.p.CompareAndSwap(p, &nc) {
return true
}
p = e.p.Load()
if p == nil || p == expunged || *p != old {
return false
}
}
}

Range

具体逻辑如下

  1. 复制read,如果dirty有新key写入,则复制dirty并用dirty替换read
  2. 遍历哈希表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (m *Map) Range(f func(key, value any) bool) {
// 复制read
read := m.loadReadOnly()
// dirty有新key写入
if read.amended {
m.mu.Lock()
read = m.loadReadOnly()
// double-check
if read.amended {
// 复制dirty
read = readOnly{m: m.dirty}
copyRead := read
// 用dirty替换掉read
m.read.Store(&copyRead)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}

// 遍历哈希表
for k, e := range read.m {
// 读取数据
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}

Clear

具体逻辑如下

  1. read为空且dirty也为空,返回
  2. read不为空或者dirty有新数据
    • 复制read并将read置空
    • 将dirty清空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (m *Map) Clear() {
// 复制read
read := m.loadReadOnly()
// read为空且dirty也为空
if len(read.m) == 0 && !read.amended {
return
}

// read不为空或者dirty有新数据

m.mu.Lock()
defer m.mu.Unlock()

// 复制read
read = m.loadReadOnly()
// read置空
if len(read.m) > 0 || read.amended {
m.read.Store(&readOnly{})
}

// dirty清空
clear(m.dirty)
// Don't immediately promote the newly-cleared dirty map on the next operation.
m.misses = 0
}

参考文档

Go sync.Map: The Right Tool for the Right Job