-
从空闲process列表中获取一个process
-
从空闲列表获取一个m,或者新建一个m
-
使用newosproc()方法创建一个内核线程,并把内核线程和m以及mstart方法关联,该线程执行时会调用mstart方法
-
由该m的g0开始执行schedule方法
-
g0的作用?
-
为runtime 下调度G的工作提供栈空间,每个m都有一个g0
-
-
获取G,有三种方式
-
每处理n个任务后就去全局g队列中获取G,同时会将全局队列中一定数量的G搬运到自己的本地队列
-
从本地队列中获取。为了增加公平性,获取都从队头获取,新加入都加到队尾
-
从netpoll中获取ready的G
-
尝试4次从其他P中窃取G,同时也会将其他P的一半G窃取到自身队列中,充分利用了CPU
-
-
调用execute执行该G,最终通过汇编方法gogo传入执行栈信息(栈指针、程序计数器等)执行G
- 执行完毕后让出,执行完毕后会调用在newproc时设置的goexit方法,goexit会切换到g0释放该G
- 主动让出,time.Sleep、channel阻塞、io阻塞等场景下会主动调用gopark方法,该方法会切换到g0,将当前G的状态从 running 切换为 waiting,然后开始下一轮循环
- 抢占让出,在进程启动时会启动一个监控任务,叫sysmon,该任务会每隔一段时间去查看每个P是否有执行时间过长的G(10ms),如果有,则会标记抢占,在栈扩张时(newstack)会检查是否有抢占标记,如果有,则将该G从 running 更改为 runnable(goschedImpl),放到全局队列中等待下一次被调度到,最后再次进入schedule()
- 系统调用让出 ,该内核线程无法调度运行其他goroutine,在执行SysCall时在汇编中加入了entersyscall和exitsyscall两个方法,在进入系统调用前,会保存好执行现场,同时更改状态为 syscall ,然后标记为可抢占(因为系统调用会阻塞底层内核线程);系统调用结束后,会切换到G0,先检查是否有空闲的P,如果没有的话就放到全局可执行队列中等待被执行并进入schedule(),如果有的话,将空闲的P跟当前M绑定,并立刻执行
-
在调用方的堆栈上执行newproc1
-
优先从当前P内free列表中复用,如果没有就新分配一个
-
将入参用memmove拷贝到新G的栈指针
-
将goexit方法设置到新G的pc寄存器,用于当执行G结束后找到退出方法,从而再次进入调度循环(goexit会切换为g0重新开始schedule())
-
-
将新创建的G放到
从g切换到g0,在g0的堆栈上执行 fn(g),比如
func goexit1() { ... mcall(goexit0)//当前执行的是g,这里的调用会切换到g0,并执行 goexit0(g) }goexit0分析
主要是将g各个字段重置,然后放回gfree列表便于复用,最后调用schedule()继续进入调度循环
// gp是需要被退出清理的g // 整个goexit0是由g0执行的(在g0的堆栈上执行的) func goexit0(gp *g) { _g_ := getg()//这里获取到的 _g_ 是g0 casgstatus(gp, _Grunning, _Gdead)//将gp设置为 dead if isSystemGoroutine(gp, false) { atomic.Xadd(&sched.ngsys, -1) } gp.m = nil //跟m解绑 locked := gp.lockedm != 0 gp.lockedm = 0 _g_.m.lockedg = 0 gp.preemptStop = false gp.paniconfault = false gp._defer = nil // should be true already but just in case. gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data. gp.writebuf = nil gp.waitreason = 0 gp.param = nil gp.labels = nil gp.timer = nil ... dropg()//将当前执行的g0从m的current的位置拿走 ... gfput(_g_.m.p.ptr(), gp)// 将gp放回 p 的 gfree 队列进行复用 ... schedule() }reentersyscall分析
func reentersyscall(pc, sp uintptr) { _g_ := getg() // Disable preemption because during this function g is in Gsyscall status, // but can have inconsistent g->sched, do not let GC observe it. _g_.m.locks++ // Entersyscall must not call any function that might split/grow the stack. // (See details in comment above.) // Catch calls that might, by replacing the stack guard with something that // will trip any stack check and leaving a flag to tell newstack to die. _g_.stackguard0 = stackPreempt _g_.throwsplit = true // Leave SP around for GC and traceback. save(pc, sp) _g_.syscallsp = sp _g_.syscallpc = pc casgstatus(_g_, _Grunning, _Gsyscall) if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp { systemstack(func() { print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]n") throw("entersyscall") }) } if trace.enabled { systemstack(traceGoSysCall) // systemstack itself clobbers g.sched.{pc,sp} and we might // need them later when the G is genuinely blocked in a // syscall save(pc, sp) } if atomic.Load(&sched.sysmonwait) != 0 { systemstack(entersyscall_sysmon) save(pc, sp) } if _g_.m.p.ptr().runSafePointFn != 0 { // runSafePointFn may stack split if run on this stack systemstack(runSafePointFn) save(pc, sp) } _g_.m.syscalltick = _g_.m.p.ptr().syscalltick _g_.sysblocktraced = true pp := _g_.m.p.ptr() pp.m = 0 //将 p 和 m 分离 _g_.m.oldp.set(pp) _g_.m.p = 0 //将 m 和 p 分离 atomic.Store(&pp.status, _Psyscall) //将 p 状态标记为 syscall if sched.gcwaiting != 0 { systemstack(entersyscall_gcwait) save(pc, sp) } _g_.m.locks-- } func entersyscall_sysmon() { lock(&sched.lock) if atomic.Load(&sched.sysmonwait) != 0 { //将全局的sched.sysmonwait置零,同时通知CPU的等待者 atomic.Store(&sched.sysmonwait, 0) notewakeup(&sched.sysmonnote) } unlock(&sched.lock) }retake函数
func retake(now int64) uint32 { n := 0 // Prevent allp slice changes. This lock will be completely // uncontended unless we're already stopping the world. // 全局p锁 lock(&allpLock) // We can't use a range loop over allp because we may // temporarily drop the allpLock. Hence, we need to re-fetch // allp each time around the loop. for i := 0; i < len(allp); i++ { _p_ := allp[i] // 1. p == nil 说明通过runtime.GOMAXPROCS 动态对P进行了调大,但当前还未初始化使用 直接跳过 if _p_ == nil { // This can happen if procresize has grown // allp but not yet created new Ps. continue } pd := &_p_.sysmontick s := _p_.status sysretake := false // 2. p 为 running 或 系统调用 if s == _Prunning || s == _Psyscall { // Preempt G if it's running for too long. t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now { // G运行时间过长,立刻进行标记抢占 // 注意:因为 syscall 状态下 p 已经和 m 互相分离了,所以 preemptone 不会进行抢占,同时返回 false preemptone(_p_) // In case of syscall, preemptone() doesn't // work, because there is no M wired to P. sysretake = true } } // 3. 系统调用 if s == _Psyscall { // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us). t := int64(_p_.syscalltick) if !sysretake && int64(pd.syscalltick) != t { pd.syscalltick = uint32(t) pd.syscallwhen = now continue } // On the one hand we don't want to retake Ps if there is no other work to do, // but on the other hand we want to retake them eventually // because they can prevent the sysmon thread from deep sleep. if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // Drop allpLock so we can take sched.lock. unlock(&allpLock) // Need to decrement number of idle locked M's // (pretending that one more is running) before the CAS. // Otherwise the M from which we retake can exit the syscall, // increment nmidle and report deadlock. incidlelocked(-1) if atomic.Cas(&_p_.status, s, _Pidle) { if trace.enabled { traceGoSysBlock(_p_) traceProcStop(_p_) } n++ _p_.syscalltick++ //给p重新分配一个m handoffp(_p_) } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) return uint32(n) } func preemptone(_p_ *p) bool { mp := _p_.m.ptr() if mp == nil || mp == getg().m { // syscall 状态下, p 和 m 已经分离会在这里返回 return false } gp := mp.curg if gp == nil || gp == mp.g0 { return false } gp.preempt = true // Every call in a go routine checks for stack overflow by // comparing the current stack pointer to gp->stackguard0. // Setting gp->stackguard0 to StackPreempt folds // preemption into the normal stack overflow check. gp.stackguard0 = stackPreempt // Request an async preemption of this P. if preemptMSupported && debug.asyncpreemptoff == 0 { _p_.preempt = true preemptM(mp) } return true }handoffp
func handoffp(_p_ *p) { // handoffp must start an M in any situation where // findrunnable would return a G to run on _p_. // 如果还有未处理完的任务,则新开一个m处理 if !runqempty(_p_) || sched.runqsize != 0 { startm(_p_, false) return } // if it has GC work, start it straight away if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) { startm(_p_, false) return } // no local work, check that there are no spinning/idle M's, // otherwise our help is not required if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic startm(_p_, true) return } lock(&sched.lock) if sched.gcwaiting != 0 { _p_.status = _Pgcstop sched.stopwait-- if sched.stopwait == 0 { notewakeup(&sched.stopnote) } unlock(&sched.lock) return } if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) { sched.safePointFn(_p_) sched.safePointWait-- if sched.safePointWait == 0 { notewakeup(&sched.safePointNote) } } if sched.runqsize != 0 { unlock(&sched.lock) startm(_p_, false) return } // If this is the last running P and nobody is polling network, // need to wakeup another M to poll network. if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { unlock(&sched.lock) startm(_p_, false) return } if when := nobarrierWakeTime(_p_); when != 0 { wakeNetPoller(when) } pidleput(_p_) unlock(&sched.lock) }notesleep和notewakeup
如果是要进行一次性的通知,可以使用 note。note 提供了 notesleep 和 notewakeup。不像传统的 UNIX 的 sleep/wakeup,note 是无竞争的(race-free),所以如果 notewakeup 已经发生了,那么 notesleep 将会立即返回。note 可以在使用后通过 noteclear 来重置,但是要注意 noteclear 和 notesleep、notewakeup 不能发生竞争。类似 mutex,阻塞在 note 上会阻塞整个 M。然而,note 提供了不同的方式来调用 sleep:notesleep 会阻止相关联的 G 和 P 被重新调度;notetsleepg 的表现却像一个阻塞的系统调用一样,允许 P 被重用去运行另一个 G。尽管如此,这仍然比直接阻塞一个 G 要低效,因为这需要消耗一个 M。
栈扩张go的协程设计是stackful coroutine,每一个goroutine都需要有自己的栈空间,
栈空间的内容再goroutine休眠时候需要保留的,等到重新调度时候恢复(这个时候整个调用树是完整的)。
这样就会引出一个问题,如果系统存在大量的goroutine,给每一个goroutine都预先分配一个足够的栈空间那么go就会使用过多的内存。
为了避免内存使用过多问题,go在一开始时候,会默认只为goroutine分配一个很小的栈空间,它的大小在1.92版本中是2k。
当函数发现栈空间不足时,会申请一块新的栈空间并把原来的栈复制过去。
g实例里面的g.stack、g.stackguard0两个变量来描述goroutine实例的栈。
写屏障go支持并行GC的,GC的扫描阶段和go代码可以同时运行。这样带来的问题是,GC扫描的过程中go代码的执行可能改变了对象依赖树。
比如:开始扫描时候发现根对象A和B,B拥有C的指针,GC先扫描A,然后B把C的指针交给A,GC再扫描B,这时C就不会被扫描到。
为了避免这个问题,go在GC扫描标记阶段会启用写屏障(Write Barrier)
启用了Write barrier之后,当B把C指针交给A时,GC会认为在这一轮扫描中C的指针是存活的,即使A 可能在稍后丢掉C,那么C在下一轮GC中再回收。
Write barrier只针对指针启用,而且只在GC的标记阶段启用,平时会直接把值写入到目标地址。
Q: 系统调用的过程?-
reentersyscall
-
标记抢占 并唤醒 sysmon监控任务
-
将p和m解绑
-
将p设置为 syscall
-
-
进行系统调用
-
sysmon被唤醒后执行retake
-
retake检测到 syscall 状态的 p调用handoffp给p分配一个新的m
- 如果本地还有未执行的任务,则分配一个m来执行这些任务
-
-
系统调用完成后执行 exitsyscall()
-
切换到g0将 系统调用完成的g 状态标记为可运行
-
如果没有空闲的p,将 系统调用完成的g 放到全局队列,然后通过 notesleep 停掉当前 m , 最后执行schedule,当前m等待被唤醒(比如 startm )
-
如果有空闲的p,则立刻将p跟当前的m绑定,并执行 系统调用完成的g
-
- Go 语言设计与实现-6.5 调度器
- 详尽干货!从源码角度看 Golang 的调度(上)
- 详尽干货!从源码角度看 Golang 的调度(下)
- 5.2 goroutine的生老病死
- 图解Go运行时调度器 | Tony Bai
- golang 在 runtime 中的一些骚东西
- Golang-Scheduler原理解析_惜暮-CSDN博客_go scheduler
- Goroutine调度实例简要分析 - Go语言中文网 - Golang中文社区
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)