Channel通过通信的方式在goroutine之间共享内存,是支撑Go语言高性能并发编程模型的重要结构,本文将分析Channel相关的创建、发送、接收、关闭函数的源代码。
ps:源代码只给出了重要逻辑核心部分
Channel结构体信息Channel结构体为hchan
,源码如下:
type hchan struct {
qcount uint // 元素个数
dataqsiz uint // 循环队列的长度
buf unsafe.Pointer // 缓冲区数据指针
elemsize uint16 //元素类型大小
closed uint32 //是否关闭标志位
elemtype *_type // 元素类型
sendx uint // 发送数据的发送偏移位置(缓冲区中)
recvx uint // 接收数据的接收偏移位置(缓冲区中)
recvq waitq // 阻塞的发送Goroutine列表,链表形式,后来的协程在链表尾部
sendq waitq // 阻塞的接收Goroutine列表
lock mutex //锁
}
Channel创建
创建chan的函数为makechan
,判断是否创建缓冲区。
func makechan(t *chantype, size int) *hchan {
//一些元素检查部分,略过
elem := t.elem
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0: //无缓冲区
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: //类型不是指针类型, 分配一块连续的内存空间
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: //默认分配hchan和缓冲区空间
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
//初始化信息
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
}
return c
}
chansend发送
发送函数为chansend,主体分为3个部分:
如果存在接收方,直接发送缓冲空间有剩余时,写入缓冲空间上述都不满足时,阻塞发送,等待其他Goroutine接收 *** 作func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//一些元素检查部分,略过
lock(&c.lock)
if c.closed != 0 { //如果发送时管道已经关闭,直接panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//第一个判断:如果接收队列中有协程在等待,那么直接发送数据
//dequeue()取链表头部元素,也就是最先陷入等待的Goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3) //send函数见下面分析
//
return true
}
//第二个判断:缓冲区如果有剩余,则将数据写入缓冲区(无缓冲区channel跳过)
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx) //根据发送标记位sendx计算出要存储的缓冲区对应地址
typedmemmove(c.elemtype, qp, ep) //发送数据写入qp地址
c.sendx++ //发送标记位后移一位
if c.sendx == c.dataqsiz { //循环队列基本 *** 作
c.sendx = 0
}
c.qcount++ //元素加1
unlock(&c.lock)
return true
}
//第三个判断:接收队列无Goroutine等待、无缓冲区或者缓冲区已满,
//则将当前协程加入发送队列sendq并挂起
gp := getg() //获取当前协程
mysg := acquireSudog() //获取一个runtime.sudog结构体
//设置sudog的一些信息
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp //绑定当前协程
mysg.isSelect = false
mysg.c = c //当前channel
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) //加入到sendq的发送阻塞队列中
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
//gopark函数令当前goroutine陷入沉睡等待唤醒
//唤醒后就是一些收尾工作
//此处可能有疑问,唤醒后的数据还没发送嘞?
//其实这个数据在recv接收端已经处理了,见后面chanrecv代码分析
KeepAlive(ep)
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg) //释放sudo结构体
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
send函数,负责向接收队列中最先陷入等待的Goroutine发送数据:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//.....
if sg.elem != nil { //sg.elem就是接收协程中接收数据的地址
sendDirect(c.elemtype, sg, ep) //调用memmove完成数据写入
sg.elem = nil
}
//对接收的sudog结构体信息处理
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
//1.将负责接收的Goroutine标记为可运行状态
//2.放在处理器的runnext上等待执行,下一次调度会立刻唤醒接收方Goroutine
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem //接收方的数据接收地址
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
下面通过几张图来理解
1. 直接发送流程图如果接收队列recvq中有协程在等待,那么取出recvq链表头部的接收Goroutine直接发送数据
如果接收队列recvq中没有协程在等待,但缓冲区有剩余,则将数据写入缓冲区:
接收队列无Goroutine等待、无缓冲区或者缓冲区已满,则将当前协程加入发送队列sendq并挂起:
同样的三个判断流程:
sendq中是否有待发送协程,如果有就直接接收。如果缓冲区有数据,则从缓冲区接收。发送队列无Goroutine、无缓冲区或者缓冲区无数据,则将当前协程加入等待接收队列recvq并挂起。func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
//......
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 { //channel已经关闭,直接返回
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
//1. 如果sendq中有阻塞的发送Goroutine,直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//2. 如果缓冲区有数据,从缓冲区接收
if c.qcount > 0 {
qp := chanbuf(c, c.recvx) //recvx计算出缓冲区中当前的待接收位置
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil { //ep是当前协程接收数据的地址
typedmemmove(c.elemtype, ep, qp) //把qp数据拷贝到ep地址处
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz { //循环队列
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//3. 发送队列无Goroutine、无缓冲区或者缓冲区无数据:
//则将当前协程加入等待接收队列recvq并挂起
gp := getg() //获取当前协程
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep //绑定接收数据的地址
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp //绑定当前协程
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) //加入等待接收队列
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
//gopark函数令当前goroutine陷入沉睡等待唤醒
// 唤醒后就是收尾工作,和发送一样唤醒后没有接收数据 *** 作
//因为在发送函数chansend中如果检测到了此接收协程,
//就用sendDirect函数直接进行了地址间的数据拷贝,从而完成了数据接收
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
如果sendq中有阻塞的发送Goroutine,则调用recv函数,源代码如下:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { //无缓冲区情况
if ep != nil {
recvDirect(c.elemtype, sg, ep) //此函数进行数据拷贝,sg绑定了发送信息
}
} else { //注意,带缓冲区的channel如果存在发送阻塞Goroutine,则说明缓冲区一定满了
qp := chanbuf(c, c.recvx) //找到缓冲区待接收数据位置
if ep != nil {
typedmemmove(c.elemtype, ep, qp) //将缓冲区数据拷贝到接收地址ep中
}
// 注意缓冲区已满,同时又成功读取了一次数据,
// 说明此时空出了一块可以发送数据的区域,将阻塞的Goroutine数据写入并用goready函数调度
typedmemmove(c.elemtype, qp, sg.elem) //发送方的sg.elem数据拷贝到qp位置
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx //更新发送位置,此时缓冲区还是满的状态
}
//一些收尾工作
sg.elem = nil
gp := sg.g //获取阻塞发送的Goroutine
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
//1.将阻塞发送的Goroutine标记为可运行状态
//2.放在处理器的runnext上等待执行,下一次调度会立刻唤醒发送方Goroutine
}
recvDirect
调用memove
进行数据拷贝
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem //发送方数据
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
1. 直接接收流程图(无缓冲区)
如果sendq存在发送阻塞的Goroutine,直接接收数据,并将发送阻塞的Goroutine放在处理器的runnext上等待下一次调度唤醒。
带缓冲区且sendq存在发送阻塞的Goroutine时,缓冲区一定已经满了,首先从缓冲区对应的recvx区域获得数据,那么此区域就成为了待使用的空闲区域,接着将阻塞的Goroutine发送数据写入该区域,并将此Goroutine放在处理器的runnext上等待下一次调度唤醒。
带缓冲区且sendq不存在发送阻塞的Goroutine时,直接从缓冲区读取数据。
发送队列无阻塞的Goroutine、无缓冲区或者缓冲区无数据,则将当前协程加入等待接收队列recvq并挂起。
关闭了channel管道,自然有必要通知所有阻塞的发送和接收协程,源码如下:
func closechan(c *hchan) {
//一些检查部分
c.closed = 1 //标志位置1,代表关闭状态
var glist gList //协程队列,集合所有需要被唤醒的阻塞协程
// 将所有阻塞的接收Goroutine加入glist队列
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 将所有阻塞的发送Goroutine加入glist队列
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3) //依次把所有阻塞协程准备唤醒
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)