golang系列之-sync/atomic

sync/atomic标准库包中提供的原子操作。原子操作是无锁的,直接通过CPU指令实现。

当你想要在多个goroutine中无锁访问一个变量时,就可以考虑使用atomic包提供的数据类型实现

当前go版本:1.24

快速上手

以下是一个使用atomic.Uint64数据类型实现的计数器,它确保了多个goroutine按顺序正确更新数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
"sync/atomic"
)

func main() {

var ops atomic.Uint64

var wg sync.WaitGroup

for i := 0; i < 50; i++ {
wg.Add(1)

go func() {
for c := 0; c < 1000; c++ {

ops.Add(1)
}

wg.Done()
}()
}

wg.Wait()

fmt.Println("ops:", ops.Load())
}

数据结构

todo:文章图片待补充

通用

sync/atomic包中,通用的数据结构有

  • Value

使用Value可以存储任意类型的数据

1
2
3
4
5
6
7
8
9
type Value struct {
v any
}

// efaceWords is interface{} internal representation.
type efaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}

其他

其他具体的数据结构有

  • Bool
  • Int32
  • Int64
  • Uint32
  • Uint64
  • Uintptr
  • Pointer

举个例子,Pointer类型的数据结构如下

1
2
3
4
5
6
7
8
9
10
// A Pointer is an atomic pointer of type *T. The zero value is a nil *T.
type Pointer[T any] struct {
// Mention *T in a field to disallow conversion between Pointer types.
// See go.dev/issue/56603 for more details.
// Use *T, not T, to avoid spurious recursive type definition errors.
_ [0]*T

_ noCopy
v unsafe.Pointer
}

通用方法

基本上,atomic提供的数据类型有几个通用的方法:Load、Store、Swap、CompareAndSwap

下面展示一个基本的使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// init  -> v=nil
v := atomic.Value{}

// store -> v=1
v.Store(1)

// load
// print 1
fmt.Println(v.Load())

// swap -> v=2
// print 1
fmt.Println(v.Swap(2))
// print 2
fmt.Println(v.Load())

// CAS -> v=3
// print true
fmt.Println(v.CompareAndSwap(2, 3))
// print 3
fmt.Println(v.Load())

Load

获取atomic类型存储的数值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (val any) {
vp := (*efaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
vlp := (*efaceWords)(unsafe.Pointer(&val))
vlp.typ = typ
vlp.data = data
return
}

Store

更新/覆盖atomic类型的数值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Store sets the value of the [Value] v to val.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(val any) {
if val == nil {
panic("sync/atomic: store of nil value into Value")
}
vp := (*efaceWords)(unsafe.Pointer(v))
vlp := (*efaceWords)(unsafe.Pointer(&val))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, vlp.data)
StorePointer(&vp.typ, vlp.typ)
runtime_procUnpin()
return
}
if typ == unsafe.Pointer(&firstStoreInProgress) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != vlp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, vlp.data)
return
}
}

Swap

类似Store,更新atomic类型的数值->newVal,并返回先前数值->oldVal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Swap stores new into Value and returns the previous value. It returns nil if
// the Value is empty.
//
// All calls to Swap for a given Value must use values of the same concrete
// type. Swap of an inconsistent type panics, as does Swap(nil).
func (v *Value) Swap(new any) (old any) {
if new == nil {
panic("sync/atomic: swap of nil value into Value")
}
vp := (*efaceWords)(unsafe.Pointer(v))
np := (*efaceWords)(unsafe.Pointer(&new))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion; and so that
// GC does not see the fake type accidentally.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, np.data)
StorePointer(&vp.typ, np.typ)
runtime_procUnpin()
return nil
}
if typ == unsafe.Pointer(&firstStoreInProgress) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != np.typ {
panic("sync/atomic: swap of inconsistently typed value into Value")
}
op := (*efaceWords)(unsafe.Pointer(&old))
op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)
return old
}
}

CompareAndSwap

对比atomic类型当前数值,相同则更新并返回true,否则不处理并返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// CompareAndSwap executes the compare-and-swap operation for the [Value].
//
// All calls to CompareAndSwap for a given Value must use values of the same
// concrete type. CompareAndSwap of an inconsistent type panics, as does
// CompareAndSwap(old, nil).
func (v *Value) CompareAndSwap(old, new any) (swapped bool) {
if new == nil {
panic("sync/atomic: compare and swap of nil value into Value")
}
vp := (*efaceWords)(unsafe.Pointer(v))
np := (*efaceWords)(unsafe.Pointer(&new))
op := (*efaceWords)(unsafe.Pointer(&old))
if op.typ != nil && np.typ != op.typ {
panic("sync/atomic: compare and swap of inconsistently typed values")
}
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
if old != nil {
return false
}
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion; and so that
// GC does not see the fake type accidentally.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, np.data)
StorePointer(&vp.typ, np.typ)
runtime_procUnpin()
return true
}
if typ == unsafe.Pointer(&firstStoreInProgress) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != np.typ {
panic("sync/atomic: compare and swap of inconsistently typed value into Value")
}
// Compare old and current via runtime equality check.
// This allows value types to be compared, something
// not offered by the package functions.
// CompareAndSwapPointer below only ensures vp.data
// has not changed since LoadPointer.
data := LoadPointer(&vp.data)
var i any
(*efaceWords)(unsafe.Pointer(&i)).typ = typ
(*efaceWords)(unsafe.Pointer(&i)).data = data
if i != old {
return false
}
return CompareAndSwapPointer(&vp.data, data, np.data)
}
}

