注:本文已发布超过一年,请注意您所使用工具的相关版本是否适用
本系列为 Go 进阶训练营 笔记,访问 博客: Go进阶训练营, 即可查看当前更新进度,部分文章篇幅较长,使用 PC 大屏浏览体验更佳。
序
3 月进度: 08/15 (月初定的目标感觉快完不成了)
SingleFlight
为什么我们需要 SingleFlight(使用场景)?
一般情况下我们在写一写对外的服务的时候都会有一层 cache 作为缓存,用来减少底层数据库的压力,但是在遇到例如 redis 抖动或者其他情况可能会导致大量的 cache miss 出现。
如下图所示,可能存在来自桌面端和移动端的用户有 1000 的并发请求,他们都访问的获取文章列表的接口,获取前 20 条信息,如果这个时候我们服务直接去访问 redis 出现 cache miss 那么我们就会去请求 1000 次数据库,这时可能会给数据库带来较大的压力(这里的 1000 只是一个例子,实际上可能远大于这个值)导致我们的服务异常或者超时。
这时候就可以使用 singleflight 库了,直译过来就是单飞,这个库的主要作用就是将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。
如下图所示,使用 singleflight 之后,我们在一个请求的时间周期内实际上只会向底层的数据库发起一次请求大大减少对数据库的压力。
SingleFlight 包怎么用(使用教程)?
函数签名
主要是一个 Group
结构体,三个方法,具体信息看下方注释
1 2 3 4 5 6 7 8 9 10 11 12
| type Group func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
func (g *Group) Forget(key string)
|
使用示例
接下来我们看看实际上我们是怎么使用的,先使用一个普通的例子,这时一个获取文章详情的函数,我们在函数里面使用一个 count 模拟不同并发下的耗时的不同,并发越多请求耗时越多
1 2 3 4 5 6 7
| func getArticle(id int) (article string, err error) { atomic.AddInt32(&count, 1) time.Sleep(time.Duration(count) * time.Millisecond)
return fmt.Sprintf("article: %d", id), nil }
|
我们使用 singleflight 的时候就只需要 new(singleflight.Group)
然后调用一下相对应的 Do 方法就可了,是不是很简单
1 2 3 4 5 6 7
| func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) { v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) { return getArticle(id) })
return v.(string), err }
|
效果测试
光说不练假把式,写一个简单的测试代码,下面我们启动 1000 个 Goroutine 去并发调用这两个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| var count int32
func main() { time.AfterFunc(1*time.Second, func() { atomic.AddInt32(&count, -count) })
var ( wg sync.WaitGroup now = time.Now() n = 1000 sg = &singleflight.Group{} )
for i := 0; i < n; i++ { wg.Add(1) go func() { res, _ := getArticle(1) if res != "article: 1" { panic("err") } wg.Done() }() }
wg.Wait() fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now)) }
|
可以看到这个是调用 getArticle
方法的耗时,花费了 1s 多
1 2 3
| ❯ go run ./1.go 同时发起 1000 次请求,耗时: 1.0022831s
|
而使用 singleflight 的方法,花费了不到 3ms
1 2 3
| ❯ go run ./1.go 同时发起 1000 次请求,耗时: 2.5119ms
|
当然每个库都有自己的使用场景,软件领域里面没有银弹,如果我们用的不太好的话甚至可能会得到适得其反的效果,而多看源码不仅能够帮助我们进行学习,也可以尽量少踩坑
它是如何实现的(源码分析)?
本文基于 [https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/singleflight](https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/singleflight) 进行分析,这个库的实现很简单,但是功能很强大,还有一些小技巧,非常值得学习
Group
1 2 3 4
| type Group struct { mu sync.Mutex m map[string]*call }
|
Group 结构体由一个互斥锁和一个 map 组成,可以看到注释 map 是懒加载的,所以 Group 只要声明就可以使用,不用进行额外的初始化零值就可以直接使用。call 保存了当前调用对应的信息,map 的键就是我们调用 Do
方法传入的 key
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| type call struct { wg sync.WaitGroup
val interface{} err error
forgotten bool
dups int chans []chan<- Result }
|
Do
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock()
if g.m == nil { g.m = make(map[string]*call) }
if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true }
c := new(call)
c.wg.Add(1) g.m[key] = c g.mu.Unlock()
g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
|
doCall
这个方法的实现有点意思,使用了两个 defer 巧妙的将 runtime 的错误和我们传入 function 的 panic 区别开来避免了由于传入的 function panic 导致的死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false
defer func() {
}()
func() { defer func() { if !normalReturn { if r := recover(); r != nil { c.err = newPanicError(r) } } }()
c.val, c.err = fn()
normalReturn = true }()
if !normalReturn { recovered = true } }
|
再来看看第一个 defer 中的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| defer func() { if !normalReturn && !recovered { c.err = errGoexit }
c.wg.Done() g.mu.Lock() defer g.mu.Unlock()
if !c.forgotten { delete(g.m, key) }
if e, ok := c.err.(*panicError); ok { if len(c.chans) > 0 { go panic(e) select {} } else { panic(e) } } else if c.err == errGoexit { } else { for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }()
|
DoChan
Do chan 和 Do 类似,其实就是一个是同步等待,一个是异步返回,主要实现上就是,如果调用 DoChan 会给 call.chans 添加一个 channel 这样等第一次调用执行完毕之后就会循环向这些 channel 写入数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock()
go g.doCall(c, key, fn)
return ch }
|
Forget
forget 用于手动释放某个 key 下次调用就不会阻塞等待了
1 2 3 4 5 6 7 8
| func (g *Group) Forget(key string) { g.mu.Lock() if c, ok := g.m[key]; ok { c.forgotten = true } delete(g.m, key) g.mu.Unlock() }
|
有哪些注意事项(避坑指南)?
单飞虽好但也不要滥用哦,还是存在一些坑的
1. 一个阻塞,全员等待
使用 singleflight 我们比较常见的是直接使用 Do 方法,但是这个极端情况下会导致整个程序 hang 住,如果我们的代码出点问题,有一个调用 hang 住了,那么会导致所有的请求都 hang 住
还是之前的例子,我们加一个 select 模拟阻塞
1 2 3 4 5 6 7 8 9
| func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) { v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) { select {} return getArticle(id) })
return v.(string), err }
|
执行就会发现死锁了
1 2 3
| fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
|
这时候我们可以使用 DoChan 结合 select 做超时控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func singleflightGetArticle(ctx context.Context, sg *singleflight.Group, id int) (string, error) { result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) { select {} return getArticle(id) })
select { case r := <-result: return r.Val.(string), r.Err case <-ctx.Done(): return "", ctx.Err() } }
|
调用的时候传入一个含 超时的 context 即可,执行时就会返回超时错误
1 2
| ❯ go run ./1.go panic: context deadline exceeded
|
2. 一个出错,全部出错
这个本身不是什么问题,因为 singleflight 就是这么设计的,但是实际使用的时候 如果我们一次调用要 1s,我们的数据库请求或者是 下游服务可以支撑 10rps 的请求的时候这会导致我们的错误阈提高,因为实际上我们可以一秒内尝试 10 次,但是用了 singleflight 之后只能尝试一次,只要出错这段时间内的所有请求都会受影响
这种情况我们可以启动一个 Goroutine 定时 forget 一下,相当于将 rps 从 1rps 提高到了 10rps
1 2 3 4 5
| go func() { time.Sleep(100 * time.Millisecond) g.Forget(key) }()
|
总结
这篇文章从使用场景,到使用方法,再到源码分析和可能存在的坑给大家介绍了 singleflight,希望你能有所收获,没事看看官方的代码还是很有收获的,这次又学到了一个骚操作,用双重 defer 来避免死锁,你学废了么?
我们下一篇会开启一个新的系列,Go 可用性,敬请期待!
参考文献
- golang.org/x/sync/singleflight
- sync.singleflight 到底怎么用才对?
- Go 语言并发编程、同步原语与锁 | Go 语言设计与实现
- Go 进阶训练营-极客时间
- x/sync/singleflight: panic in Do fn results in deadlock · Issue #33519 · golang/go · GitHub
关注我获取更新
猜你喜欢