channel-管道,是go语言中一种常见的goroutine的通信方式

当前go版本:1.24

快速上手

示例1. 两个goroutine之间使用channel传递数据

1
2
3
4
5
6
7
8
9
10
11
message := make(chan string)

// 新goroutine
go func() {
time.Sleep(time.Second * 2)
message <- "Hello from goroutine!"
}()

// 当前goroutine
msg := <-message
fmt.Println(msg)

示例2. 使用select同时监听多个goroutine的响应数据,实际上,业务代码中一般都是跟定时器搭配使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
time.Sleep(time.Second * 1)
ch1 <- 1
}()

go func() {
time.Sleep(time.Second * 2)
ch2 <- 2
}()

for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("Received from ch2:", msg2)
}
}
阅读全文 »

从1.24版开始,sync.Map改用HashTrieMap重构,与之前的双map实现不同,HashTrieMap更像是一个B树,简单的示例图如下

1
2
3
4
5
6
7
//  root -> | idx0    | idx1       | ... | idx15 |
// | &entry0 | &indirect0 | ... | nil |
// |
// children
// v
// | idx0 | idx1 | ... | idx15 |
// | nil | &entry1 | ... | nil |

使用哈希函数生成64位的哈希值,从高到低4位为一个idx,最多有16层,每个节点可容纳16个元素,最多可容纳16^16=2^64个元素

当前go版本:1.24

HashTrieMap的开关放在文件src/internal/buildcfg/exp.go的函数ParseGOEXPERIMENT

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type HashTrieMap[K comparable, V any] struct {
inited atomic.Uint32 // 是否已初始化
initMu Mutex // 锁,用于初始化
root atomic.Pointer[indirect[K, V]] // 根节点
keyHash hashFunc // 哈希函数,用于key
valEqual equalFunc // cmp函数,用于value
seed uintptr // 哈希种子
}

// 内部节点
type indirect[K comparable, V any] struct {
node[K, V] // isEntry=false
dead atomic.Bool // 是否被删除
mu Mutex // 锁,用于children
parent *indirect[K, V] // 父节点指针
children [nChildren]atomic.Pointer[node[K, V]] // 16个子节点,指向indirect或entry
}

// 叶子节点
type entry[K comparable, V any] struct {
node[K, V] // isEntry=true
overflow atomic.Pointer[entry[K, V]] // 指针,当两个entry哈希值相同时以链表方式存储
key K // 键
value V // 值
}

// entry和indirect的共有属性
type node[K comparable, V any] struct {
isEntry bool // 判断是叶子节点还是内部节点
}
阅读全文 »

map不支持并发读写,但我们可以转变下思路,将value改为一个指向结构体entry的指针,结构体内部的字段我们是可以随意修改的,如下,将并发读写map改为并发读map,读写转移到entry

1
2
3
4
5
type entry struct {
p any
}

var m map[string]*entry

上面的方法看起来解决了并发读写的问题,但还不够,当有新的key写入时,还是变回了原来的map并发读写。sync.Map提供了一种思路,使用两个map,read负责已有key的并发读写,dirty负责新key的读写,只有当read找不到key,才去找dirty。

现在还剩最后一个问题,read和dirty如何保证数据一致/同步,我们可以改造entry,使其指向value的指针,如此一来,read和dirty的entry可以指向同一个value,如下,这就是sync.Map的大致思路

1
2
3
4
5
6
7
8
9
type entry struct {
p *any
}
// read -> key0|*entry0 entry0.p -> &value
// -> key1|*entry1
//
// dirty -> key0|*entry0
// -> key1|*entry1
// -> key2|*entry2

注意:尽管如此,sync.Map并不完美,以上设计导致我们无法直接计算出哈希表的元素数量,需要遍历进行统计,而且还不一定准确

当前go版本:1.23,1.24版本改为HashTrieMap实现

快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"sync"
)

func main() {
var syncMap sync.Map

// 写
syncMap.Store("blog", "VictoriaMetrics")

// 读
value, ok := syncMap.Load("blog")
fmt.Println(value, ok)

// 删除
syncMap.Delete("blog")
value, ok = syncMap.Load("blog")
fmt.Println(value, ok)
}