特定类型方法

相对于其他,整型数有几个额外方法:Add、And、Or,相关类型是

  • Int32
  • Int64
  • Uint32
  • Uint64
  • Uintptr

示例代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
counter := int32(99)

// counter += 1
counter = atomic.AddInt32(&counter, 1)
// print 100
fmt.Println(counter)

// 0110 0100 & 0000 0111 -> 0000 0100 => 4
// print 100
fmt.Println(atomic.AndInt32(&counter, 7))
// print 4
fmt.Println(counter)

// 0000 0100 | 0000 0010 -> 0000 0110 => 6
// print 4
fmt.Println(atomic.OrInt32(&counter, 2))
// print 6
fmt.Println(counter)

Add

原子加法操作,两个数相加,返回结果

1
2
3
4
// src/internal/runtime/atomic/atomic_amd64.s
// src/sync/atomic/type.go
// Add atomically adds delta to x and returns the new value.
func (x *Int64) Add(delta int64) (new int64) { return AddInt64(&x.v, delta) }

AddInt64汇编代码如下,本人对汇编代码不了解,下面的注释是由ChatGPT提供的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// uint64 Xadd64(uint64 volatile *val, int64 delta)
// Atomically:
// *val += delta;
// return *val;
TEXT ·Xadd64(SB), NOSPLIT, $0-24
MOVQ ptr+0(FP), BX // ptr(val 的地址)被加载到寄存器 BX 中
MOVQ delta+8(FP), AX // delta 被加载到寄存器 AX 中
MOVQ AX, CX // 复制 AX 的值到 CX,以便后面能把 delta 加到返回值中
LOCK // 确保接下来的操作是原子性的
XADDQ AX, 0(BX) // 指令会将 AX 加到 BX 指向的值,并返回加法操作前的旧值(旧值会存入 AX)
ADDQ CX, AX // 将之前存储的delta加回AX上,得到最终结果
MOVQ AX, ret+16(FP) // 把最终的结果存入返回值地址
RET // 返回

TEXT ·Xaddint64(SB), NOSPLIT, $0-24
JMP ·Xadd64(SB)

更多的注释如下

字段 注释
TEXT 标志这段代码是一个函数
·Xadd64(SB) 函数名称
SB 是指向当前函数栈帧的偏移量,传递给 racecallatomic 作为栈帧的指针。
NOSPLIT 表示该函数不会执行栈分割,也就是说,它不会访问 Go 层的栈分配
$0-24 指示函数参数的大小范围,在这种情况下,0-24 意味着该函数有 24 字节的局部空间(用于存储传入的参数等)

And

原子位与(&)操作,新结果写入变量,返回旧的数据

1
2
3
// And atomically performs a bitwise AND operation on x using the bitmask
// provided as mask and returns the old value.
func (x *Int64) And(mask int64) (old int64) { return AndInt64(&x.v, mask) }

AndInt64汇编代码如下

1
2
3
4
5
TEXT    sync∕atomic·AndInt64(SB), NOSPLIT|NOFRAME, $0-24    // 
GO_ARGS
MOVQ $__tsan_go_atomic64_fetch_and(SB), AX
CALL racecallatomic<>(SB)
RET

相关注释如下,只展示与AddInt64汇编代码不同的部份

字段 注释
GO_ARGS 这是一个宏,表示处理传入的 Go 参数,具体的参数内容会根据函数调用的上下文进行调整。这是 Go 编译器用来处理函数调用时的一些约定。它会将传入的参数从 Go 栈中提取到寄存器中
__tsan_go_atomic64_fetch_and 是一个标识符,Go 在进行原子操作时,通常会使用一个特殊的函数来标识和追踪潜在的数据竞争。这种操作确保了线程安全,并且在并发环境中避免了出现未同步的数据访问
racecallatomic Go 的一个内建函数,用来执行原子操作并同时为 ThreadSanitizer(TSan)提供监控支持。它会根据 AX 寄存器中存储的函数地址(即 __tsan_go_atomic64_fetch_and)执行对应的操作
<> 表示泛型类型参数,在 Go 汇编中表示函数的参数类型。racecallatomic 函数会根据这些类型参数来处理实际的原子操作。

Or

原子位或(|)操作,新结果写入变量,返回旧的数据

1
2
3
// Or atomically performs a bitwise OR operation on x using the bitmask
// provided as mask and returns the old value.
func (x *Int64) Or(mask int64) (old int64) { return OrInt64(&x.v, mask) }

OrInt64的汇编代码如下,不再展示注释

1
2
3
4
5
TEXT    sync∕atomic·OrInt64(SB), NOSPLIT|NOFRAME, $0-24
GO_ARGS
MOVQ $__tsan_go_atomic64_fetch_or(SB), AX
CALL racecallatomic<>(SB)
RET

参考文档

atomic