在使用redis/memcached缓存系统时,可能会遇到以下三个问题
- cache penetration(缓存穿透) - 数据既不在cache中也不在db中,可以用布龙过滤器处理
- cache avalanche(缓存雪崩) - 同一时刻出现大量的key失效,可以将过期时间随机化或者不设置过期时间
- cache breakdown(缓存击穿)/cache stampede(缓存踩踏) - 热门的key过期,客户端加锁或者不设置过期时间
缓存击穿问题中,客户端加锁使穿行化访问是一个值得考虑的解决方法,可以降低服务器(cache/db)的压力。但另一方面,这也会让大量的请求被阻塞,吞吐量下降。实际上,同一时刻的请求可以共享响应数据,这就是singleflight解决的问题
singleflight不是标准库的一部份,但go的internal目录内复制了一份singleflight源码,该源码也是本文在讨论的
当前go版本:1.24
快速上手
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package main
import ( "fmt" "math/rand" "sync" "sync/atomic" "time"
"golang.org/x/sync/singleflight" )
var callCount atomic.Int32 var wg sync.WaitGroup
func fetchData() (interface{}, error) { callCount.Add(1) time.Sleep(100 * time.Millisecond) return rand.Intn(100), nil }
func fetchDataWrapper(g *singleflight.Group, id int) error { defer wg.Done()
time.Sleep(time.Duration(id) * 40 * time.Millisecond) v, err, shared := g.Do("key-fetch-data", fetchData) if err != nil { return err }
fmt.Printf("Goroutine %d: result: %v, shared: %v\n", id, v, shared) return nil }
func main() { var g singleflight.Group
const numGoroutines = 5 wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ { go fetchDataWrapper(&g, i) }
wg.Wait() fmt.Printf("Function was called %d times\n", callCount.Load()) }
|
上述代码运行结果如下
1 2 3 4 5 6 7 8 9
| go run ./main.go
|
可以看到,G0、G1、G2共享result=2,G3、G4共享result=94
数据结构
todo:文章图片待补充
singleflight数据结构如下所示,其中
Group
- 由mutex和map组成,本质就是map,因为map不允许并发读写,所以才加了mutex
call
- 是响应数据的封装,与请求key关联在一起,因为内部的dup计数器,或许可以将其按上下文来理解
Result
- 也是响应数据的封装,跟channel搭配使用
响应数据的封装方式有两种
- 通过函数返回值返回
- 通过channel返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| type Group struct { mu sync.Mutex m map[string]*call }
type call struct { wg sync.WaitGroup val any err error dups int chans []chan<- Result }
type Result struct { Val any Err error Shared bool }
|
核心方法
核心的方法有Do、DoChan、ForgetUnshared,具体看下面描述
Do
具体逻辑如下
- 如果key已存在,挂起等待,唤醒后读取响应数据
- 如果key不存在,为key创建响应数据封装,写入map,然后调用doCall获取响应数据
- 删除key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool) {
g.mu.Lock()
if g.m == nil { g.m = make(map[string]*call) }
if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() return c.val, c.err, true }
c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0 }
func (g *Group) doCall(c *call, key string, fn func() (any, error)) { c.val, c.err = fn()
g.mu.Lock() c.wg.Done() if g.m[key] == c { delete(g.m, key) } for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } g.mu.Unlock() }
|
DoChan
具体逻辑同Do,只是响应数据是通过channel传递的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (g *Group) DoChan(key string, fn func() (any, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil { g.m = make(map[string]*call) }
if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch }
c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock()
go g.doCall(c, key, fn)
return ch }
|
ForgetUnshared
删除指定key,如果该key已经有其他goroutine等待,不处理
具体逻辑如下
- key不存在,返回true
- key存在,如果dup等于0,删除并返回true,否则不处理,返回false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (g *Group) ForgetUnshared(key string) bool { g.mu.Lock() defer g.mu.Unlock() c, ok := g.m[key] if !ok { return true } if c.dups == 0 { delete(g.m, key) return true } return false }
|
参考文档
Go Singleflight Melts in Your Code, Not in Your DB
Developing with Dragonfly: Solve Caching Problems
A Complete Beginner Guide for Cache Penetration, Stampede, and Avalanche