Golang 同步互斥:正常模式和饥饿模式


在 Go 中,互斥(或MUT ual EX排斥)基本上是一种确保一次只有一个 goroutine 干扰共享资源的方法。此资源可以是一段代码、一个整数、一个映射、一个结构、一个通道或几乎任何东西。

为什么我们需要 sync.Mutex?
如果你花了足够多的时间摆弄 Go 中的map,你可能会遇到如下令人讨厌的错误:

fatal error: concurrent map read and map write

发生这种情况是因为我们没有保护我们的地图免受多个 goroutine 同时尝试访问和写入它。

现在,我们可以使用带有互斥锁或 的映射sync.Map,但这不是我们今天的重点。这里的主角是sync.Mutex,它有三个主要操作:Lock、Unlock和TryLock(我们现在不讨论)。

当一个 goroutine 锁定一个互斥锁时,它基本上是在说:“嘿,我要使用这个共享资源一段时间”,并且每个其他 goroutine 都必须等待,直到互斥锁解锁。一旦完成,它应该解锁互斥锁,以便其他 goroutine 可以轮到它们。

就这么简单,让我们通过一个简单的反例看看它是如何工作的:

var counter = 0
var wg sync.WaitGroup

func incrementCounter() {
    counter++
    wg.Done()
}

func main() {
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go incrementCounter()
    }

    wg.Wait()
    fmt.Println(counter)
}


因此,我们得到了一个counter在 1000 个 goroutine 之间共享的变量。Go 新手会认为结果应该是 1000,但事实并非如此。这是因为存在所谓的“竞争条件”。

当多个 goroutine 尝试同时访问和更改共享数据而没有进行适当的同步时,就会发生竞争条件。在这种情况下,增量操作 ( counter++) 不是原子的。

它由多个步骤组成,下面是counter++ARM64 架构下的 Go 汇编代码:

MOVD    main.counter(SB), R0
ADD    $1, R0, R0
MOVD    R0, main.counter(SB)

这counter++是一个读取-修改-写入操作,上述步骤不是原子的,这意味着它们不是作为单一、不间断的操作执行的。

例如,goroutine G1 读取了 counter 的值,在它写入更新的值之前,goroutine G2 读取了相同的值。然后,两者都将更新后的值写回,但由于它们读取的是相同的原始值,因此实际上丢失了一个增量。

使用atomic包是处理这个问题的好方法,但今天我们来关注一下互斥锁如何解决这个问题:

var counter = 0
var wg sync.WaitGroup
var mutex sync.Mutex

func incrementCounter() {
    mutex.Lock()
    counter++
    mutex.Unlock()
    wg.Done()
}

func main() {
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go incrementCounter()
    }

    wg.Wait()
    fmt.Println(counter)
}

现在,结果是 1000,正如我们所料。 在这里使用互斥是超级简单的:用 Lock 和 Unlock 封装临界部分。 但要注意,如果在已经解锁的互斥体上调用 Unlock,就会导致致命的同步错误:sync: unlock of unlocked mutex解锁已解锁的互斥体。

此外,您还可以通过运行 runtime.GOMAXPROCS(1) 将 GOMAXPROCS 设置为 1,这样结果仍然是正确的 1000。 这是因为我们的 goroutines 不会并行运行,而且函数非常简单,不会在运行时被抢占先机。

互斥锁流程
在mutex.Lock函数中,有两条路径:用于通常情况的快速路径和用于处理不常见情况的慢速路径。

1、快速路径
快速路径设计得非常快,预计可以处理大多数尚未使用互斥锁的锁获取。

func (m *Mutex) Lock() {
    // 快速路径:抓取未锁定的互斥。
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // 慢速路径(勾勒出快速路径,以便内联)
    m.lockSlow()
}

快速路径路径也是内联的,这意味着它直接嵌入到调用函数中:

$ go build -gcflags="-m"

./main.go:13:12: inlining call to sync.(*Mutex).Lock
./main.go:15:14: inlining call to sync.(*Mutex).Unlock

仅供参考,这个内联快速路径是一个巧妙的技巧,它利用了 Go 的内联优化,并且在 Go 的源代码中被大量使用。

