Go并发编程(四) 深入理解 Mutex

注:本文已发布超过一年,请注意您所使用工具的相关版本是否适用

本系列为 Go 进阶训练营 笔记,访问 博客: Go进阶训练营, 即可查看当前更新进度,部分文章篇幅较长,使用 PC 大屏浏览体验更佳。

回顾

前面几篇文章当中我们都反复提到了 goroutine 建是简单的,但是我们仍然要小心,习惯总会不经意间的导致我们写出很多 bug 对于语言规范没有定义的内容我们不要做任何假设。我们需要通过同步语义控制他们的执行顺序,关于之前的内容可以看前面的三篇文章:

接下来的几篇文章就让我们我们一起来了解一下 sync 包相关的一些用法,以及部分实现原理,当然这里说是 sync 包,实际上包含了三个包分别是: sync, sync/atomic, golang.org/x/sync/errgroup

这些包提供了一些基础的同步语义,但是在实际的并发编程当中,我们应该使用 channel 来进行同步控制。“Share memory by communicating; don't communicate by sharing memory.”

Mutex

案例

我们先来看一下上一篇文章说到的例子应该怎么改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var mu sync.Mutex

func main() {
for i := 1; i <= 2; i++ {
wg.Add(1)
go routine(i)
}
wg.Wait()
fmt.Printf("Final Counter: %d\n", counter)
}

func routine(id int) {
for i := 0; i < 2; i++ {
mu.Lock()
counter++
mu.Unlock()
}
wg.Done()
}

这里主要的目的就是为了保护我们临界区的数据,通过锁来进行保证。锁的使用非常的简单,但是还是有几个需要注意的点

  • 锁的范围要尽量的小,不要搞很多大锁
  • 用锁一定要解锁,小心产生死锁

实现原理

我们来看一下在 Go 中锁是怎么实现的

锁的实现模式[5]

  • Barging: 这种模式是为了提高吞吐量,当锁被释放时,它会唤醒第一个等待者,然后把锁给第一个等待者或者给第一个请求锁的人

1_B1atM-b6GPDS0_Q_TPEUBw.png

  • Handoff: 当锁释放时候,锁会一直持有直到第一个等待者准备好获取锁。它降低了吞吐量,因为锁被持有,即使另一个 goroutine 准备获取它。这种模式可以解决公平性的问题,因为在 Barging 模式下可能会存在被唤醒的 goroutine 永远也获取不到锁的情况,毕竟一直在 cpu 上跑着的 goroutine 没有上下文切换会更快一些。缺点就是性能会相对差一些

image.png

  • Spining:自旋在等待队列为空或者应用程序重度使用锁时效果不错。Parking 和 Unparking goroutines 有不低的性能成本开销,相比自旋来说要慢得多。_但是自旋是有成本的,所以在 go 的实现中进入自旋的条件十分的苛刻。_

image.png

Go Mutex 实现原理

我们先来看一下在 Go 中具体是怎么实现的,我们先讲原理再看源码,避免看的云里雾里的。**

加锁

如下图所示,Go 在 1.15 的版本中锁的实现结合上面提到的三种模式,调用 Lock 方法的时候。

  1. 首先如果当前锁处于初始化状态就直接用 CAS 方法尝试获取锁,这是**_ Fast Path_**
  2. 如果失败就进入*** Slow Path***
    1. 会首先判断当前能不能进入自旋状态,如果可以就进入自旋,最多自旋 4 次
    2. 自旋完成之后,就会去计算当前的锁的状态
    3. 然后尝试通过 CAS 获取锁
    4. 如果没有获取到就调用 runtime_SemacquireMutex 方法休眠当前 goroutine 并且尝试获取信号量
    5. goroutine 被唤醒之后会先判断当前是否处在饥饿状态,(如果当前 goroutine 超过 1ms 都没有获取到锁就会进饥饿模式) 1. 如果处在饥饿状态就会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出 1. 如果不在,就会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环
      CAS 方法在这里指的是 `atomic.CompareAndSwapInt32(addr, old, new) bool` 方法,这个方法会先比较传入的地址的值是否是 old,如果是的话就尝试赋新值,如果不是的话就直接返回 false,返回 true 时表示赋值成功 饥饿模式是 Go 1.9 版本之后引入的优化,用于解决公平性的问题[10]
      ![02_Go进阶03_blog_sync.drawio.svg](https://img.lailin.xyz/image/1608970759375-09d8cda7-77ac-48d3-b2f3-b8890e927bd4.svg)

解锁

解锁的流程相对于加锁简单许多
02_Go进阶03_blog_sync.drawio.svg

源码分析

Mutex 基本结构

知道其中的原理之后,我们再来看看源码分析

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

Mutex  结构体由 state  sema  两个 4 字节成员组成,其中 state  表示了当前锁的状态, sema  是用于控制锁的信号量
02_Go进阶03_blog_sync.drawio.svg
state  字段的最低三位表示三种状态,分别是 mutexLocked  mutexWoken mutexStarving ,剩下的用于统计当前在等待锁的 goroutine 数量

  • mutexLocked  表示是否处于锁定状态
  • mutexWoken 表示是否处于唤醒状态
  • mutexStarving 表示是否处于饥饿状态

加锁

回味一下上面看到的流程图,我们来看看互斥锁是如何加锁的

1
2
3
4
5
6
7
8
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
  • 当我们调用 Lock  方法的时候,会先尝试走 Fast Path,也就是如果当前互斥锁如果处于未加锁的状态,尝试加锁,只要加锁成功就直接返回
  • 否则的话就进入 slow path
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待时间
starving := false // 是否处于饥饿状态
awoke := false // 是否处于唤醒状态
iter := 0 // 自旋迭代次数
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}