上述代码运行结果如下

1
2
3
4
5
go run ./main.go

# 输出如下
# VictoriaMetrics true
# <nil> false
阅读全文 »

在使用redis/memcached缓存系统时,可能会遇到以下三个问题

  1. cache penetration(缓存穿透) - 数据既不在cache中也不在db中,可以用布龙过滤器处理
  2. cache avalanche(缓存雪崩) - 同一时刻出现大量的key失效,可以将过期时间随机化或者不设置过期时间
  3. cache breakdown(缓存击穿)/cache stampede(缓存踩踏) - 热门的key过期,客户端加锁或者不设置过期时间

缓存击穿问题中,客户端加锁使穿行化访问是一个值得考虑的解决方法,可以降低服务器(cache/db)的压力。但另一方面,这也会让大量的请求被阻塞,吞吐量下降。实际上,同一时刻的请求可以共享响应数据,这就是singleflight解决的问题

singleflight不是标准库的一部份,但go的internal目录内复制了一份singleflight源码,该源码也是本文在讨论的

当前go版本:1.24

快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/singleflight"
)

var callCount atomic.Int32
var wg sync.WaitGroup

// 模拟db请求
func fetchData() (interface{}, error) {
callCount.Add(1)
time.Sleep(100 * time.Millisecond)
// 返回的数据是随机的
return rand.Intn(100), nil
}

// 包装fetchData和singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
defer wg.Done()

time.Sleep(time.Duration(id) * 40 * time.Millisecond)
v, err, shared := g.Do("key-fetch-data", fetchData)
if err != nil {
return err
}

fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
return nil
}

func main() {
var g singleflight.Group

const numGoroutines = 5
wg.Add(numGoroutines)

// 模拟:发起5个请求访问db
for i := 0; i < numGoroutines; i++ {
go fetchDataWrapper(&g, i)
}

wg.Wait()
fmt.Printf("Function was called %d times\n", callCount.Load())
}

上述代码运行结果如下

1
2
3
4
5
6
7
8
9
go run ./main.go

# 输出如下,结果是随机的
# Goroutine 1: result: 2, shared: true
# Goroutine 0: result: 2, shared: true
# Goroutine 2: result: 2, shared: true
# Goroutine 3: result: 94, shared: true
# Goroutine 4: result: 94, shared: true
# Function was called 2 times

可以看到,G0、G1、G2共享result=2,G3、G4共享result=94

阅读全文 »

RWMutex-读写锁,该锁可以被任意多个reader持有,或被一个writer持有。通过观察RWMutex的源代码实现,可以将RWMutex看作是FIFO队列,具体看后面的详细描述

当前go版本:1.24

快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"sync"
)

func main() {
var size = 100
var count int
var wg sync.WaitGroup
var rwm sync.RWMutex

queue := make([]int, size)

for i := 0; i < size; i++ {
go func() {
wg.Add(1)
defer wg.Done()

rwm.Lock() // 写锁
count++ // 更新资源
rwm.Unlock()
}()
}

for i := 0; i < size; i++ {
indx := i
go func() {
wg.Add(1)
defer wg.Done()

rwm.RLock() // 读锁
queue[indx] = count // 只读
rwm.RUnlock()
}()
}

wg.Wait()

fmt.Println(count)
fmt.Println(queue)
}
阅读全文 »

Mutex(MUTualEx)-互斥锁是一种可以保证每次只有一个goroutine访问贡献资源的方法。这个资源可以是一段程序代码、一个整数、一个map、一个struct、一个channel或其他任何东西。通过观察Mutex的源代码实现,可以将Mutex看作是一个队列(FIFO/LIFO),具体看后面的详细描述

当前go版本:1.24

快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"sync"
)

type Container struct {
mu sync.Mutex
counters map[string]int
}

func (c *Container) inc(name string) {
c.mu.Lock() // 互斥锁,获取失败等待挂起
defer c.mu.Unlock() // 解锁
c.counters[name]++ // 共享资源
}

