go 并发调度笔记

go 并发调度笔记,第1张

go 并发调度笔记 开启一个m
  • 从空闲process列表中获取一个process

  • 从空闲列表获取一个m,或者新建一个m

  • 使用newosproc()方法创建一个内核线程,并把内核线程和m以及mstart方法关联,该线程执行时会调用mstart方法

  • 由该m的g0开始执行schedule方法

    • g0的作用?

    • 为runtime 下调度G的工作提供栈空间,每个m都有一个g0

调度循环schedule()
  • 获取G,有三种方式

    • 每处理n个任务后就去全局g队列中获取G,同时会将全局队列中一定数量的G搬运到自己的本地队列

    • 从本地队列中获取。为了增加公平性,获取都从队头获取,新加入都加到队尾

    • 从netpoll中获取ready的G

    • 尝试4次从其他P中窃取G,同时也会将其他P的一半G窃取到自身队列中,充分利用了CPU

  • 调用execute执行该G,最终通过汇编方法gogo传入执行栈信息(栈指针、程序计数器等)执行G

G如何让出CPU
  • 执行完毕后让出,执行完毕后会调用在newproc时设置的goexit方法,goexit会切换到g0释放该G
  • 主动让出,time.Sleep、channel阻塞、io阻塞等场景下会主动调用gopark方法,该方法会切换到g0,将当前G的状态从 running 切换为 waiting,然后开始下一轮循环
  • 抢占让出,在进程启动时会启动一个监控任务,叫sysmon,该任务会每隔一段时间去查看每个P是否有执行时间过长的G(10ms),如果有,则会标记抢占,在栈扩张时(newstack)会检查是否有抢占标记,如果有,则将该G从 running 更改为 runnable(goschedImpl),放到全局队列中等待下一次被调度到,最后再次进入schedule()
  • 系统调用让出 ,该内核线程无法调度运行其他goroutine,在执行SysCall时在汇编中加入了entersyscall和exitsyscall两个方法,在进入系统调用前,会保存好执行现场,同时更改状态为 syscall ,然后标记为可抢占(因为系统调用会阻塞底层内核线程);系统调用结束后,会切换到G0,先检查是否有空闲的P,如果没有的话就放到全局可执行队列中等待被执行并进入schedule(),如果有的话,将空闲的P跟当前M绑定,并立刻执行
go func的过程
  • 在调用方的堆栈上执行newproc1

    • 优先从当前P内free列表中复用,如果没有就新分配一个

    • 将入参用memmove拷贝到新G的栈指针

    • 将goexit方法设置到新G的pc寄存器,用于当执行G结束后找到退出方法,从而再次进入调度循环(goexit会切换为g0重新开始schedule())

  • 将新创建的G放到

mcall(fn func(*g))

从g切换到g0,在g0的堆栈上执行 fn(g),比如

func goexit1() {
    ...
  mcall(goexit0)//当前执行的是g,这里的调用会切换到g0,并执行 goexit0(g)
}
goexit0分析

主要是将g各个字段重置,然后放回gfree列表便于复用,最后调用schedule()继续进入调度循环

// gp是需要被退出清理的g
// 整个goexit0是由g0执行的(在g0的堆栈上执行的)
func goexit0(gp *g) {
    _g_ := getg()//这里获取到的 _g_ 是g0

    casgstatus(gp, _Grunning, _Gdead)//将gp设置为 dead
    if isSystemGoroutine(gp, false) {
        atomic.Xadd(&sched.ngsys, -1)
    }
    gp.m = nil //跟m解绑
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.preemptStop = false
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil
    ...
    dropg()//将当前执行的g0从m的current的位置拿走
    ...
    gfput(_g_.m.p.ptr(), gp)// 将gp放回 p 的 gfree 队列进行复用
    ...
    schedule()
}
reentersyscall分析
func reentersyscall(pc, sp uintptr) {
    _g_ := getg()

    // Disable preemption because during this function g is in Gsyscall status,
    // but can have inconsistent g->sched, do not let GC observe it.
    _g_.m.locks++

    // Entersyscall must not call any function that might split/grow the stack.
    // (See details in comment above.)
    // Catch calls that might, by replacing the stack guard with something that
    // will trip any stack check and leaving a flag to tell newstack to die.
    _g_.stackguard0 = stackPreempt
    _g_.throwsplit = true

    // Leave SP around for GC and traceback.
    save(pc, sp)
    _g_.syscallsp = sp
    _g_.syscallpc = pc
    casgstatus(_g_, _Grunning, _Gsyscall)
    if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
        systemstack(func() {
            print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]n")
            throw("entersyscall")
        })
    }

    if trace.enabled {
        systemstack(traceGoSysCall)
        // systemstack itself clobbers g.sched.{pc,sp} and we might
        // need them later when the G is genuinely blocked in a
        // syscall
        save(pc, sp)
    }

    if atomic.Load(&sched.sysmonwait) != 0 {
        systemstack(entersyscall_sysmon)
        save(pc, sp)
    }

    if _g_.m.p.ptr().runSafePointFn != 0 {
        // runSafePointFn may stack split if run on this stack
        systemstack(runSafePointFn)
        save(pc, sp)
    }

    _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
    _g_.sysblocktraced = true
    pp := _g_.m.p.ptr()
    pp.m = 0 //将 p 和 m 分离
    _g_.m.oldp.set(pp)
    _g_.m.p = 0 //将 m 和 p 分离
    atomic.Store(&pp.status, _Psyscall) //将 p 状态标记为 syscall
    if sched.gcwaiting != 0 {
        systemstack(entersyscall_gcwait)
        save(pc, sp)
    }

    _g_.m.locks--
}