lockSlow  方法中我们可以看到,有一个大的 for 循环,不断的尝试去获取互斥锁,在循环的内部,第一步就是判断能否自旋状态。
进入自旋状态的判断比较苛刻,具体需要满足什么条件呢? runtime_canSpin  源码见下方

  • 当前互斥锁的状态是非饥饿状态,并且已经被锁定了
  • 自旋次数不超过 4 次
  • cpu 个数大于一,必须要是多核 cpu
  • 当前正在执行当中,并且队列空闲的 p 的个数大于等于一
1
2
3
4
5
6
7
8
9
10
11
12
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

如果可以进入自旋状态之后就会调用 runtime_doSpin  方法进入自旋, doSpin  方法会调用 procyield(30)  执行三十次 PAUSE  指令

1
2
3
4
5
6
7
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
为什么使用 PAUSE 指令呢? PAUSE 指令会告诉 CPU 我当前处于处于自旋状态,这时候 CPU 会针对性的做一些优化,并且在执行这个指令的时候 CPU 会降低自己的功耗,减少能源消耗
1
2
3
4
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
在自旋的过程中会尝试设置 `mutexWoken` 来通知解锁,从而避免唤醒其他已经休眠的 `goroutine` 在自旋模式下,当前的 `goroutine` 就能更快的获取到锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
自旋结束之后就会去计算当前互斥锁的状态,如果当前处在饥饿模式下则不会去请求锁,而是会将当前 goroutine 放到队列的末端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
}
状态计算完成之后就会尝试使用 CAS 操作获取锁,如果获取成功就会直接退出循环 如果获取失败,则会调用 `runtime_SemacquireMutex(&m.sema, queueLifo, 1)` 方法保证锁不会同时被两个 goroutine 获取。`runtime_SemacquireMutex` 方法的主要作用是:
  • 不断调用尝试获取锁
  • 休眠当前 goroutine
  • 等待信号量,唤醒 goroutine

goroutine 被唤醒之后就会去判断当前是否处于饥饿模式,如果当前等待超过 1ms  就会进入饥饿模式

  • 饥饿模式下:会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出
  • 正常模式下:会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环

解锁

