Go语言 Channel管道的源码分析及图解

Go语言 Channel管道的源码分析及图解,第1张

文章目录
    • Channel概述
    • Channel结构体信息
    • Channel创建
    • chansend发送
      • 1. 直接发送流程图
      • 2. 写入缓冲区流程图
      • 3. 发送阻塞挂起
    • chanrecv接收
      • 1. 直接接收流程图(无缓冲区)
      • 2. 直接接收流程图(带缓冲区)
      • 3. 从缓冲区接收
      • 4. 接收阻塞挂起
    • channel关闭

Channel概述

Channel通过通信的方式在goroutine之间共享内存,是支撑Go语言高性能并发编程模型的重要结构,本文将分析Channel相关的创建、发送、接收、关闭函数的源代码。


ps:源代码只给出了重要逻辑核心部分

Channel结构体信息

Channel结构体为hchan ,源码如下:

type hchan struct {
	qcount   uint           // 元素个数
	dataqsiz uint           // 循环队列的长度
	buf      unsafe.Pointer // 缓冲区数据指针
	elemsize uint16         //元素类型大小
	closed   uint32         //是否关闭标志位
	elemtype *_type 		// 元素类型
	sendx    uint   		// 发送数据的发送偏移位置(缓冲区中)
	recvx    uint   		// 接收数据的接收偏移位置(缓冲区中)
	recvq    waitq    		// 阻塞的发送Goroutine列表,链表形式,后来的协程在链表尾部
	sendq    waitq  		// 阻塞的接收Goroutine列表
	lock mutex				//锁
}
Channel创建

创建chan的函数为makechan,判断是否创建缓冲区。


func makechan(t *chantype, size int) *hchan {
  	//一些元素检查部分,略过
	elem := t.elem
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	var c *hchan
	switch {
	case mem == 0:      //无缓冲区
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()   
	case elem.ptrdata == 0:   //类型不是指针类型, 分配一块连续的内存空间
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)  
	default:    //默认分配hchan和缓冲区空间
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	//初始化信息
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)
	}
	return c
}
chansend发送

发送函数为chansend,主体分为3个部分:

  1. 如果存在接收方,直接发送
  2. 缓冲空间有剩余时,写入缓冲空间
  3. 上述都不满足时,阻塞发送,等待其他Goroutine接收 *** 作
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	//一些元素检查部分,略过
	lock(&c.lock)
	if c.closed != 0 {    //如果发送时管道已经关闭,直接panic
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	
	//第一个判断:如果接收队列中有协程在等待,那么直接发送数据
	//dequeue()取链表头部元素,也就是最先陷入等待的Goroutine
	if sg := c.recvq.dequeue(); sg != nil {       		
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)   //send函数见下面分析
		//
		return true
	}
	
	//第二个判断:缓冲区如果有剩余,则将数据写入缓冲区(无缓冲区channel跳过)
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx) 			//根据发送标记位sendx计算出要存储的缓冲区对应地址
		typedmemmove(c.elemtype, qp, ep)    //发送数据写入qp地址
		c.sendx++    //发送标记位后移一位
		if c.sendx == c.dataqsiz {    //循环队列基本 *** 作
			c.sendx = 0
		}
		c.qcount++    //元素加1
		unlock(&c.lock)
		return true
	}
	
	//第三个判断:接收队列无Goroutine等待、无缓冲区或者缓冲区已满,
	//则将当前协程加入发送队列sendq并挂起
	gp := getg()  //获取当前协程
	mysg := acquireSudog()   //获取一个runtime.sudog结构体
	//设置sudog的一些信息
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep   
	mysg.waitlink = nil
	mysg.g = gp   //绑定当前协程
	mysg.isSelect = false
	mysg.c = c    //当前channel
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)  //加入到sendq的发送阻塞队列中
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)   
	//gopark函数令当前goroutine陷入沉睡等待唤醒
	
	//唤醒后就是一些收尾工作
	//此处可能有疑问,唤醒后的数据还没发送嘞?
	//其实这个数据在recv接收端已经处理了,见后面chanrecv代码分析
	KeepAlive(ep)
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)  //释放sudo结构体
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

send函数,负责向接收队列中最先陷入等待的Goroutine发送数据:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	//.....
	if sg.elem != nil {           //sg.elem就是接收协程中接收数据的地址
		sendDirect(c.elemtype, sg, ep)   //调用memmove完成数据写入
		sg.elem = nil
	}
	//对接收的sudog结构体信息处理
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)  
	//1.将负责接收的Goroutine标记为可运行状态
	//2.放在处理器的runnext上等待执行,下一次调度会立刻唤醒接收方Goroutine
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	dst := sg.elem    //接收方的数据接收地址
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)  
}

