GoLang sync.Mutex 实现分析

GoLang sync.Mutex 实现分析,第1张

文章目录 定义介绍模式转换简单模式正常模式饥饿模式 阻塞与解锁加锁自旋

定义介绍

先从sync.Mutex定义出发,看看mutex是如何实现; sync.Mutex源码

sema: 信号量, 用于阻塞和唤醒goroutine; 注意, sema并不存储阻塞的goroutine的数量;state: 状态(精彩之处); 它的使用被切分2部分,一部分是状态标识位,另一部分存储wait goroutine的数量;

源代码常量定义含义
mutexLocked锁定标志位001
mutexWoken唤醒标志位010
mutexStarving饥饿标志位100
mutexWaiterShift等待计数偏移3

所以,锁竞争可以理解为 当前goroutine是否将锁定标志位 设置成1; 正常模式是多goroutine抢占锁定标志位的过程;饥饿模式是饥饿标志位被设置成1;

模式转换

结合代码分析以上3种方式,实际是2种,即正常模式和饥饿模式;我这里将正常模式细分出简单模式和正常模式; 首先回忆一下锁的用法, 即

func xxxxx() {
    mu.Lock()
    defer mu.Unlock()
    ...
    ...
}

当多个goroutine 抢锁时,成功的可以继续执行,失败的则被阻塞;等同于 抢锁成功的可以从Lock函数中返回,失败则留在Lock函数中无法返回。

简单模式 加锁解锁
func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        ...
		return
	}
	m.lockSlow()
}

func (m *Mutex) Unlock() {
    ...
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

代码容易理解 当G抢锁时state=0变成state=1,加锁成功; 当G解锁时state=1变成state=0 解锁成功;

正常模式 加锁解锁
func (m *Mutex) unlockSlow(new int32) {
    ...
	if new&mutexStarving == 0 {
		old := new
		for {
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
        ...
	}
}

先看正常模式的解锁 *** 作,已将饥饿模式代码隐藏; new = m.state - mutexLocked, 意味着 锁定标志被解除; 正常模式解锁 会将state 唤醒标志置1, 并唤醒G

    old := new
	for {
        //old>>mutexWaiterShift = 0 表示无等待的G; 这种情况可以直接返回; 因为没有需要唤醒的G;
        //old>>mutexWaiterShift > 0 表示存在等待G;
        //
        //此时: old 可能为 ..1 000 或者 ..1 010; 因为已解锁且非饥饿
        //
        //
		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
			return
		}
        //
        // (old - 1<
        //
        // old 为 ...1 000 --> ...1 010
        //
		new = (old - 1<<mutexWaiterShift) | mutexWoken

		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			runtime_Semrelease(&m.sema, false, 1)
			return
		}

		old = m.state
	}

runtime_Semrelease(&m.sema, false, 1) 队尾唤醒G
runtime_Semrelease(&m.sema, true, 1) 队首唤醒G

然后看一下正常模式的加锁 *** 作;已将饥饿模式代码隐藏; 前文所过加锁成功可以等同于Lock函数返回,所以紧盯break; 接下来将分析如何退出;

func (m *Mutex) lockSlow() {
	var waitStartTime int64    //G等待时间纳秒
	starving := false          //是否饥饿
	awoke := false             //是否唤醒
	iter := 0                  //自旋计数
	old := m.state             //快照状态
	for {
        ...
        ...
		new := old
        //非饥饿模式加锁 确保锁定标志为1
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
        //等待G计数增加
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
        // 进入饥饿模式
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
        //唤醒标志重置, 因为唤醒G时都会将唤醒标志置1
		if awoke {
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {  //唤醒G抢锁成功 010 解锁时为010
				break // locked the mutex with CAS
			}
            //竞争失败重新等待; 新G将放入等待队列尾,唤醒G将放入等待队列首;
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            //这个位置很关键; 它是唤醒G 开始执行的位置; 阻塞的底层是将G与P解耦,然后将G交由sema保存;
            //
            //进入饥饿条件是等待时间 > 1ms
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

            //唤醒G 需要重新快照state; 因为G已睡眠一段时间, state可能已改变;
			old = m.state
			if old&mutexStarving != 0 {
                ...
			}
            //唤醒G 开启新一轮锁竞争
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
    ...
}
饥饿模式

当唤醒G等待时间大于1ms时,即将进入模式,那么后续锁竞争的G 将直接进入等待队列,无需竞争;

func (m *Mutex) lockSlow() {
	var waitStartTime int64    //G等待时间纳秒
	starving := false          //是否饥饿
	awoke := false             //是否唤醒
	iter := 0                  //自旋计数
	old := m.state             //快照状态
	for {
        ...

		new := old
        ...
        //等待G计数增加
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
        // 进入饥饿模式
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
        ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
            ....
			//竞争失败重新等待; 新G将放入等待队列尾,唤醒G将放入等待队列首;
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            //这个位置很关键; 它是唤醒G 开始执行的位置; 阻塞的底层是将G与P解耦,然后将G交由sema保存;
            //
            //进入饥饿条件是等待时间 > 1ms
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            //唤醒G 需要重新快照state; 因为G已睡眠一段时间, state可能已改变;
			old = m.state
            //饥饿模式处理逻辑,有点绕;
			if old&mutexStarving != 0 {
                //
                //没有等待G是不会进入饥饿模式的 old>>mutexWaiterShift == 0
                //
                //此时应该 ..1 100
                //
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
                //
                // delta = 1 - 8 = -7 饥饿 1.....1 111; 符号位为1,后3位为111;
                //
				if !starving || old>>mutexWaiterShift == 1 {
                    //解除饥饿
                    //-7-4 = -11 = 1...1 011 
                    //
                    //此时 0...1 100 + 1...1 011 运算结果为 ...1 011 解除饥饿模式
					delta -= mutexStarving
				}
                //等待G计数减1;
                //此时 0....1 100 + 1....0 111 运算结果后 为 ...1 111, 仍然为饥饿模式
				atomic.AddInt32(&m.state, delta)
				break
			}
            ...
        } else {
			old = m.state
		}
	}
    ...
}
阻塞与解锁

加锁自旋

自旋就是空跑阻止G进入阻塞队列; 因为G的唤醒和阻塞涉及G的调度, 调度是耗时的; 如果通过短暂的自选可以获取锁,那就避免G的调度耗时; 同时可以看到自旋会浪费CPU的;

func (m *Mutex) lockSlow() {
    ...
	awoke := false
	iter := 0
	old := m.state
	for {
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
        ...
    }

runtime_canSpin,进入自旋的条件是严苛的,尽力避免浪费CPU;

不能超过4次多核多P空闲待运行队列

如有不对之处,欢迎留言评论区

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/990489.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存