2、慢速路径
当快速路径中的 CAS(比较和交换)操作失败时,意味着状态字段不为 0,因此互斥锁当前处于锁定状态。


这里真正需要关注的是m.lockSlow慢速路径,它承担了大部分繁重的工作。我们不会深入研究源代码,因为这需要大量有关 Go 内部工作原理的知识。

在慢速路径中,goroutine 会不断主动自旋循环尝试获取锁,而不会直接进入等待队列。

  • 自旋意味着 goroutine 进入一个紧密的循环,在不放弃 CPU 的情况下反复检查互斥的状态。
  • 在这种情况下,执行自旋等待的不是简单的for循环,而是低级汇编指令。

自旋后,它会再次尝试获取锁。如果失败,它还有三次自旋机会,然后才会放弃。因此,它总共会尝试最多 120 个周期。如果它仍然无法获取锁,它就会增加等待者数量,将自己放入等待队列,进入休眠状态,等待唤醒信号并重试。

自旋背后的想法是等待一小会儿,希望互斥锁能尽快释放,让 goroutine 抓住互斥锁,而无需睡眠 - 唤醒周期的开销。

如果我们的计算机没有多个核心,则不会启用自旋,因为它只会浪费 CPU 时间。

“但是如果另一个 goroutine 已经在等待互斥锁怎么办?如果这个 goroutine 先获取锁,这似乎不公平。”

这就是为什么我们的互斥锁有两种模式:正常模式和饥饿模式。在饥饿模式下,自旋不起作用

在正常模式下,等待互斥锁的 goroutine 会按先进先出 (FIFO) 队列进行组织。当某个 goroutine 被唤醒并尝试获取互斥锁时,它不会立即获得控制权。相反,它必须与当时也想要获取互斥锁的任何新 goroutine 竞争。

这种竞争有利于新的 goroutine,因为它们已经在 CPU 上运行,并且可以快速尝试抓取互斥锁,而排队的 goroutine 仍处于唤醒状态。

因此,刚刚醒来的 goroutine 可能会经常在与新竞争者的竞争中落败,并被放回队列的最前面。

“如果那个 goroutine 运气不好,总是在新的 goroutine 到达时被唤醒怎么办?”

好问题。如果发生这种情况,它就永远不会获得锁。这就是为什么我们需要将互斥量切换到饥饿模式。有一个问题 #13086讨论了以前设计的不公平性。

如果 goroutine 超过 1 毫秒无法获取锁,饥饿模式就会启动。它旨在确保等待的 goroutine 最终获得公平的互斥机会。

在此模式下,当 goroutine 释放互斥锁时,它会直接将控制权传递给队列前面的 goroutine。这意味着没有来自新 goroutine 的竞争。它们甚至不会尝试获取它,而只是加入等待队列的末尾。

互斥解锁流程
解锁流程比锁定流程简单。我们仍然有两条路径:快速路径(内联)和慢速路径(处理异常情况)。

快速路径会在互斥器的状态中丢弃锁定位。 如果你还记得互斥体的结构,这就是 mutex.state 的第一位。 如果去掉这一位使状态为零,就意味着没有其他标志被设置(如等待的 goroutines),我们的互斥现在是完全自由的。 但如果状态不为零呢? 这就是慢速路径的作用,它需要知道我们的互斥是处于正常模式还是饥饿模式。 下面我们来看看慢速路径的实现:

func (m *Mutex) unlockSlow(new int32) {
    // 1.试图解锁已解锁的互斥器
// 将导致致命错误。
    if (new+mutexLocked)&mutexLocked == 0 {
        fatal("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // 2. 如果没有等待者,或者互斥已经被锁定、
// 或被唤醒,或者处于饥饿模式,则返回。
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // 3. 如果互斥程序处于饥饿模式,则将所有权
// 移交给队列中第一个等待的程序.
        runtime_Semrelease(&m.sema, true, 1)
    }
}


在正常模式下,如果存在等待者,且没有其他程序被唤醒或获取锁,则 mutex 会尝试以原子方式递减等待者计数并打开 mutexWoken 标志。

在饥饿模式下,mutex 会以原子方式递增信号寄存器(mutex.sem),并直接将 mutex 的所有权移交给队列中的第一个等待的程序。 runtime_Semrelease 的第二个参数决定移交是否为真。