先从sync.Mutex定义出发,看看mutex是如何实现; sync.Mutex源码
sema: 信号量, 用于阻塞和唤醒goroutine; 注意, sema并不存储阻塞的goroutine的数量;state: 状态(精彩之处); 它的使用被切分2部分,一部分是状态标识位,另一部分存储wait goroutine的数量;源代码常量定义 | 含义 | 值 |
---|---|---|
mutexLocked | 锁定标志位 | 001 |
mutexWoken | 唤醒标志位 | 010 |
mutexStarving | 饥饿标志位 | 100 |
mutexWaiterShift | 等待计数偏移 | 3 |
所以,锁竞争可以理解为 当前goroutine是否将锁定标志位 设置成1; 正常模式是多goroutine抢占锁定标志位的过程;饥饿模式是饥饿标志位被设置成1;
模式转换结合代码分析以上3种方式,实际是2种,即正常模式和饥饿模式;我这里将正常模式细分出简单模式和正常模式; 首先回忆一下锁的用法, 即
func xxxxx() {
mu.Lock()
defer mu.Unlock()
...
...
}
当多个goroutine 抢锁时,成功的可以继续执行,失败的则被阻塞;等同于 抢锁成功的可以从Lock函数中返回,失败则留在Lock函数中无法返回。
简单模式 加锁解锁func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
...
return
}
m.lockSlow()
}
func (m *Mutex) Unlock() {
...
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
代码容易理解 当G抢锁时state=0变成state=1,加锁成功; 当G解锁时state=1变成state=0 解锁成功;
正常模式 加锁解锁func (m *Mutex) unlockSlow(new int32) {
...
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
...
}
}
先看正常模式的解锁 *** 作,已将饥饿模式代码隐藏; new = m.state - mutexLocked, 意味着 锁定标志被解除; 正常模式解锁 会将state 唤醒标志置1, 并唤醒G
old := new
for {
//old>>mutexWaiterShift = 0 表示无等待的G; 这种情况可以直接返回; 因为没有需要唤醒的G;
//old>>mutexWaiterShift > 0 表示存在等待G;
//
//此时: old 可能为 ..1 000 或者 ..1 010; 因为已解锁且非饥饿
//
//
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
//
// (old - 1<
//
// old 为 ...1 000 --> ...1 010
//
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
runtime_Semrelease(&m.sema, false, 1) 队尾唤醒G
runtime_Semrelease(&m.sema, true, 1) 队首唤醒G
然后看一下正常模式的加锁 *** 作;已将饥饿模式代码隐藏; 前文所过加锁成功可以等同于Lock函数返回,所以紧盯break; 接下来将分析如何退出;
func (m *Mutex) lockSlow() {
var waitStartTime int64 //G等待时间纳秒
starving := false //是否饥饿
awoke := false //是否唤醒
iter := 0 //自旋计数
old := m.state //快照状态
for {
...
...
new := old
//非饥饿模式加锁 确保锁定标志为1
if old&mutexStarving == 0 {
new |= mutexLocked
}
//等待G计数增加
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 进入饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//唤醒标志重置, 因为唤醒G时都会将唤醒标志置1
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 { //唤醒G抢锁成功 010 解锁时为010
break // locked the mutex with CAS
}
//竞争失败重新等待; 新G将放入等待队列尾,唤醒G将放入等待队列首;
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//这个位置很关键; 它是唤醒G 开始执行的位置; 阻塞的底层是将G与P解耦,然后将G交由sema保存;
//
//进入饥饿条件是等待时间 > 1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
//唤醒G 需要重新快照state; 因为G已睡眠一段时间, state可能已改变;
old = m.state
if old&mutexStarving != 0 {
...
}
//唤醒G 开启新一轮锁竞争
awoke = true
iter = 0
} else {
old = m.state
}
}
...
}
饥饿模式
当唤醒G等待时间大于1ms时,即将进入模式,那么后续锁竞争的G 将直接进入等待队列,无需竞争;
func (m *Mutex) lockSlow() {
var waitStartTime int64 //G等待时间纳秒
starving := false //是否饥饿
awoke := false //是否唤醒
iter := 0 //自旋计数
old := m.state //快照状态
for {
...
new := old
...
//等待G计数增加
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 进入饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
...
if atomic.CompareAndSwapInt32(&m.state, old, new) {
....
//竞争失败重新等待; 新G将放入等待队列尾,唤醒G将放入等待队列首;
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
//这个位置很关键; 它是唤醒G 开始执行的位置; 阻塞的底层是将G与P解耦,然后将G交由sema保存;
//
//进入饥饿条件是等待时间 > 1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
//唤醒G 需要重新快照state; 因为G已睡眠一段时间, state可能已改变;
old = m.state
//饥饿模式处理逻辑,有点绕;
if old&mutexStarving != 0 {
//
//没有等待G是不会进入饥饿模式的 old>>mutexWaiterShift == 0
//
//此时应该 ..1 100
//
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//
// delta = 1 - 8 = -7 饥饿 1.....1 111; 符号位为1,后3位为111;
//
if !starving || old>>mutexWaiterShift == 1 {
//解除饥饿
//-7-4 = -11 = 1...1 011
//
//此时 0...1 100 + 1...1 011 运算结果为 ...1 011 解除饥饿模式
delta -= mutexStarving
}
//等待G计数减1;
//此时 0....1 100 + 1....0 111 运算结果后 为 ...1 111, 仍然为饥饿模式
atomic.AddInt32(&m.state, delta)
break
}
...
} else {
old = m.state
}
}
...
}
阻塞与解锁
加锁自旋
自旋就是空跑阻止G进入阻塞队列; 因为G的唤醒和阻塞涉及G的调度, 调度是耗时的; 如果通过短暂的自选可以获取锁,那就避免G的调度耗时; 同时可以看到自旋会浪费CPU的;
func (m *Mutex) lockSlow() {
...
awoke := false
iter := 0
old := m.state
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
...
}
runtime_canSpin,进入自旋的条件是严苛的,尽力避免浪费CPU;
不能超过4次多核多P空闲待运行队列如有不对之处,欢迎留言评论区
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)