下面通过几张图来理解

1. 直接发送流程图

如果接收队列recvq中有协程在等待,那么取出recvq链表头部的接收Goroutine直接发送数据

2. 写入缓冲区流程图

如果接收队列recvq中没有协程在等待,但缓冲区有剩余,则将数据写入缓冲区:

3. 发送阻塞挂起

接收队列无Goroutine等待、无缓冲区或者缓冲区已满,则将当前协程加入发送队列sendq并挂起:

chanrecv接收

同样的三个判断流程:

  1. sendq中是否有待发送协程,如果有就直接接收。


  2. 如果缓冲区有数据,则从缓冲区接收。


  3. 发送队列无Goroutine、无缓冲区或者缓冲区无数据,则将当前协程加入等待接收队列recvq并挂起。


func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	//......
	lock(&c.lock)
	if c.closed != 0 && c.qcount == 0 {   //channel已经关闭,直接返回
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	//1. 如果sendq中有阻塞的发送Goroutine,直接接收
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	//2. 如果缓冲区有数据,从缓冲区接收
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)   //recvx计算出缓冲区中当前的待接收位置
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {       //ep是当前协程接收数据的地址
			typedmemmove(c.elemtype, ep, qp)      //把qp数据拷贝到ep地址处
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {     //循环队列
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	
	//3. 发送队列无Goroutine、无缓冲区或者缓冲区无数据:
	//则将当前协程加入等待接收队列recvq并挂起
	gp := getg()     //获取当前协程
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep       //绑定接收数据的地址
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp       //绑定当前协程
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)    //加入等待接收队列
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	//gopark函数令当前goroutine陷入沉睡等待唤醒
	
	// 唤醒后就是收尾工作,和发送一样唤醒后没有接收数据 *** 作
	//因为在发送函数chansend中如果检测到了此接收协程,
	//就用sendDirect函数直接进行了地址间的数据拷贝,从而完成了数据接收
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

如果sendq中有阻塞的发送Goroutine,则调用recv函数,源代码如下:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {   //无缓冲区情况
		if ep != nil {
			recvDirect(c.elemtype, sg, ep)  //此函数进行数据拷贝,sg绑定了发送信息
		}
	} else {   //注意,带缓冲区的channel如果存在发送阻塞Goroutine,则说明缓冲区一定满了
		qp := chanbuf(c, c.recvx)      //找到缓冲区待接收数据位置
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)   //将缓冲区数据拷贝到接收地址ep中
		}
		// 注意缓冲区已满,同时又成功读取了一次数据,
		// 说明此时空出了一块可以发送数据的区域,将阻塞的Goroutine数据写入并用goready函数调度
		typedmemmove(c.elemtype, qp, sg.elem)  //发送方的sg.elem数据拷贝到qp位置
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx   //更新发送位置,此时缓冲区还是满的状态
	}
	//一些收尾工作
	sg.elem = nil
	gp := sg.g    //获取阻塞发送的Goroutine
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)      
	//1.将阻塞发送的Goroutine标记为可运行状态
	//2.放在处理器的runnext上等待执行,下一次调度会立刻唤醒发送方Goroutine
}

recvDirect调用memove进行数据拷贝

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	src := sg.elem     //发送方数据
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}
1. 直接接收流程图(无缓冲区)

如果sendq存在发送阻塞的Goroutine,直接接收数据,并将发送阻塞的Goroutine放在处理器的runnext上等待下一次调度唤醒。



2. 直接接收流程图(带缓冲区)

带缓冲区且sendq存在发送阻塞的Goroutine时,缓冲区一定已经满了,首先从缓冲区对应的recvx区域获得数据,那么此区域就成为了待使用的空闲区域,接着将阻塞的Goroutine发送数据写入该区域,并将此Goroutine放在处理器的runnext上等待下一次调度唤醒。



3. 从缓冲区接收

带缓冲区且sendq不存在发送阻塞的Goroutine时,直接从缓冲区读取数据。



4. 接收阻塞挂起

发送队列无阻塞的Goroutine、无缓冲区或者缓冲区无数据,则将当前协程加入等待接收队列recvq并挂起。



channel关闭

关闭了channel管道,自然有必要通知所有阻塞的发送和接收协程,源码如下:

func closechan(c *hchan) {
	//一些检查部分
	c.closed = 1    //标志位置1,代表关闭状态
 
	var glist gList   //协程队列,集合所有需要被唤醒的阻塞协程

	// 将所有阻塞的接收Goroutine加入glist队列
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// 将所有阻塞的发送Goroutine加入glist队列
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)   //依次把所有阻塞协程准备唤醒
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/564974.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-06
下一篇 2022-04-06

发表评论

登录后才能评论

评论列表(0条)

保存