注:本文已发布超过一年,请注意您所使用工具的相关版本是否适用
本系列为 Go 进阶训练营 笔记,访问 博客: Go进阶训练营 , 即可查看当前更新进度,部分文章篇幅较长,使用 PC 大屏浏览体验更佳。
回顾 在上一篇文章《Week03: Go 并发编程(四) 深入理解 Mutex 》当中我们主要讲到了互斥锁以及读写锁的使用以及源码解析。在看源码的时候我们可以发现里面使用了很多 atomic 包的方法来保证原子,那么我们就趁热打铁接下来就随着本文来看一看 atomic 应该怎么用,以及它又是如何实现的
案例 上一篇文章我们在讲读写锁的时候讲到了一个配置读取的例子,这里我们使用 atomic 实现看一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type Config struct { v atomic.Value }func (c *Config) Get () []int { return (*c.v.Load().(*[]int )) }func (c *Config) Set (n []int ) { c.v.Store(&n) }
跑一个一样的测试,可以发现 atomic 的性能又好上了许多
1 2 3 4 5 6 7 8 ❯ go test -race -bench=. goos: linux goarch: amd64 pkg: github.com/mohuishou/go -training/Week03/blog/05 _atomic BenchmarkMutexConfig-4 1021684 1121 ns/op BenchmarkRWMutexConfig-4 2604524 433 ns/op BenchmarkConfig-4 6941658 170 ns/op PASS
atomic.Value
这种适合配置文件这种读特别多,写特别少的场景,因为他是 COW(Copy On Write)写时复制的一种思想,COW 就是指我需要写入的时候我先把老的数据复制一份到一个新的对象,然后再写入新的值。 我们看看维基百科的描述,我觉得已经说得很清楚了
写入时复制(英语:Copy-on-write,简称 COW)是一种计算机程序设计领域的优化策略。其核心思想是,如果有多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的。此作法主要的优点是如果调用者没有修改该资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。
这种思路会有一个问题,就是可能有部分 goroutine 在使用老的对象,所以老的对象不会立即被回收,如果存在大量写入的话,会导致产生大量的副本,性能反而不一定好 。 这种方式的好处就是不用加锁,所以也不会有 goroutine 的上下文切换,并且在读取的时候大家都读取的相同的副本所以性能上回好一些。 COW 策略在 linux, redis 当中都用的很多,具体可以看一下我后面的参考文献,本文就不展开讲了。
源码分析 方法一览 如果去看文档会发现 atomic 的函数签名有很多,但是大部分都是重复的为了不同的数据类型创建了不同的签名,这就是没有泛型的坏处了,基础库会比较麻烦
1、第一类 AddXXX
当需要添加的值为负数的时候,做减法,正数做加法
1 2 3 4 5 6 func AddInt32 (addr *int32 , delta int32 ) (new int32 ) func AddInt64 (addr *int64 , delta int64 ) (new int64 ) func AddUint32 (addr *uint32 , delta uint32 ) (new uint32 ) func AddUint64 (addr *uint64 , delta uint64 ) (new uint64 ) func AddUintptr (addr *uintptr , delta uintptr ) (new uintptr )
2、第二类 CompareAndSwapXXX
CAS 操作, 会先比较传入的地址的值是否是 old,如果是的话就尝试赋新值,如果不是的话就直接返回 false,返回 true 时表示赋值成功。
1 2 3 4 5 6 func CompareAndSwapInt32 (addr *int32 , old, new int32 ) (swapped bool ) func CompareAndSwapInt64 (addr *int64 , old, new int64 ) (swapped bool ) func CompareAndSwapPointer (addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool ) func CompareAndSwapUint32 (addr *uint32 , old, new uint32 ) (swapped bool ) func CompareAndSwapUint64 (addr *uint64 , old, new uint64 ) (swapped bool ) func CompareAndSwapUintptr (addr *uintptr , old, new uintptr ) (swapped bool )
3、第三类 LoadXXX
,从某个地址中取值
1 2 3 4 5 6 func LoadInt32 (addr *int32 ) (val int32 ) func LoadInt64 (addr *int64 ) (val int64 ) func LoadPointer (addr *unsafe.Pointer) (val unsafe.Pointer) func LoadUint32 (addr *uint32 ) (val uint32 ) func LoadUint64 (addr *uint64 ) (val uint64 ) func LoadUintptr (addr *uintptr ) (val uintptr )
4、第四类 StoreXXX
,给某个地址赋值
1 2 3 4 5 6 func StoreInt32 (addr *int32 , val int32 ) func StoreInt64 (addr *int64 , val int64 ) func StorePointer (addr *unsafe.Pointer, val unsafe.Pointer) func StoreUint32 (addr *uint32 , val uint32 ) func StoreUint64 (addr *uint64 , val uint64 ) func StoreUintptr (addr *uintptr , val uintptr )
5、第五类 SwapXXX
,交换两个值,并且返回老的值
1 2 3 4 5 6 func SwapInt32 (addr *int32 , new int32 ) (old int32 ) func SwapInt64 (addr *int64 , new int64 ) (old int64 ) func SwapPointer (addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) func SwapUint32 (addr *uint32 , new uint32 ) (old uint32 ) func SwapUint64 (addr *uint64 , new uint64 ) (old uint64 ) func SwapUintptr (addr *uintptr , new uintptr ) (old uintptr )
6、最后一类 Value
用于任意类型的值的 Store、Load,我们开始的案例就用到了这个,这是 1.4 版本之后引入的,签名的方法都只能作用于特定的类型,引入这个方法之后就可以用于任意类型了。
1 2 3 type Valuefunc (v *Value) Load () (x interface {}) func (v *Value) Store (x interface {})
CAS 在 sync/atomic
包中的源码除了 Value
之外其他的函数都是没有直接的源码的,需要去 runtime/internal/atomic
中找寻,这里为 CAS
函数为例,其他的都是大同小异
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0 -17 MOVQ ptr+0 (FP), BX MOVL old+8 (FP), AX MOVL new +12 (FP), CX LOCK CMPXCHGL CX, 0 (BX) SETEQ ret+16 (FP) RET
在注释部分写的非常清楚,这个函数主要就是先比较一下当前传入的地址的值是否和 old 值相等,如果相等,就赋值新值返回 true,如果不相等就返回 false 我们看这个具体汇编代码就可以发现,使用了 LOCK
来保证操作的原子性,《Week03: Go 并发编程(二) Go 内存模型 》提到过的一致性问题, CMPXCHG
指令其实就是 CPU 实现的 CAS 操作。
关于 LOCK 指令通过查阅 intel 的手册我们可以发现,对于P6之前的处理器,LOCK 指令会总是锁总线,但是 P6 之后可能会执行“缓存锁定”,如果被锁定的内存区域被缓存在了处理器中,这个时候会通过缓存一致性来保证操作的原子性
Value 1 2 3 type Value struct { v interface {} }
结构非常简单,只有一个 v 用来保存传入的值
Store 我们先看看 store 方法,store 方法会将值存储为 x,这里需要注意,每次传入的 x 不能为 nil,并且他们类型必须是相同的,不然会导致 panic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func (v *Value) Store (x interface {}) { if x == nil { panic ("sync/atomic: store of nil value into Value" ) } vp := (*ifaceWords)(unsafe.Pointer(v)) xp := (*ifaceWords)(unsafe.Pointer(&x)) for { typ := LoadPointer(&vp.typ) if typ == nil { runtime_procPin() if !CompareAndSwapPointer(&vp.typ, nil , unsafe.Pointer(^uintptr (0 ))) { runtime_procUnpin() continue } StorePointer(&vp.data, xp.data) StorePointer(&vp.typ, xp.typ) runtime_procUnpin() return } if uintptr (typ) == ^uintptr (0 ) { continue } if typ != xp.typ { panic ("sync/atomic: store of inconsistently typed value into Value" ) } StorePointer(&vp.data, xp.data) return } }
具体的逻辑都写在注释中了,这里面复杂逻辑在第一次写入,因为第一次写入的时候有两次原子写操作,所以这个时候用 typ 值作为一个判断,通过不同值判断当前所处的状态,这个在我们业务代码中其实也经常用到。然后因为引入了这个中间状态,所以又使用了 runtime_procPin
方法避免抢占
1 2 3 4 5 6 7 8 9 10 11 12 13 func sync_runtime_procPin () int { return procPin() }func procPin () int { _g_ := getg() mp := _g_.m mp.locks++ return int (mp.p.ptr().id) }
Load 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (v *Value) Load () (x interface {}) { vp := (*ifaceWords)(unsafe.Pointer(v)) typ := LoadPointer(&vp.typ) if typ == nil || uintptr (typ) == ^uintptr (0 ) { return nil } data := LoadPointer(&vp.data) xp := (*ifaceWords)(unsafe.Pointer(&x)) xp.typ = typ xp.data = data return }
实战: 实现一个“无锁”栈 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mainimport ( "sync/atomic" "unsafe" )type LFStack struct { head unsafe.Pointer }type Node struct { val int32 next unsafe.Pointer }func NewLFStack () *LFStack { n := unsafe.Pointer(&Node{}) return &LFStack{head: n} }func (s *LFStack) Push (v int32 ) { n := &Node{val: v} for { old := atomic.LoadPointer(&s.head) n.next = old if atomic.CompareAndSwapPointer(&s.head, old, unsafe.Pointer(n)) { return } } }func (s *LFStack) Pop () int32 { for { old := atomic.LoadPointer(&s.head) if old == nil { return 0 } oldNode := (*Node)(old) next := atomic.LoadPointer(&oldNode.next) if atomic.CompareAndSwapPointer(&s.head, old, next) { return oldNode.val } } }
这里的无锁其实只是没用互斥锁,用了原子操作,前面我们看 atomic 的源码的时候可以发现实际上在 CPU 上还是有锁的,只是我们这个锁的粒度非常小
总结 虽然在一些情况下 atomic 的性能要好很多,但是这个是一个 low level 的库,在实际的业务代码中最好还是使用 channel 但是我们也需要知道,在一些基础库,或者是需要极致性能的地方用上这个还是很爽的,但是使用的过程中一定要小心,不然还是会容易出 bug。
参考文献 https://pkg.go.dev/sync/atomic Go 语言标准库中 atomic.Value 的前世今生 深入浅出 Go - sync/atomic 源码分析 COW 奶牛!Copy On Write 机制了解一下 维基百科: COW 聊聊 CPU 的 LOCK 指令 浅论 Lock 与 X86 Cache 一致性 intel 手册 使用 Go 实现 lock-free 的队列 关注我获取更新 猜你喜欢