注:本文已发布超过一年,请注意您所使用工具的相关版本是否适用
本系列为 Go 进阶训练营 笔记,访问 博客: Go进阶训练营 , 即可查看当前更新进度,部分文章篇幅较长,使用 PC 大屏浏览体验更佳。
楔子 在 Week03: Go 并发编程(六) 深入理解 WaitGroup 、 Week03: Go 并发编程(七) 深入理解 errgroup 中我们提到了等待多个 goroutine 协作的方式,但是我们现在想一下这么一个常见的场景。现在有一个 Server 服务在执行,当请求来的时候我们启动一个 goroutine 去处理,然后在这个 goroutine 当中有对下游服务的 rpc 调用,也会去请求数据库获取一些数据,这时候如果下游依赖的服务比较慢,但是又没挂,只是很慢,可能一次调用要 1min 才能返回结果,这个时候我们该如何处理?
如下图所示,首先假设我们使用 WaitGroup 进行控制,等待所有的 goroutine 处理完成之后返回,可以看到我们实际的耗时远远大于了用户可以容忍的时间。 如下图所示,再考虑一个常见的场景,万一上面的 rpc goroutine 很早就报错了,但是 下面的 db goroutine 又执行了很久,我们最后要返回错误信息,很明显后面 db goroutine 执行的这段时间都是在白白的浪费用户的时间。
这时候就应该请出 context 包了,context 主要就是用来在多个 goroutine 中设置截止日期、同步信号,传递请求相关值。 每一次 context 都会从顶层一层一层的传递到下面一层的 goroutine 当上面的 context 取消的时候,下面所有的 context 也会随之取消。 ** 上面的例子当中,如果引入 context 后就会是这样,如下图所示,context 会类似一个树状结构一样依附在每个 goroutine 上,当上层的 req goroutine 的 context 超时之后就会将取消信号同步到下面的所有 goroutine 上一起返回,从而达到超时控制的作用 如下图所示,当 rpc 调用失败之后,会出发 context 取消,然后这个取消信号就会同步到其他的 goroutine 当中
package context 使用说明 在使用一个新的库的时候,我们一般需要先看它的官方说明,得益于 godoc 的约束,所以标准库和第三方库的文档都可以通过 pkg.go.dev 进行搜索查询
使用准则 context 包一开始就告诉了我们应该怎么用,不应该怎么用,这是应该被共同遵守的约定。
对 server 应用而言,传入的请求应该创建一个 context,接受 通过 WithCancel
, WithDeadline
, WithTimeout
创建的 Context 会同时返回一个 cancel 方法,这个方法必须要被执行,不然会导致 context 泄漏,这个可以通过执行 go vet
命令进行检查 应该将 context.Context
作为函数的第一个参数进行传递,参数命名一般为 ctx
不应该将 Context 作为字段放在结构体中。 不要给 context 传递 nil,如果你不知道应该传什么的时候就传递 context.TODO()
不要将函数的可选参数放在 context 当中,context 中一般只放一些全局通用的 metadata 数据,例如 tracing id 等等 context 是并发安全的可以在多个 goroutine 中并发调用 函数签名 context 包暴露的方法不多,看下方说明即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func WithCancel (parent Context) (ctx Context, cancel CancelFunc) func WithDeadline (parent Context, d time.Time) (Context, CancelFunc) func WithTimeout (parent Context, timeout time.Duration) (Context, CancelFunc) type CancelFunctype Context func Background () Context func TODO () Context func WithValue (parent Context, key, val interface {}) Context
源码分析 context.Context 接口 1 2 3 4 5 6 7 8 9 10 11 12 type Context interface { Deadline() (deadline time.Time, ok bool ) Done() <-chan struct {} Err() error Value(key interface {}) interface {} }
默认上下文: context.Backgroud Backgroud(), 在前面有讲到, 一般用于创建 root context,这个 context 永远也不会被取消,或超时 **TODO(), **底层和 Background 一致,但是含义不同,当不清楚用什么的时候或者是还没准备好的时候可以用它
1 2 3 4 5 6 7 8 9 10 11 12 var ( background = new (emptyCtx) todo = new (emptyCtx) )func Background () Context { return background }func TODO () Context { return todo }
查看源码我们可以发现,background 和 todo 都是实例化了一个 emptyCtx
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type emptyCtx int func (*emptyCtx) Deadline () (deadline time.Time, ok bool ) { return }func (*emptyCtx) Done () <-chan struct {} { return nil }func (*emptyCtx) Err () error { return nil }func (*emptyCtx) Value (key interface {}) interface {} { return nil }
emptyCtx 就如同他的名字一样,全都返回空值
如何取消 context : WithCancel WithCancel(), 方法会创建一个可以取消的 context
1 2 3 4 5 6 7 8 9 10 func WithCancel (parent Context) (ctx Context, cancel CancelFunc) { if parent == nil { panic ("cannot create context from nil parent" ) } c := newCancelCtx(parent) propagateCancel(parent, &c) return &c, func () { c.cancel(true , Canceled) } }
不止 WithCancel 方法,其他的 WithXXX 方法也不允许传入一个 nil 值的父 contextnewCancelCtx
只是一个简单的包装就不展开了, propagateCancel
比较有意思,我们一起来看看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func propagateCancel (parent Context, child canceler) { done := parent.Done() if done == nil { return } select { case <-done: child.cancel(false , parent.Err()) return default : } if p, ok := parentCancelCtx(parent); ok { p.mu.Lock() if p.err != nil { child.cancel(false , p.err) } else { if p.children == nil { p.children = make (map [canceler]struct {}) } p.children[child] = struct {}{} } p.mu.Unlock() } else { go func () { select { case <-parent.Done(): child.cancel(false , parent.Err()) case <-child.Done(): } }() } }
接下来我们就看看 cancelCtx 长啥样
1 2 3 4 5 6 7 8 type cancelCtx struct { Context mu sync.Mutex done chan struct {} children map [canceler]struct {} err error }
在 Done 方法这里采用了 懒汉式加载的方式,第一次调用的时候才会去创建这个 channel
1 2 3 4 5 6 7 8 9 func (c *cancelCtx) Done () <-chan struct {} { c.mu.Lock() if c.done == nil { c.done = make (chan struct {}) } d := c.done c.mu.Unlock() return d }
Value 方法很有意思,这里相当于是内部 cancelCtxKey
这个变量的地址作为了一个特殊的 key,当查询这个 key 的时候就会返回当前 context 如果不是这个 key 就会向上递归的去调用 parent context 的 Value 方法查找有没有对应的值
1 2 3 4 5 6 func (c *cancelCtx) Value (key interface {}) interface {} { if key == &cancelCtxKey { return c } return c.Context.Value(key) }
在前面讲到构建父子上下文之间的关系的时候,有一个去查找可以被取消的父 context 的方法 parentCancelCtx
就用到了这个特殊 value
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func parentCancelCtx (parent Context) (*cancelCtx, bool ) { done := parent.Done() if done == closedchan || done == nil { return nil , false } p, ok := parent.Value(&cancelCtxKey).(*cancelCtx) if !ok { return nil , false } p.mu.Lock() ok = p.done == done p.mu.Unlock() if !ok { return nil , false } return p, true }
接下来我们来看最重要的这个 cancel 方法,cancel 接收两个参数,removeFromParent 用于确认是不是把自己从 parent context 中移除,err 是 ctx.Err() 最后返回的错误信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (c *cancelCtx) cancel (removeFromParent bool , err error) { if err == nil { panic ("context: internal error: missing cancel error" ) } c.mu.Lock() if c.err != nil { c.mu.Unlock() return } c.err = err if c.done == nil { c.done = closedchan } else { close (c.done) } for child := range c.children { child.cancel(false , err) } c.children = nil c.mu.Unlock() if removeFromParent { removeChild(c.Context, c) } }
超时自动取消如何实现: WithDeadline, WithTimeout 我们先看看比较常用的 WithTimeout, 可以发现 WithTimeout 其实就是调用了 WithDeadline 然后再传入的参数上用当前时间加上了 timeout 的时间
1 2 3 func WithTimeout (parent Context, timeout time.Duration) (Context, CancelFunc) { return WithDeadline(parent, time.Now().Add(timeout)) }
再来看一下实现超时的 timerCtx,WithDeadline 我们放到后面一点点
1 2 3 4 5 6 type timerCtx struct { cancelCtx timer *time.Timer deadline time.Time }
Deadline()
就是返回了结构体中保存的过期时间
1 2 3 func (c *timerCtx) Deadline () (deadline time.Time, ok bool ) { return c.deadline, true }
cancel
其实就是复用了 cancelCtx 中的取消方法,唯一区别的地方就是在后面加上了对 timer 的判断,如果 timer 没有结束主动结束 timer
1 2 3 4 5 6 7 8 9 10 11 12 13 func (c *timerCtx) cancel (removeFromParent bool , err error) { c.cancelCtx.cancel(false , err) if removeFromParent { removeChild(c.cancelCtx.Context, c) } c.mu.Lock() if c.timer != nil { c.timer.Stop() c.timer = nil } c.mu.Unlock() }
timerCtx 并没有重新实现 Done() 和 Value 方法,直接复用了 cancelCtx 的相关方法
最后我们再看看这个最重要的 WithDeadline 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func WithDeadline (parent Context, d time.Time) (Context, CancelFunc) { if parent == nil { panic ("cannot create context from nil parent" ) } if cur, ok := parent.Deadline(); ok && cur.Before(d) { return WithCancel(parent) } c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: d, } propagateCancel(parent, c) dur := time.Until(d) if dur <= 0 { c.cancel(true , DeadlineExceeded) return c, func () { c.cancel(false , Canceled) } } c.mu.Lock() defer c.mu.Unlock() if c.err == nil { c.timer = time.AfterFunc(dur, func () { c.cancel(true , DeadlineExceeded) }) } return c, func () { c.cancel(true , Canceled) } }
可以发现超时控制其实就是在复用 cancelCtx 的基础上加上了一个 timer 来做定时取消
如何为 Context 附加一些值: WithValue WithValue 相对简单一点,主要就是校验了一下 Key 是不是可比较的,然后构造出一个 valueCtx 的结构
1 2 3 4 5 6 7 8 9 10 11 12 func WithValue (parent Context, key, val interface {}) Context { if parent == nil { panic ("cannot create context from nil parent" ) } if key == nil { panic ("nil key" ) } if !reflectlite.TypeOf(key).Comparable() { panic ("key is not comparable" ) } return &valueCtx{parent, key, val} }
valueCtx 主要就是嵌入了 parent context 然后附加了一个 key val
1 2 3 4 type valueCtx struct { Context key, val interface {} }
Value 的查找和之前 cancelCtx 类似,都是先判断当前有没有,没有就向上递归,只是在 cancelCtx 当中 key 是一个固定的 key 而已
1 2 3 4 5 6 func (c *valueCtx) Value (key interface {}) interface {} { if c.key == key { return c.val } return c.Context.Value(key) }
Value 就没有实现 Context 接口的其他方法了,其他的方法全都是复用的 parent context 的方法
使用场景 超时控制 这就是文章开始时候第一个场景下的一个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package mainimport ( "context" "fmt" "time" )func rpc () (string , error) { time.Sleep(100 * time.Millisecond) return "rpc done" , nil }type result struct { data string err error }func handle (ctx context.Context, ms int ) { ctx, cancel := context.WithTimeout(ctx, time.Duration(ms)*time.Millisecond) defer cancel() r := make (chan result) go func () { data, err := rpc() r <- result{data: data, err: err} }() select { case <-ctx.Done(): fmt.Printf("timeout: %d ms, context exit: %+v\n" , ms, ctx.Err()) case res := <-r: fmt.Printf("result: %s, err: %+v\n" , res.data, res.err) } }func main () { for i := 1 ; i < 5 ; i++ { time.Sleep(1 * time.Second) go handle(context.Background(), i*50 ) } time.Sleep(time.Second) }
执行结果
1 2 3 4 5 ▶ go run *.go timeout: 50 ms, context exit: context deadline exceeded result: rpc done, err: <nil > result: rpc done, err: <nil > result: rpc done, err: <nil >
我们可以发现在第一次执行的时候传入的超时时间 50ms 程序超时直接退出了,但是后面超过 50ms 的时候均返回了结果。
错误取消 这是第二个场景的一个例子,假设我们在 main 中并发调用了 f1
f2
两个函数,但是 f1
很快就返回了,但是 f2
还在阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package mainimport ( "context" "fmt" "sync" "time" )func f1 (ctx context.Context) error { select { case <-ctx.Done(): return fmt.Errorf("f1: %w" , ctx.Err()) case <-time.After(time.Millisecond): return fmt.Errorf("f1 err in 1ms" ) } }func f2 (ctx context.Context) error { select { case <-ctx.Done(): return fmt.Errorf("f2: %w" , ctx.Err()) case <-time.After(time.Hour): return nil } }func main () { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() var wg sync.WaitGroup wg.Add(2 ) go func () { defer wg.Done() if err := f1(ctx); err != nil { fmt.Println(err) cancel() } }() go func () { defer wg.Done() if err := f2(ctx); err != nil { fmt.Println(err) cancel() } }() wg.Wait() }
执行结果,可以看到 f1 返回之后 f2 立即就返回了,并且报错 context 被取消
1 2 3 ▶ go run *.go f1 err in 1 ms f2: context canceled
细心的同学可能发现了,这个例子不就是 errgroup 的逻辑么,是的它就是类似 errgroup 的简单逻辑,这时候再反过来去看一下 《Week03: Go 并发编程(七) 深入理解 errgroup - Mohuishou 》这篇文章可能会有不一样的体会
传递共享数据 一般会用来传递 tracing id, request id 这种数据,不要用来传递可选参数,这里借用一下饶大的一个例子,在实际的生产案例中我们代码也是这样大同小异
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 const requestIDKey int = 0 func WithRequestID (next http.Handler) http .Handler { return http.HandlerFunc( func (rw http.ResponseWriter, req *http.Request) { reqID := req.Header.Get("X-Request-ID" ) ctx := context.WithValue( req.Context(), requestIDKey, reqID) req = req.WithContext(ctx) next.ServeHTTP(rw, req) } ) }func GetRequestID (ctx context.Context) string { ctx.Value(requestIDKey).(string ) }func Handle (rw http.ResponseWriter, req *http.Request) { reqID := GetRequestID(req.Context()) ... }func main () { handler := WithRequestID(http.HandlerFunc(Handle)) http.ListenAndServe("/" , handler) }
在某些情况下可以用来防止 goroutine 泄漏 我们看一下官方文档的这个例子, 这里面 gen 这个函数中如果不使用 context done 来控制的话就会导致 goroutine 泄漏,因为这里面的 for 是一个死循环,没有 ctx 就没有相关的退出机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func main () { gen := func (ctx context.Context) <-chan int { dst := make (chan int ) n := 1 go func () { for { select { case <-ctx.Done(): return case dst <- n: n++ } } }() return dst } ctx, cancel := context.WithCancel(context.Background()) defer cancel() for n := range gen(ctx) { fmt.Println(n) if n == 5 { break } } }
总结 使用准则 context 包一开始就告诉了我们应该怎么用,不应该怎么用,这是应该被共同遵守的约定。
对 server 应用而言,传入的请求应该创建一个 context,接受 通过 WithCancel
, WithDeadline
, WithTimeout
创建的 Context 会同时返回一个 cancel 方法,这个方法必须要被执行,不然会导致 context 泄漏,这个可以通过执行 go vet
命令进行检查 应该将 context.Context
作为函数的第一个参数进行传递,参数命名一般为 ctx
不应该将 Context 作为字段放在结构体中。 不要给 context 传递 nil,如果你不知道应该传什么的时候就传递 context.TODO()
不要将函数的可选参数放在 context 当中,context 中一般只放一些全局通用的 metadata 数据,例如 tracing id 等等 context 是并发安全的可以在多个 goroutine 中并发调用 使用场景 超时控制 错误取消 跨 goroutine 数据同步 防止 goroutine 泄漏 缺点 最显著的一个就是 context 引入需要修改函数签名,并且会病毒的式的扩散到每个函数上面,不过这个见仁见智,我看着其实还好 某些情况下虽然是可以做到超时返回提高用户体验,但是实际上是不会退出相关 goroutine 的,这时候可能会导致 goroutine 的泄漏,针对这个我们来看一个例子 我们使用标准库的 timeout handler 来实现超时控制,底层是通过 context 来实现的。我们设置了超时时间为 1ms 并且在 handler 中模拟阻塞 1000s 不断的请求,然后看 pprof 的 goroutine 数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package mainimport ( "net/http" _ "net/http/pprof" "time" )func main () { mux := http.NewServeMux() mux.HandleFunc("/" , func (rw http.ResponseWriter, r *http.Request) { time.Sleep(1000 * time.Second) rw.Write([]byte ("hello" )) }) handler := http.TimeoutHandler(mux, time.Millisecond, "xxx" ) go func () { if err := http.ListenAndServe("0.0.0.0:8066" , nil ); err != nil { panic (err) } }() http.ListenAndServe(":8080" , handler) }
查看数据我们可以发现请求返回后, goroutine 其实并未回收,但是如果不阻塞的话是会立即回收的
1 2 goroutine profile: total 29 24 @ 0x103b125 0x106cc9f 0x1374110 0x12b9584 0x12bb4ad 0x12c7fbf 0x106fd01
我们来看看它的源码,超时控制主要在 ServeHTTP 中实现,我删掉了部分不关键的数据, 我们可以看到函数内部启动了一个 goroutine 去处理请求逻辑,然后再外面等待,但是这里的问题是,当 context 超时之后 ServeHTTP 这个函数就直接返回了,在这里面启动的这个 goroutine 就没人管了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func (h *timeoutHandler) ServeHTTP (w ResponseWriter, r *Request) { ctx := h.testContext if ctx == nil { var cancelCtx context.CancelFunc ctx, cancelCtx = context.WithTimeout(r.Context(), h.dt) defer cancelCtx() } r = r.WithContext(ctx) done := make (chan struct {}) tw := &timeoutWriter{ w: w, h: make (Header), req: r, } panicChan := make (chan interface {}, 1 ) go func () { defer func () { if p := recover (); p != nil { panicChan <- p } }() h.handler.ServeHTTP(tw, r) close (done) }() select { case p := <-panicChan: panic (p) case <-done: case <-ctx.Done(): } }
总结 context 是一个优缺点都十分明显的包,这个包目前基本上已经成为了在 go 中做超时控制错误取消的标准做法,但是为了添加超时取消我们需要去修改所有的函数签名,对代码的侵入性比较大,如果之前一直都没有使用后续再添加的话还是会有一些改造成本
参考文献 context · pkg.go.dev Go 语言实战笔记(二十)| Go Context Go 语言并发编程与 Context | Go 语言设计与实现 深度解密 Go 语言之 context | qcrao Go Concurrency Patterns: Context - The Go Blog Go Concurrency Patterns: Pipelines and cancellation - The Go Blog 关注我获取更新 猜你喜欢