注:本文已发布超过一年,请注意您所使用工具的相关版本是否适用
本系列为 Go 进阶训练营 笔记,访问 博客: Go进阶训练营, 即可查看当前更新进度,部分文章篇幅较长,使用 PC 大屏浏览体验更佳。
序
在上一篇文章 Go 可用性(二) 限流 1: 令牌桶原理及使用 当中我们简单的介绍了令牌桶实现的原理,然后利用 /x/time/rate 这个库 10 行代码写了一个基于 ip 的 gin 限流中间件,那这个功能是怎么实现的呢?接下来我们就从源码层面来了解一下这个库的实现。这个实现很有意思,并没有真正的使用一个定时器不断的生成令牌,而是靠计算的方式来完成
rate/limt
本文源码基于 https://pkg.go.dev/golang.org/x/time@v0.0.0-20210220033141-f8bda1e9f3ba/rate
上回我们讲到,使用限速器的时候我们需要调用 NewLimiter
方法,然后 Limiter
提供了三组限速的方法,这三组方法其实都是通过调用 reserveN
实现的 reserveN
返回一个 *Reservation
指针,我们先来看一下这两个结构体吧。
Limiter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| type Limiter struct { mu sync.Mutex
limit Limit
burst int
tokens float64
last time.Time
lastEvent time.Time }
|
Reservation
这个结构体挺有意思的,表示预约某个时间的 token
1 2 3 4 5 6 7 8 9 10 11 12
| type Reservation struct { ok bool lim *Limiter tokens int timeToAct time.Time limit Limit }
|
这个库并没有使用定时器来发放 token 而是用了 lazyload 的方式,等需要消费 token 的时候才通过时间去计算然后更新 token 的数量,下面我们先通过一个例子来看一下这个流程是怎么跑的