func entersyscall_sysmon() {
    lock(&sched.lock)
    if atomic.Load(&sched.sysmonwait) != 0 {
        //将全局的sched.sysmonwait置零,同时通知CPU的等待者
        atomic.Store(&sched.sysmonwait, 0)
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)
}
retake函数
func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
    // 全局p锁
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
        // 1. p == nil 说明通过runtime.GOMAXPROCS 动态对P进行了调大,但当前还未初始化使用 直接跳过
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
        // 2. p 为 running 或 系统调用
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
                // G运行时间过长,立刻进行标记抢占
                // 注意:因为 syscall 状态下 p 已经和 m 互相分离了,所以 preemptone 不会进行抢占,同时返回 false
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
        // 3. 系统调用
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
                //给p重新分配一个m
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}


func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
        // syscall 状态下, p 和 m 已经分离会在这里返回
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true

	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}
handoffp
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.

	// 如果还有未处理完的任务,则新开一个m处理
	if !runqempty(_p_) || sched.runqsize != 0 {
		startm(_p_, false)
		return
	}
	// if it has GC work, start it straight away
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
		startm(_p_, false)
		return
	}
	// no local work, check that there are no spinning/idle M's,
	// otherwise our help is not required
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		startm(_p_, true)
		return
	}
	lock(&sched.lock)
	if sched.gcwaiting != 0 {
		_p_.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	if when := nobarrierWakeTime(_p_); when != 0 {
		wakeNetPoller(when)
	}
	pidleput(_p_)
	unlock(&sched.lock)
}
notesleep和notewakeup

如果是要进行一次性的通知,可以使用 note。note 提供了 notesleep 和 notewakeup。不像传统的 UNIX 的 sleep/wakeup,note 是无竞争的(race-free),所以如果 notewakeup 已经发生了,那么 notesleep 将会立即返回。note 可以在使用后通过 noteclear 来重置,但是要注意 noteclear 和 notesleep、notewakeup 不能发生竞争。类似 mutex,阻塞在 note 上会阻塞整个 M。然而,note 提供了不同的方式来调用 sleep:notesleep 会阻止相关联的 G 和 P 被重新调度;notetsleepg 的表现却像一个阻塞的系统调用一样,允许 P 被重用去运行另一个 G。尽管如此,这仍然比直接阻塞一个 G 要低效,因为这需要消耗一个 M。

栈扩张

go的协程设计是stackful coroutine,每一个goroutine都需要有自己的栈空间,
栈空间的内容再goroutine休眠时候需要保留的,等到重新调度时候恢复(这个时候整个调用树是完整的)。
这样就会引出一个问题,如果系统存在大量的goroutine,给每一个goroutine都预先分配一个足够的栈空间那么go就会使用过多的内存。

为了避免内存使用过多问题,go在一开始时候,会默认只为goroutine分配一个很小的栈空间,它的大小在1.92版本中是2k。
当函数发现栈空间不足时,会申请一块新的栈空间并把原来的栈复制过去。

g实例里面的g.stack、g.stackguard0两个变量来描述goroutine实例的栈。

写屏障

go支持并行GC的,GC的扫描阶段和go代码可以同时运行。这样带来的问题是,GC扫描的过程中go代码的执行可能改变了对象依赖树。

比如:开始扫描时候发现根对象A和B,B拥有C的指针,GC先扫描A,然后B把C的指针交给A,GC再扫描B,这时C就不会被扫描到。
为了避免这个问题,go在GC扫描标记阶段会启用写屏障(Write Barrier)

启用了Write barrier之后,当B把C指针交给A时,GC会认为在这一轮扫描中C的指针是存活的,即使A 可能在稍后丢掉C,那么C在下一轮GC中再回收。

Write barrier只针对指针启用,而且只在GC的标记阶段启用,平时会直接把值写入到目标地址。

Q: 系统调用的过程?
  • reentersyscall

    • 标记抢占 并唤醒 sysmon监控任务

    • 将p和m解绑

    • 将p设置为 syscall

  • 进行系统调用

  • sysmon被唤醒后执行retake

    • retake检测到 syscall 状态的 p调用handoffp给p分配一个新的m

      • 如果本地还有未执行的任务,则分配一个m来执行这些任务
  • 系统调用完成后执行 exitsyscall()

    • 切换到g0将 系统调用完成的g 状态标记为可运行

    • 如果没有空闲的p,将 系统调用完成的g 放到全局队列,然后通过 notesleep 停掉当前 m , 最后执行schedule,当前m等待被唤醒(比如 startm )

    • 如果有空闲的p,则立刻将p跟当前的m绑定,并执行 系统调用完成的g

参考文章
  • Go 语言设计与实现-6.5 调度器
  • 详尽干货!从源码角度看 Golang 的调度(上)
  • 详尽干货!从源码角度看 Golang 的调度(下)
  • 5.2 goroutine的生老病死
  • 图解Go运行时调度器 | Tony Bai
  • golang 在 runtime 中的一些骚东西
  • Golang-Scheduler原理解析_惜暮-CSDN博客_go scheduler
  • Goroutine调度实例简要分析 - Go语言中文网 - Golang中文社区

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678401.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存