当前位置:网站首页>golang源码分析(6):sync.Mutex sync.RWMutex
golang源码分析(6):sync.Mutex sync.RWMutex
2022-08-02 16:49:00 【用户9710217】
Mutex
居然进化了三个版本,从这也可以看到 go 社区一直在积极的优化与演进
- 最朴素的实现互斥锁,拿到锁返回,拿不到就将当前 goroutine 休眠
- 增加了自旋 spinlock 的逻辑,也就是说大部份 Mutex 锁住时间如果很短,那么自旋可以减小无谓的 runtime 调度。推荐看官方 spin commit
- 进化成了公平锁,老版本中当前抢锁中的 goroutine 大概率比休眠的优先拿到锁,会产生 latency 长尾。新版本中超过一定时间没拿到锁,这个优先级会反转,尽可能减小长尾。推荐大家看 #issue 13086,这里面反映了问题,另外看 commit, 里面有很详细的测试数据,值得学习
那么具体怎么实现呢?分别以 1.3, 1.7, 1.12 三个版本源码为例
Mutex 结构体及常用变量
type Mutex struct {
state int32
sema uint32
}
// 1.3 与 1.7 老的实现共用的常量
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
// 1.12 公平锁使用的常量
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
从中可以看到,Mutex 有两个变量:
- state 4 字节 int, 其中低几位用于做标记,高位地址空间用于计数,表示有多少个 goroutine 正在等待而处于休眠中。
- sema 是一个互斥的信号量,初始默认值是 0,用于将 goroutine park 休眠或是唤醒。sema acquire 时如果 sema 大于 0,那么减一返回,否则休眠等待。sema release 将 sema 加一,然后唤醒等待队列的第一个 goroutine
默认直接使用 sync.Mutex 或是嵌入到结构体中,state 零值代表未上锁,sema 零值也是有意义的,参考下面源码加锁与解锁逻辑,稍想下就会明白的。另外参考大胡子 dave 的关于零值的文章
朴素互斥锁
朴素是什么意思呢?就是能用,粗糙...
上锁
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex. 快速上锁,当前 state 为 0,说明没人锁。CAS 上锁后直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if raceenabled {
raceAcquire(unsafe.Pointer(m))
}
return
}
awoke := false // 被唤醒标记,如果是被别的 goroutine 唤醒的那么后面会置 true
for {
old := m.state // 老的 m.state 值
new := old | mutexLocked // 新值要置 mutexLocked 位为 1
if old&mutexLocked != 0 { // 如果 old mutexLocked 位不为 0,那说明有人己经锁上了,那么将 state 变量的 waiter 计数部分 +1
new = old + 1<<mutexWaiterShift
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case. 如果走到这里 awoke 为 true, 说明是被唤醒的,那么清除这个 mutexWoken 位,置为 0
new &^= mutexWoken
}
// CAS 更新,如果 m.state 不等于 old,说明有人也在抢锁,那么 for 循环发起新的一轮竞争。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 { // 如果 old mutexLocked 位为 1,说明当前 CAS 是为了更新 waiter 计数。如果为 0,说明是抢锁成功,那么直接 break 退出。
break
}
runtime_Semacquire(&m.sema) // 此时如果 sema <= 0 那么阻塞在这里等待唤醒,也就是 park 住。走到这里都是要休眠了。
awoke = true // 有人释放了锁,然后当前 goroutine 被 runtime 唤醒了,设置 awoke true
}
}
if raceenabled {
raceAcquire(unsafe.Pointer(m))
}
}
上锁逻辑其实也不难,这里面更改计数都是用 CAS
- fast path 快速上锁,如果当前 state == 0, 肯定是没人上锁,也没人等待,CAS 更新后直接退出好了
- 当前如果有人锁住了,那么更新 m.state 值的 waiter 计数部份,然后
runtime_Semacquire
将自己休眠,等待被唤醒 runtime_Semacquire
函数返回说明锁释放了,有人将自己唤醒了,那么设置 awoke,大循环发起新的一轮竞争。- 新的竞争到最后,cas 更新了 new 值,此时 old 值 mutexLocked 位肯定为 0,获取锁成功,break 退出即可。
解锁
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if raceenabled {
_ = m.state
raceRelease(unsafe.Pointer(m))
}
// Fast path: drop lock bit. 快速将 state 的 mutexLocked 位清 0,然后 new 返回更新后的值,注意此 add 完成后,很有可能新的 goroutine 抢锁,并上锁成功
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 { // 如果释放了一个己经释放的锁,直接 panic
panic("sync: unlock of unlocked mutex")
}
old := new
for {// 如果 state 变量的 waiter 计数为 0 说明没人等待锁,直接 return 就好,同时如果 old 值的 mutexLocked|mutexWoken 任一置 1,说明要么有人己经抢上了锁,要么说明己经有被唤醒的 goroutine 去抢锁了,没必要去做通知操作
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// Grab the right to wake someone. 将 waiter 计数位减一,并设置 awoken 位
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema) // cas 成功后,再做 sema release 操作,唤醒休眠的 goroutine
return
}
old = m.state
}
}
解锁逻辑也不难,注意一个 goroutine 可以释放别的 goroutine 上的锁
- 原子操作, m.state - mutexLocked, 如果之后 (new+mutexLocked)&mutexLocked 说明释放了一个没上锁的 Mutex,直接 panic
- 接下来为什么是 for 循环呢?原因在于,第一步原子操作后,很可能有第三方刚好获得锁了,那么 for 里面的 CAS 肯定会失败
- 快速判断,如果 waiter 计数为 0,说明没有休眠的 goroutine,不用唤醒。如果 old&(mutexLocked|mutexWoken) != 0 说明要么有人获得了锁,要么己经有 woken 的 goroutine 了,也不用去唤醒。注意这里,mutexLocked 是 for 循环再次判断时才有的, old 值是循环底部重新又获取得
- 然后 CAS 更新成 new 值,设置 woken 标记位,并将等待 waiter 计数减一。最后
runtime_Semrelease
真正的唤醒等待 goroutine
朴素锁的问题
因获取 sema 休眠的 goroutine 会以一个 FIFO 的链表形式保存,如果唤醒时可以优先拿到锁。但是看代码的逻辑,处于休眠中的 goroutine 优先级低于当前活跃的。Unlock
解锁的顺间,最新活跃的 goroutine 是会抢到锁的。另外有时锁时间很短,如果没有自旋 spin 的逻辑,所有 goroutine 都要休眠 park, 徒增 runtime 调度的开销。
自旋 spin 的优化
后来优化时增加了 spin 逻辑,自旋只存在 Lock
阶段,代码以 go 1.7 为例
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
awoke := false
iter := 0
for {
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 { // 如果当前己经锁了,那么判断是否可以自旋
if runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue
}
new = old + 1<<mutexWaiterShift
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_Semacquire(&m.sema)
awoke = true
iter = 0
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
可以看到,for 循环开始增加了 spin 判断逻辑。
- 如果 runtime 判断允许自旋,那么走 if 逻辑,否则走原有的 Lock 逻辑
- 如果当前 m.state 未设置 woken 标记,并且等待 waiter 计数大于 0,说明有人在等待,那么 CAS 更新 m.state 置位 mutexWoken
- 执行
runtime_doSpin
逻辑,同时 iter++ 表示自旋次数
const (
mutex_unlocked = 0
mutex_locked = 1
mutex_sleeping = 2
active_spin = 4
active_spin_cnt = 30
passive_spin = 1
)
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq on on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
判断 runtime_canSpin
是否允许自旋逻辑也简单,也比较严格
- iter 不大小最大的 active_spin 次数,默认是 4
- 当前机器是多核,并且 GOMAXPROCS > 1,这个很好理解,并发为 1 自旋也没意义
- 最后一个就是当前 P 的本地 runq 队列为空,如果有待运行的 G,那么也不允许自旋
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
自旋代码涉及汇编了,在 amd64 平台调用 PAUSE
,循环 active_spin_cnt 30 次。
公平锁的实现逻辑
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
代码以 go1.12 为例,可以看到注释关于公平锁的实现初衷和逻辑。越是基础组件更新越严格,背后肯定有相关测试数据。
- Mutex 两种工作模式,normal 正常模式,starvation 饥饿模式。normal 情况下锁的逻辑与老版相似,休眠的 goroutine 以 FIFO 链表形式保存在 sudog 中,被唤醒的 goroutine 与新到来活跃的 goroutine 竞解,但是很可能会失败。如果一个 goroutine 等待超过 1ms,那么 Mutex 进入饥饿模式
- 饥饿模式下,解锁后,锁直接交给 waiter FIFO 链表的第一个,新来的活跃 goroutine 不参与竞争,并放到 FIFO 队尾
- 如果当前获得锁的 goroutine 是 FIFO 队尾,或是等待时长小于 1ms,那么退出饥饿模式
- normal 模式下性能是比较好的,但是 starvation 模式能减小长尾 latency
公平锁上锁逻辑
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex. 快速上锁逻辑
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
var waitStartTime int64 // waitStartTime 用于判断是否需要进入饥饿模式
starving := false // 饥饿标记
awoke := false // 是否被唤醒
iter := 0 // spin 循环次数
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway. 饥饿模式下不进行自旋,直接进入阻塞队列
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 { // 只有此时不是饥饿模式时,才设置 mutexLocked,也就是说饥饿模式下的活跃 goroutine 直接排队去
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 { // 处于己经上锁或是饥饿时,waiter 计数 + 1
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case. 如果当前处于饥饿模式下,并且己经上锁了,mutexStarving 置 1,接下来 CAS 会用到
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke { // 如果当前 goroutine 是被唤醒的,然后清 mutexWoken 位
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 { // 如果 old 没有上锁并且也不是饥饿模式,上锁成功直接退出
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0 // 第一次 queueLifo 肯定是 false
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo) // park 在这里,如果 queueLifo 为真,那么扔到队头,也就是 LIFO
// 走到这里,说明被其它 goroutine 唤醒了,继续抢锁时先判断是否需要进入 starving
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 超过 1ms 就进入饥饿模式
old = m.state
if old&mutexStarving != 0 { // 如果原来就是饥饿模式的话,走 if 逻辑
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 此时饥饿模式下被唤醒,那么一定能上锁成功。因为 Unlock 保证饥饿模式下只唤醒 park 状态的 goroutine
delta := int32(mutexLocked - 1<<mutexWaiterShift) // waiter 计数 -1
if !starving || old>>mutexWaiterShift == 1 { // 如果是饥饿模式下并且自己是最后一个 waiter ,那么清除 mutexStarving 标记
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta) // 更新,抢锁成功后退出
break
}
awoke = true // 走到这里,不是饥饿模式,重新发起抢锁竞争
iter = 0
} else {
old = m.state // CAS 失败,重新发起竞争
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
整体来讲,公平锁上锁逻辑复杂了不少,边界点要考滤的比较多
- 同样的 fast path 快速上锁逻辑,原来 m.state 为 0,锁就完事了
- 进入 for 循环,也要走自旋逻辑,但是多了一个判断,如果当前处于饥饿模式禁止自旋,根据实现原理,此时活跃的 goroutine 要直接进入 park 的队列
- 自旋后面的代码有四种情况:饥饿抢锁成功,饥饿抢锁失败,正常抢锁成历,正常抢锁失败。上锁失败的最后都要 waiter 计数加一后,更新 CAS
- 如果 CAS 失败,那么重新发起竞争就好
- 如果 CAS 成功,此时要判断处于何种情况,如果 old 没上锁也处于 normal 模式,抢锁成历退出
- 如果 CAS 成功,但是己经有人上锁了,那么要根据 queueLifo 来判断是扔到 park 队首还是队尾,此时当前 goroutine park 在这里,等待被唤醒
runtime_SemacquireMutex
被唤醒了有两种情况,判断是否要进入饥饿模式,如果老的 old 就是饥饿的,那么自己一定是唯一被唤醒,一定能抢到锁的,waiter 减一,如果自己是最后一个 waiter 或是饥饿时间小于 starvationThresholdNs 那么清除 mutexStarving 标记位后退出- 如果老的不是饥饿模式,那么 awoke 置 true,重新竞争
公平锁解锁逻辑
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit. 和原有逻辑一样,先减去 mutexLocked,并判断是否解锁了未上锁的 Mutex, 直接 panic
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 查看 mutexStarving 标记位,如果 0 走老逻辑,否则走 starvation 分支
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true) // 直接 runtime_Semrelease 唤醒等待的 goroutine
}
}
- 原子操作,将 m.state 减去 mutexLocked,然后判断是否释放了未上锁的 Mutex,直接 panic
- 根据 m.state 的 mutexStarving 判断当前处于何种模式,0 走 normal 分支,1 走 starvation 分支
- starvation 模式下,直接
runtime_Semrelease
做信号量 UP 操作,唤醒 FIFO 队列中的第一个 goroutine - noarmal 模式类似原有逻辑,唯一不同的是多了一个 mutexStarving 位判断逻辑
RWMutex
为读者暴露了两个方法(RLock
和 RUnlock
),同时专门为写者也暴露了两个方法( Lock
和 Unlock
)。
RLock
为了代码简洁起见,我们跳过那些跟竞态检测器相关的代码(用 ...
表示)。
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false)
}
...
}
readerCount
字段的类型是 int32
,它表示当前启用的读者——包括了所有正在临界区里面的读者或者被写者阻塞的等待进入临界区读者的数量。相当于是当前调用了 RLock
函数并且还没调用 RUnLock
函数的读者的数量。
atomic.AddInt32 是下面代码的原子版本:
*addr += delta
return *addr
其中 addr
是 *int32
类型的而 delta
是 int32
类型的。由于这是个原子性的操作,所以增加 delta
不会有干扰到其它线程的风险。(更多关于 Fetch-and-add 的资料 详见这里)
如果我们完全没有用到写者的话,
readerCount
会一直大于或等于 0 (译注:后面会讲到,一旦有写者调用Lock
,Lock
函数就会把readerCount
设置为负数),并且读者获取锁的过程会走较快的非阻塞的分支,因为这时候读者获取锁的过程只涉及到atomic.AddInt32
的调用。
信号量(semaphore)
这是一个由 Edsger Dijkstra 提出的数据结构,解决很多关于同步的问题时,它都很好用。它是一个提供了两种操作的整数:
- 获取(acquire,又称 wait、decrement 或者 P)
- 释放(release,又称 signal、increment 或者 V)
获取操作把信号量减一,如果减一的结果是非负数,那么线程可以继续执行。如果结果是负数,那么线程将会被阻塞,除非有其它线程把信号量增加回非负数,该线程才有可能恢复运行)。
释放操作把信号量加一,如果当前有被阻塞的线程,那么它们其中一个会被唤醒,恢复执行。
Go 语言的运行时提供了 runtime_SemacquireMutex
和 runtime_Semrelease
函数,像 sync.RWMutex
这些对象的实现会用到这两个函数。
Lock 方法
func (rw *RWMutex) Lock() {
...
rw.w.Lock()
// 通过把 rw.readerCount 设置成负数,来告知读者当前有写者正在等待进入临界区
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
...
}
Lock
方法让写者可以获得对共享数据的独占访问权:
首先它会获取一个叫 w
的互斥量(mutex),这会使得其它的写者无法访问这个共享数据,这个w
只有在 Unlock
函数快结束的时候,才会被解锁,从而保证一次最多只能有一个写者进入临界区。
然后 Lock
方法会把 readerCount
的值设置成负数,(通过把readerCount
减掉 rwmutexMaxReaders
(即1 << 30
))。然后接下来任何读者调用 RLock
函数时,都会被阻塞掉了:
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// rw.readerCount 是负数,说明有写者正在等待进入临界区或者正在临界区内,等待写者执行完成
runtime_SemacquireMutex(&rw.readerSem, false)
}
后续来到临界区的读者们将会被阻塞,那正在运行的读者们会怎样呢?readerWait
字段就是用来记录当前有多少读者正在运行。写者阻塞在信号量 rw.writerSem
里,直到最后一个正在运行的读者执行完毕,它调用的 RUnlock
方法会把 rw.writerSem
信号量加一(我后面会讲到),这时写者才能被唤醒、进入临界区。
如果没有正在运行的读者,那么写者就可以直接进入临界区了。
rwmutexMaxReaders
(译注:原文大量使用的 pending 这个词常常被翻译为「挂起」(有暂停的语义),但是在本文中,pending 表示的是「等待进入临界区(这时是线程是暂停的)或者正在临界区里面(这时是线程正在运行的)」这个状态。「挂起」不能很好的表达该语义,所以 pending 保留原文不翻译,但读者要注意 pending 在本文的语义,例如:「一个 pending 的读者」可以理解为是一个调用了 RLock
函数但是还没调用 RUnlock
函数的读者。「一个 pending 的写者」则相应地表示一个调用了Lock
函数但是还没调用 Unlock
函数的写者)
在 rwmutex.go 里面有一个常量:
const rwmutexMaxReaders = 1 << 30
这个 1 << 30
是什么意思、做什么用的呢?
readerCount
字段是 int32 类型的,它的有效范围是:
[-1 << 31, (1 << 31) - 1] 或者说 [-2147483648, 2147483647]
RWMutex 使用这个字段来记录当前 pending 的读者数,并且这个字段还标记着当前是否有写者在 pending 状态。在 Lock
方法里面:
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
readerCount
字段被减掉了 1<<30
。当 readerCount
的值为负数时,说明当前存在 pending 状态的写者。而 readerCount
再加回 1<<30
,又能表示当前 pending 的读者的数量。最后,rwmutexMaxReaders
还限制了 pending 读者的数量。如果我们的当前 pending 的读者数量比 rwmutexMaxReaders
还要多的话,那么 readerCount
减去 rwmutexMaxReaders
就不是负数了,这样整个机制都会被破坏掉。从中我们可以知道,pending 的读者数量不能大于 rwmutexMaxReaders - 1
,它的值超过了 10 亿——1073741823。
RUnlock
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
...
}
这个方法会把 readerCount
减一 (之前是 RLock
方法把这个值增加了的),如果 readerCount
是负数,意味着当前存在 pending 状态的写者,因为正如上面所说的,在写者调用 Lock
方法的时候,readerCount
的值会减掉 rwmutexMaxReaders
,从而使 readerCount
变成负数。
然后这个方法会检查当前正在临界区里面的读者数是不是已经是 0 了,如果是的话,意味着等待进入临界区的写者可以获取到 rw.writerSem
信号量、进入临界区了。
Unlock
func (rw *RWMutex) Unlock() {
...
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
rw.w.Unlock()
...
}
要解锁写者拥有的写锁,首先 readerCount
的值要增加 rwmutexMaxReaders
,这个操作会使得 readerCount
恢复成非负数,如果这时候 readerCount
大于 0,这意味着当前有读者在等待着写者离开临界区。最后写者释放掉它拥有的 w
这个互斥量(译注:上文说过,这个互斥量是写者用来防止其它写者进入临界区的),这使得其它写者能够有机会再次锁定 w
这个互斥量。
如果读者或写者尝试在一个已经解锁的 RWMutex 上调用Unlock
和 RUnlock
方法会抛出错误(代码):
m := sync.RWMutex{}
m.Unlock()
输出:
fatal error: sync: Unlock of unlocked RWMutex
...
递归地读锁定
文档里面写道:
如果一个 goroutine 拥有一个读锁,而另外一个 goroutine 又调用了 Lock
函数,那么在第一个读锁被释放之前,没有读者可以获得读锁。这尤其限制了我们不能递归地获取读锁,因为只有这样才能确保锁都能变得可用,一个 Lock
的调用会阻止新的读者获取到读锁。(上文已经多次提到这一点了)
因为 RWMutex 就是这么实现的:如果当前有一个 pending 的写者,那么所有尝试调用 RLock
的读者都会被阻塞,即使在这之前已经有读者获取到了读锁(源代码):
package main
import (
"fmt"
"sync"
"time"
)
var m sync.RWMutex
func f(n int) int {
if n < 1 {
return 0
}
fmt.Println("RLock")
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
return f(n-1) + n
}
func main() {
done := make(chan int)
go func() {
time.Sleep(200 * time.Millisecond)
fmt.Println("Lock")
m.Lock()
fmt.Println("Unlock")
m.Unlock()
done <- 1
}()
f(4)
<-done
}
输出:
RLock
RLock
RLock
Lock
RLock
fatal error: all goroutines are asleep - deadlock!
(译注:上面的代码有两个 goroutine,一个是写者 routine,一个是主 goroutine(也是读者),通过程序的输出可以知道:前三行都是输出 RLock,表示这时候已经有 3 个读者获取到了读锁。后面接着输出了 Lock, 表示这时候写者开始请求写锁,后面接着输出一个 RLock,表示这时又多了一个读者请求读锁。因为 pending 的写者会阻塞掉后续调用 RLock
的读者,所以最后一个 RLock 的调用堵塞了主 routine,而写者的 routine 也在堵塞等待前面三个读者释放它们的读锁,所以两个 goroutine 都堵塞了,因此程序报错:fatal error: all goroutines are asleep - deadlock!
)
锁的拷贝
go tool vet
可以检测到是否有锁被按值拷贝了,因为这种情况会导致死锁,具体的情况可以看之前的一篇文章:Detect locks passed by value in Go (译注:GCTT 译文:检测 Go 程序中按值传递的 locks
性能
之前有人提出:随着 CPU 核心数量的增加,RWMutex 的性能会降低,详见:https://github.com/golang/go/issues/17973
锁的争用
Go 1.8 版本开始支持分析 mutex 的争用情况(译注:原文 Contention,参考维基百科#Granularity))(patch 补丁),我们来看看它是怎么用的:
import (
"net/http"
_ "net/http/pprof"
"runtime"
"sync"
"time"
)
func main() {
var mu sync.Mutex
runtime.SetMutexProfileFraction(5)
for i := 0; i < 10; i++ {
go func() {
for {
mu.Lock()
time.Sleep(100 * time.Millisecond)
mu.Unlock()
}
}()
}
http.ListenAndServe(":8888", nil)
}
> go build mutexcontention.go
> ./mutexcontention
当程序 mutexcontention 运行时:
> go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1
Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1
Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz
File: mutexcontention
Type: delay
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) list main
Total: 57.28s
ROUTINE ======================== main.main.func1 in /Users/mlowicki/projects/golang/src/github.com/mlowicki/mutexcontention/mutexcontention.go
0 57.28s (flat, cum) 100% of Total
. . 14: for i := 0; i < 10; i++ {
. . 15: go func() {
. . 16: for {
. . 17: mu.Lock()
. . 18: time.Sleep(100 * time.Millisecond)
. 57.28s 19: mu.Unlock()
. . 20: }
. . 21: }()
. . 22: }
. . 23:
. . 24: http.ListenAndServe(":8888", nil)
上面的 57.28s 是什么,它为什么挨着 mu.Unlock()
呢?
当 goroutine 因为调用 Lock
方法而被阻塞的时候,这个时间点会被记录下来——aquiretime(获取时间)。当其他 goroutine 解锁了这个锁,并且起码有一个 goroutine 在等待获取这个锁的时候。其中一个 goroutine 可以获取到这个锁,这时他会自动调用 mutexevent
函数。函数 mutexevent
根据 SetMutexProfileFraction
函数设定的比率,来确定是否应该保存或忽略掉该事件。这种事件都包含了等待时间(当前时间 - 获取时间)。上述的代码中,所有阻塞在这个锁的 goroutine 的总等待时间会被收集和显示出来,
对于读锁(Rlock 和 RUnlock
)争用的分析功能,将会在 Go 1.11 版本加入
边栏推荐
猜你喜欢
随机推荐
尚硅谷尚品项目汇笔记(三)
开始使用 NVIDIA Jetson Orin 上的深度学习加速器
锁定和并发控制(一)
红蓝对抗经验分享:CS免杀姿势
sql2008数据库置疑的解决方法_sqlserver2008数据库可疑
29. 两数相除
接入网学习笔记
边界访问的空间权限
实时数仓架构演进及选型
Switch 块、Switch 表达式、Switch 模式匹配,越来越好用的 Switch
Numpy those things
Navicat for mysql破解版安装
Redis的使用--集群模式
JS数组删除其中一个元素
【电子器件笔记7】MOS管参数和选型
nacos简单使用
金仓数据库KingbaseES安全指南--6.13. 关于身份验证的常见问题
持续交付(一)JenkinsAPI接口调用
图像质量评价指标
Alibaba最新神作——1015页分布式全栈手册太香了