如上图所示,假设我们有一个限速器,它的 token 生成速度为 1,也就是一秒一个,桶的大小为 10,每个格子表示一秒的时间间隔
last
表示上一次更新 token 时还有 2 个 token。- 现在我有一个请求进来,我总共需要 7 个 token 才能完成这个请求
now
表示我现在进来的时间,距离 last 已经过去了 2s,那么现在就有 4 个 token- 所以我如果需要 7 个 token 那么也就还需要等待 3s 中才真的有 7 个,所以这就是
timeToAct
所在的时间节点 - 预约成功之后更新
last = now
、token = -3
因为 token 已经被预约出去了所以现在剩下的就是负数了
消费 token
总共有三组消费 token 的方法
AllowN, ReserveN, and WaitN最终都是调用的
reserveN` 这个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { lim.mu.Lock()
if lim.limit == Inf { lim.mu.Unlock() return Reservation{ ok: true, lim: lim, tokens: n, timeToAct: now, } }
now, last, tokens := lim.advance(now)
tokens -= float64(n)
var waitDuration time.Duration if tokens < 0 { waitDuration = lim.limit.durationFromTokens(-tokens) }
ok := n <= lim.burst && waitDuration <= maxFutureReserve
r := Reservation{ ok: ok, lim: lim, limit: lim.limit, } if ok { r.tokens = n r.timeToAct = now.Add(waitDuration) }
if ok { lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { lim.last = last }
lim.mu.Unlock() return r }
|
advance
方法用于计算 token 的数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { last := lim.last if now.Before(last) { last = now }
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) elapsed := now.Sub(last) if elapsed > maxElapsed { elapsed = maxElapsed }
delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens + delta if burst := float64(lim.burst); tokens > burst { tokens = burst }
return now, last, tokens }
|
这个比较有意思的是先去计算了时间的最大值,因为初始化的时候没为 last
赋值,所以 now.Before(last)
出来的结果可能是一个很大的值,再去计算 tokens 数量很可能溢出
durationFromTokens 根据 tokens 的数量计算需要花费的时间
1 2 3 4
| func (limit Limit) durationFromTokens(tokens float64) time.Duration { seconds := tokens / float64(limit) return time.Nanosecond * time.Duration(1e9*seconds) }
|
tokensFromDuration 根据时间计算 tokens 的数量
1 2 3 4 5 6
| func (limit Limit) tokensFromDuration(d time.Duration) float64 { sec := float64(d/time.Second) * float64(limit) nsec := float64(d%time.Second) * float64(limit) return sec + nsec/1e9 }
|
消费 token 总结
消费 token 的逻辑就讲完了,我们大概总结一下
- 需要消费的时候,先去计算一下,从过去到现在可以生成多少个 token
- 然后我们通过需要的 token 减去现在拥有的 token 数量,就得到了需要预约的 token 数量
- 再通过 token 数量转换成时间,就可以得到需要等待的时间长度,以及是否可以消费
- 然后再通过不同的消费方法进行消费
WaitN
其他两类消费方法都很简单,调用 Reservation
进行返回, WaitN
还有一点逻辑,所以我们一起来看一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { lim.mu.Lock() burst := lim.burst limit := lim.limit lim.mu.Unlock()
if n > burst && limit != Inf { return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) }
select { case <-ctx.Done(): return ctx.Err() default: }
now := time.Now() waitLimit := InfDuration if deadline, ok := ctx.Deadline(); ok { waitLimit = deadline.Sub(now) }
r := lim.reserveN(now, n, waitLimit)
if !r.ok { return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) }
delay := r.DelayFrom(now) if delay == 0 { return nil }
t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: return nil case <-ctx.Done(): r.Cancel() return ctx.Err() } }
|
取消消费
WaitN
当中如果预约上了,但是 Context
取消了,会调用 CancelAt
归还 tokens 我们来一起看一下是怎么实现的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| func (r *Reservation) CancelAt(now time.Time) { if !r.ok { return }
r.lim.mu.Lock() defer r.lim.mu.Unlock()
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { return }
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) if restoreTokens <= 0 { return }
now, _, tokens := r.lim.advance(now)
tokens += restoreTokens if burst := float64(r.lim.burst); tokens > burst { tokens = burst }
r.lim.last = now r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent { prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) if !prevEvent.Before(now) { r.lim.lastEvent = prevEvent } }
return }
|
存在的问题
除了上面提到的感觉 cancelAt 可能有一个 bug 外,云神的博客还提到了一个问题,就是如果我们 cancel 了的话,后面已经在等待的任务是不会重新调整的,举个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| func wait() { l := rate.NewLimiter(10, 10) t := time.Now() l.ReserveN(t, 10)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.TODO(), time.Hour) defer cancel()
wg.Add(1) go func() { defer wg.Done() time.Sleep(200 * time.Millisecond) cancel() }()
wg.Add(2) go func() { defer wg.Done() l.WaitN(ctx, 10) fmt.Printf("[1] cost: %s\n", time.Since(t)) }()
time.Sleep(100 * time.Millisecond)
go func() { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), time.Hour) defer cancel() l.WaitN(ctx, 2) fmt.Printf("[2] cost: %s\n", time.Since(t)) }()
wg.Wait() }
|
我们先看一下不提前 cancel 的结果
1 2
| [1] cost: 1.0002113s [2] cost: 1.2007347s
|
再看看提前 cancel 的结果
1 2
| [1] cost: 200.8268ms [2] cost: 1.201066s
|
可以看到就是 1 有变化,从 1s -> 200ms 但是 2 一直都要等 1.2s
总结
仔细看了一下令牌桶的实现,但是也留下了一个疑问,如果哪位童鞋知道希望可以留言告诉我,在取消的时候,会减掉一个预约的时间,但是我发现这里其实应该是重复减了一次
1
| restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
|
下面是测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func main() { t0 := time.Now() t1 := time.Now().Add(100 * time.Millisecond) t2 := time.Now().Add(200 * time.Millisecond) t3 := time.Now().Add(300 * time.Millisecond)
l := rate.NewLimiter(10, 20) l.ReserveN(t0, 15) fmt.Printf("%+v\n", l)
r := l.ReserveN(t1, 10) fmt.Printf("%+v\n", l)
l.ReserveN(t2, 2) fmt.Printf("%+v\n", l)
r.CancelAt(t3) fmt.Printf("%+v\n", l) }
|
参考文献
Go 进阶训练营-极客时间
Token bucket - Wikipedia
令牌桶算法_百度百科 (baidu.com)
限流的概念,算法,分布式限流以及微服务架构下限流的难点 - 知乎 (zhihu.com)
令牌桶工作原理 - 知乎 (zhihu.com)
/x/time/rate
开源限流组件分析(二):Golang-time/rate 限速算法实现分析 - 熊喵君的博客 | PANDAYCHEN
Golang rate 无法延迟重排的 BUG – 峰云就她了 (xiaorui.cc)
关注我获取更新
猜你喜欢