golang系列之-singleflight

在使用redis/memcached缓存系统时,可能会遇到以下三个问题

  1. cache penetration(缓存穿透) - 数据既不在cache中也不在db中,可以用布龙过滤器处理
  2. cache avalanche(缓存雪崩) - 同一时刻出现大量的key失效,可以将过期时间随机化或者不设置过期时间
  3. cache breakdown(缓存击穿)/cache stampede(缓存踩踏) - 热门的key过期,客户端加锁或者不设置过期时间

缓存击穿问题中,客户端加锁使穿行化访问是一个值得考虑的解决方法,可以降低服务器(cache/db)的压力。但另一方面,这也会让大量的请求被阻塞,吞吐量下降。实际上,同一时刻的请求可以共享响应数据,这就是singleflight解决的问题

singleflight不是标准库的一部份,但go的internal目录内复制了一份singleflight源码,该源码也是本文在讨论的

当前go版本:1.24

快速上手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/singleflight"
)

var callCount atomic.Int32
var wg sync.WaitGroup

// 模拟db请求
func fetchData() (interface{}, error) {
callCount.Add(1)
time.Sleep(100 * time.Millisecond)
// 返回的数据是随机的
return rand.Intn(100), nil
}

// 包装fetchData和singleflight
func fetchDataWrapper(g *singleflight.Group, id int) error {
defer wg.Done()

time.Sleep(time.Duration(id) * 40 * time.Millisecond)
v, err, shared := g.Do("key-fetch-data", fetchData)
if err != nil {
return err
}

fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared)
return nil
}

func main() {
var g singleflight.Group

const numGoroutines = 5
wg.Add(numGoroutines)

// 模拟:发起5个请求访问db
for i := 0; i < numGoroutines; i++ {
go fetchDataWrapper(&g, i)
}

wg.Wait()
fmt.Printf("Function was called %d times\n", callCount.Load())
}

上述代码运行结果如下

1
2
3
4
5
6
7
8
9
go run ./main.go

# 输出如下,结果是随机的
# Goroutine 1: result: 2, shared: true
# Goroutine 0: result: 2, shared: true
# Goroutine 2: result: 2, shared: true
# Goroutine 3: result: 94, shared: true
# Goroutine 4: result: 94, shared: true
# Function was called 2 times

可以看到,G0、G1、G2共享result=2,G3、G4共享result=94

数据结构

todo:文章图片待补充

singleflight数据结构如下所示,其中

  • Group - 由mutex和map组成,本质就是map,因为map不允许并发读写,所以才加了mutex
  • call - 是响应数据的封装,与请求key关联在一起,因为内部的dup计数器,或许可以将其按上下文来理解
  • Result - 也是响应数据的封装,跟channel搭配使用

响应数据的封装方式有两种

  1. 通过函数返回值返回
  2. 通过channel返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// src/internal/singleflight/singleflight.go
type Group struct {
mu sync.Mutex // 锁,map不能并发读写
m map[string]*call // 关联key跟结果
}

// 响应数据封装
type call struct {
wg sync.WaitGroup
// 方式1
val any // 结果值
err error // 异常值
dups int // 计数器(等待中的goroutine),同时用来判断是否共享
// 方式2
chans []chan<- Result // 结果值/异常值/是否共享(通过channel传递)
}

// 响应数据封装,与channel搭配
type Result struct {
Val any // 结果值
Err error // 异常值
Shared bool // 是否共享
}

核心方法

核心的方法有Do、DoChan、ForgetUnshared,具体看下面描述

Do

具体逻辑如下

  1. 如果key已存在,挂起等待,唤醒后读取响应数据
  2. 如果key不存在,为key创建响应数据封装,写入map,然后调用doCall获取响应数据
  3. 删除key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool) {
// 1. 为key与结果创建关联

g.mu.Lock() // map不能并发读写,需要加锁

if g.m == nil {
g.m = make(map[string]*call)
}

// 1.1. 相同的请求已存在,等待结果返回
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait() // 挂起等待
return c.val, c.err, true
}

// 1.2. 新请求,创建结果并关联
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

// 2. 调用函数获取结果
g.doCall(c, key, fn)

return c.val, c.err, c.dups > 0
}

func (g *Group) doCall(c *call, key string, fn func() (any, error)) {
// 1. 拿到结果
c.val, c.err = fn()

// 2. map删除该key
g.mu.Lock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}
// 如果是通过channel传递数据
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}

DoChan

具体逻辑同Do,只是响应数据是通过channel传递的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (g *Group) DoChan(key string, fn func() (any, error)) <-chan Result {
// 1. 为key与结果创建关联

// 设置容量,使当前goroutine不阻塞
ch := make(chan Result, 1)

g.mu.Lock() // map不能并发读写,需要加锁

if g.m == nil {
g.m = make(map[string]*call)
}

// 1.1. 相同的请求已存在,等待结果返回
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}

// 1.2. 新请求,创建结果并关联
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1) // 如果Do和DoChan混用还是需要使用到wg,否则异常
g.m[key] = c
g.mu.Unlock()

// 2. 调用函数获取结果
go g.doCall(c, key, fn)

return ch
}

ForgetUnshared

删除指定key,如果该key已经有其他goroutine等待,不处理

具体逻辑如下

  1. key不存在,返回true
  2. key存在,如果dup等于0,删除并返回true,否则不处理,返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (g *Group) ForgetUnshared(key string) bool {
// map加锁
g.mu.Lock()
defer g.mu.Unlock()
c, ok := g.m[key]
// key不存在
if !ok {
return true
}
// key存在但没有共享
if c.dups == 0 {
delete(g.m, key)
return true
}
return false
}

参考文档

Go Singleflight Melts in Your Code, Not in Your DB
Developing with Dragonfly: Solve Caching Problems
A Complete Beginner Guide for Cache Penetration, Stampede, and Avalanche