和加锁比解锁就很简单了,直接看注释就好

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 解锁一个没有锁定的互斥量会报运行时错误
// 解锁没有绑定关系,可以一个 goroutine 锁定,另外一个 goroutine 解锁
func (m *Mutex) Unlock() {
// Fast path: 直接尝试设置 state 的值,进行解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
// 如果减去了 mutexLocked 的值之后不为零就会进入慢速通道,这说明有可能失败了,或者是还有其他的 goroutine 等着
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
// 解锁一个没有锁定的互斥量会报运行时错误
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 判断是否处于饥饿模式
if new&mutexStarving == 0 {
// 正常模式
old := new
for {
// 如果当前没有等待者.或者 goroutine 已经被唤醒或者是处于锁定状态了,就直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒等待者并且移交锁的控制权
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式,走 handoff 流程,直接将锁交给下一个等待的 goroutine,注意这个时候不会从饥饿模式中退出
runtime_Semrelease(&m.sema, true, 1)
}
}

RWMutex

读写锁相对于互斥锁来说粒度更细,使用读写锁可以并发读,但是不能并发读写,或者并发写写

YN
NN

案例

其实大部分的业务应用都是读多写少的场景,这个时候使用读写锁的性能就会比互斥锁要好一些,例如下面的这个例子,是一个配置读写的例子,我们分别使用读写锁和互斥锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// RWMutexConfig 读写锁实现
type RWMutexConfig struct {
rw sync.RWMutex
data []int
}

// Get get config data
func (c *RWMutexConfig) Get() []int {
c.rw.RLock()
defer c.rw.RUnlock()
return c.data
}

// Set set config data
func (c *RWMutexConfig) Set(n []int) {
c.rw.Lock()
defer c.rw.Unlock()
c.data = n
}

互斥锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

// MutexConfig 互斥锁实现
type MutexConfig struct {
data []int
mu sync.Mutex
}

// Get get config data
func (c *MutexConfig) Get() []int {
c.mu.Lock()
defer c.mu.Unlock()
return c.data
}

// Set set config data
func (c *MutexConfig) Set(n []int) {
c.mu.Lock()
defer c.mu.Unlock()
c.data = n
}

并发基准测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
type iConfig interface {
Get() []int
Set([]int)
}

func bench(b *testing.B, c iConfig) {
b.RunParallel(func(p *testing.PB) {
for p.Next() {
c.Set([]int{100})
c.Get()
c.Get()
c.Get()
c.Set([]int{100})
c.Get()
c.Get()
}
})
}

func BenchmarkMutexConfig(b *testing.B) {
conf := &MutexConfig{data: []int{1, 2, 3}}
bench(b, conf)
}

func BenchmarkRWMutexConfig(b *testing.B) {
conf := &RWMutexConfig{data: []int{1, 2, 3}}
bench(b, conf)
}

执行结果

1
2
3
4
5
6
7
8
go test -race -bench=.
goos: linux
goarch: amd64
pkg: github.com/mohuishou/go-training/Week03/blog/04_sync/02_rwmutex
BenchmarkMutexConfig-4 179577 6912 ns/op
BenchmarkRWMutexConfig-4 341620 3425 ns/op
PASS
ok github.com/mohuishou/go-training/Week03/blog/04_sync/02_rwmutex 3.565s

可以看到首先是没有 data race 问题,其次读写锁的性能几乎是互斥锁的一倍

源码解析

基本结构

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 复用互斥锁
writerSem uint32 // 信号量,用于写等待读
readerSem uint32 // 信号量,用于读等待写
readerCount int32 // 当前执行读的 goroutine 数量
readerWait int32 // 写操作被阻塞的准备读的 goroutine 的数量
}

由于复用了互斥锁的代码,读写锁的源码很简单,这里我就不单独画图了

读锁

加锁

1
2
3
4
5
6
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}

首先是读锁, atomic.AddInt32(&rw.readerCount, 1)  调用这个原子方法,对当前在读的数量加一,如果返回负数,那么说明当前有其他写锁,这时候就调用 runtime_SemacquireMutex  休眠 goroutine 等待被唤醒

解锁

1
2
3
4
5
6
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}

解锁的时候对正在读的操作减一,如果返回值小于 0 那么说明当前有在写的操作,这个时候调用 rUnlockSlow  进入慢速通道

1
2
3
4
5
6
7
8
9
10
11
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

被阻塞的准备读的 goroutine 的数量减一,readerWait 为 0,就表示当前没有正在准备读的 goroutine 这时候调用 runtime_Semrelease  唤醒写操作

写锁

加锁

1
2
3
4
5
6
7
8
9
10
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}

首先调用互斥锁的 lock,获取到互斥锁之后,

  • atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)  调用这个函数阻塞后续的读操作
  • 如果计算之后当前仍然有其他 goroutine 持有读锁,那么就调用 runtime_SemacquireMutex  休眠当前的 goroutine 等待所有的读操作完成

解锁

1
2
3
4
5
6
7
8
9
10
11
12
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
}

解锁的操作,会先调用 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)  将恢复之前写入的负数,然后根据当前有多少个读操作在等待,循环唤醒

参考文献

  1. https://pkg.go.dev/sync 官网文档,必读
  2. https://pkg.go.dev/golang.org/x/sync@v0.0.0-20201207232520-09787c993a3a/errgroup 官网文档,必读
  3. https://pkg.go.dev/sync/atomic 官网文档,必读
  4. Go: How to Reduce Lock Contention with the Atomic Package
  5. Go: Mutex and Starvation
  6. Go 进阶 27:Go 语言 Mutex Starvation(译)
  7. Go 语言设计与实现-6.2 同步原语与锁 这本书值得一看
  8. PAUSE — Spin Loop Hint
  9. Linux x86 自旋锁的实现
  10. https://github.com/golang/go/commit/0556e26273f704db73df9e7c4c3d2e8434dec7be

关注我获取更新

wechat
知乎
github

猜你喜欢