func main() {
c := Container{
counters: map[string]int{"a": 0, "b": 0},
}

var wg sync.WaitGroup

doIncrement := func(name string, n int) {
for i := 0; i < n; i++ {
c.inc(name)
}
wg.Done()
}

wg.Add(3)
go doIncrement("a", 10000)
go doIncrement("a", 10000)
go doIncrement("b", 10000)

wg.Wait()
fmt.Println(c.counters)
}

上述代码运行结果如下

1
2
3
4
go run ./main.go

# 输出如下
# map[a:20000 b:10000]
阅读全文 »

sync.Cond经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。底层实现基于信号量semaphore

当前go版本:1.24

快速上手

以下展示一个sync.Cond的使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"sync"
)

var shared = make(map[string]interface{})

func main() {
var wg sync.WaitGroup
wg.Add(2)

cond := sync.NewCond(&sync.Mutex{})

// reader 打印shared[key]
reader := func(key string) {
cond.L.Lock()
// 等待,直到shared有数据
for len(shared) == 0 {
cond.Wait() // Wait内部会暂时解锁/加锁
}
fmt.Println(shared[key])
cond.L.Unlock()
wg.Done()
}

// 创建两个goroutine
go reader("rsc1")
go reader("rsc2")

// writer
cond.L.Lock()
// 写入
shared["rsc1"] = "foo"
shared["rsc2"] = "bar"
// 通知所有goroutine
cond.Broadcast()
cond.L.Unlock()

wg.Wait()
}
阅读全文 »

并发情况下,如果需要等待所有的goroutine完成任务,需要使用Waitgroup等待

当前go版本:1.24

快速上手

先简单列举一个使用案例,了解Waitgroup的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"sync"
"time"
)

func worker(id int) {
fmt.Printf("Worker %d starting\n", id)

time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}

func main() {
var wg sync.WaitGroup

for i := 1; i <= 5; i++ {
// counter++
wg.Add(1)

go func() {
// counter--
defer wg.Done()
worker(i)
}()
}

// wait
wg.Wait()
}

上述代码执行后,系统输出如下,可以看到系统等待5个goroutine完成任务后才退出

1
2
3
4
5
6
7
8
9
10
11
12
13
go run ./main.go

# 输出如下
# Worker 5 starting
# Worker 2 starting
# Worker 1 starting
# Worker 3 starting
# Worker 4 starting
# Worker 4 done
# Worker 5 done
# Worker 2 done
# Worker 3 done
# Worker 1 done
阅读全文 »

sync/atomic标准库包中提供的原子操作。原子操作是无锁的,直接通过CPU指令实现。

当你想要在多个goroutine中无锁访问一个变量时,就可以考虑使用atomic包提供的数据类型实现

当前go版本:1.24

快速上手

以下是一个使用atomic.Uint64数据类型实现的计数器,它确保了多个goroutine按顺序正确更新数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
"sync/atomic"
)

func main() {

var ops atomic.Uint64

var wg sync.WaitGroup

for i := 0; i < 50; i++ {
wg.Add(1)

go func() {
for c := 0; c < 1000; c++ {

ops.Add(1)
}

wg.Done()
}()
}

wg.Wait()

fmt.Println("ops:", ops.Load())
}
阅读全文 »

sync.Pool-临时对象池,是golang一个很关键的数据结构,通过复用历史对象,缓解因频繁创建、删除对象而导致的内存分配压力、GC压力,在社区中被广泛使用,有如go-gin、kubernetes等

当前go版本:1.24

快速上手

下面展示一个简单的使用示例,用于帮助用户快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"fmt"
"sync"
)

type JobState int

const (
JobStateFresh JobState = iota
JobStateRunning
JobStateRecycled
)

type Job struct {
state JobState
}

func (j *Job) Run() {
switch j.state {
case JobStateRecycled:
fmt.Println("this job came from the pool")
case JobStateFresh:
fmt.Println("this job just got allocated")
}

j.state = JobStateRunning
}

func main() {
// 创建一个对象池
pool := &sync.Pool{
New: func() any {
return &Job{state: JobStateFresh}
},
}

// 获取一个对象,可以是新建的或者是历史使用过的
job := pool.Get().(*Job)

// 执行业务代码
job.Run()

// reset状态并放回池子里,方便下次使用
job.state = JobStateRecycled
pool.Put(job)
}
阅读全文 »
0%