Golang 基础:原生并发 goroutine channel 和 select 常见使用场景

本文为极客时间 Go 语言第一课 相关章节学习笔记及思考。

goroutinegoroutine 调度原理 channelchannel 的不同类型作为参数的单向类型关闭 channellen(channel)nil channel 无缓冲 channel 的常见用途 🔥多 goroutine 通信:信号多 goroutine 同步:通过阻塞,替代锁 带缓冲 channel 的常见用途 🔥消息队列计数信号量 selectchannel 与 select 结合的常见用途 🔥利用 default 分支避免阻塞实现超时心跳机制 总结

Go 语言之父 Rob Pike 的经典名言:“不要通过共享内存来通信,应该通过通信来共享内存(Don’t communicate by sharing memory, share memory by communicating)

C/C++ 线程的复杂性:

又或者是否需要在新线程中设置取消点(cancel point)来保证被主线程取消(cancel)的时候能顺利退出

goroutine 是由 Go 运行时(runtime)负责调度的、轻量的用户级线程。


占用内存小,goroutine 初始栈只有 2k,比 Linux 线程小多了用户态调度,不需要内核介入,代价更小一退出就会被回收提供 channel 通信

无论是 Go 自身运行时代码还是用户层 Go 代码,都无一例外地运行在 goroutine 中。


调用函数、方法(具名、匿名、闭包都可以)时,前面加上 go 关键字,就会创建一个 goroutine。

goroutine 调度原理

Goroutine 调度器的任务:将 Goroutine 按照一定算法放到不同的 *** 作系统线程中去执行。


G-M 模型(废弃):将 G(Goroutine) 调度到 M(Machine) 上运行G-P-M 模型(使用中):增加中间层 P(Processor),提供队列管理多个 G,然后在合适的时候绑定 M。先后使用协作式、抢占式调度。NUMA 调度模型(尚未实现)


G:存储 Goroutine 的执行信息,包括:栈、状态P:逻辑处理器,有一个待调度的 G 队列M:真正的计算资源,Go 代码运行的真实载体(用户态线程),要执行 G 需要绑定 P,绑定后会从 P 的本地队列和全局队列获取 G 然后执行

type g struct {
    stack      stack   // offset known to runtime/cgo
    sched      gobuf
    goid       int64
    gopc       uintptr // pc of go statement that created this goroutine
    startpc    uintptr // pc of goroutine function
    ... ...

type p struct {
    lock mutex

    id          int32
    status      uint32 // one of pidle/prunning/...
    mcache      *mcache
    racectx     uintptr

    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr

    runnext guintptr

    // Available G's (status == Gdead)
    gfree    *g
    gfreecnt int32

    ... ...

type m struct {
    g0            *g     // goroutine with scheduling stack
    mstartfn      func()
    curg          *g     // current running goroutine
    ... ...

从 Go 1.2 以后,Go 调度器使用 G-P-M 模型,调度目标:公平地将 G 调度到 P 上运行。


常规执行,G 运行超出时间片后抢占调度G 阻塞在 channel 或者 I/O 上时,会被放置到等待队列,M 会尝试运行 P 的下一个可运行 G;当 G 可运行时,会被唤醒并修改状态,然后放到某个 P 的队列中,等待被绑定 M、执行G 阻塞在 syscall 上时,执行 G 的 M 也会受影响,会解绑 P、进入挂起状态;syscall 返回后,G 会尝试获取可用的 P,没获取到的话,修改状态,等待被运行

如果一个 G 任务运行 10ms,sysmon 就会认为它的运行时间太久而发出抢占式调度的请求。
一旦 G 的抢占标志位被设为 true,那么等到这个 G 下一次调用函数或方法时,运行时就可以将 G 抢占并移出运行状态,放入队列中,等待下一次被调度。

// $GOROOT/src/runtime/proc.go

// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

和线程一样,一个应用内部启动的所有 goroutine 共享进程空间的资源,如果多个 goroutine 访问同一块内存数据,将会存在竞争。

Go 提供了 channel 作为 goroutine 之间的通信方式,goroutine 可以从 channel 读取数据,处理后再把数据写到 channel 中。

channel 是和 切片、map 类似的复合类型,使用时需要指定具体的类型:

c := make(chan int) //c 是一个 int 类型的 channel

和函数一样,channel 也是“第一公民” 身份,可以做变量、参数、返回值等。

func spawn(f func() error) <-chan error {
    c := make(chan error)

    go func() {
        c <- f()

    return c

func main() {
    c := spawn(func() error {
        time.Sleep(2 * time.Second)
        return errors.New("timeout")

main goroutine 与子 goroutine 之间建立了一个元素类型为 error 的 channel,子 goroutine 退出时,会将它执行的函数的错误返回值写入这个 channel,main goroutine 可以通过读取 channel 的值来获取子 goroutine 的退出状态。

channel 的不同类型

通过 make 可以创建 2 种类型的 channel:


下面是无 buffer channel 的测试例子:

func testNoBufferChannel() {
    var c chan int = make(chan int) //无缓冲,同步进行,没有对接人,就会阻塞住
    //var c chan int = make(chan int, 5)    //有缓冲,容量为 5

    //大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
    go func() {
        fmt.Println("goroutine run")
        b := <-c //读取 channel
        fmt.Println("read from channel: ", b)

    fmt.Println("main goroutine before write")
    c <- 1  //没有 buffer,写入 channel 时会阻塞,直到有读取
    fmt.Println("main goroutine finish")


main goroutine before write
goroutine run
read from channel:  1
main goroutine finish

和预期一致,主 goroutine 在写入无 buffer 的 channel 时会阻塞,直到 子 goroutine 读取。

下面是有 buffer channel 的测试例子:

func testBufferChannel() {
    c := make(chan int, 1)  //有缓冲,容量为 5

    //大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
    go func() {
        fmt.Println("child_goroutine run")
        b := <-c //读取 channel,有数据时不会阻塞
        fmt.Println("child_goroutine read from channel: ", b)

    fmt.Println("main goroutine before write first")
    c <- 1  //有 buffer,写入 channel 时不会阻塞,除非满了
    fmt.Println("main_goroutine first write finish, len:", len(c))

    fmt.Println("main_goroutine write second:")
    c <-2
    fmt.Println("main_goroutine finish, len:", len(c))

    time.Sleep( 3 * time.Second)    //不加这个子 goroutine 没执行就退出了


main goroutine before write first
main_goroutine first write finish, len: 1
main_goroutine write second:
child_goroutine run
child_goroutine read from channel:  1
main_goroutine finish, len: 1


第一次写完立刻就返回;第二次写时,因为这个 goroutine 已经满了,所以阻塞在写上子 goroutine 读取了一次,主 goroutine 才从写上返回 作为参数的单向类型 只发送, chan<-只接收, <-chan
func testSingleDirectionChannel() {

    f := func(a chan<- int, b <- chan int) {    //a 是只能写入,b 是只能读取
        x := <- a   //编译报错:Invalid operation: <- a (receive from send-only type chan<- int)
        b <- 2      //编译报错:nvalid operation: b <- 2 (send to receive-only type <-chan int)

通常只发送 channel 类型和只接收 channel 类型,会被用作函数的参数类型或返回值,用于限制对 channel 内的 *** 作,或者是明确可对 channel 进行的 *** 作的类型


关闭 channel

close(channel) 后,不同语句的结果:

func testCloseChannel() {
    a := make(chan int)
    close(a)    //先关闭,然后看下几种读取关闭 channel 的结果
    b := <- a
    fmt.Println("关闭后直接读取:", b)  //0
    c, ok := <-a
    fmt.Println("关闭后通过逗号 ok 读取:", c, ok)    //0 false

    for v := range a{   //关闭的话直接跳过
        fmt.Println("关闭后通过 for-range 读取", v)

通过“comma, ok” 我们可以知道 channel 是否被关闭。

一般由发送端负责关闭 channel,原因:

向一个关闭的 channel 中发送数据,会 panic (⚠️注意了!!!)发送端没有办法判断 channel 是否已经关闭。 len(channel)

当 ch 为无缓冲 channel 时,len(ch) 总是返回 0;当 ch 为带缓冲 channel 时,len(ch) 返回当前 channel ch 中尚未被读取的元素个数。

如果只是想知道 channel 中是否有数据、不想阻塞,可以使用 len(channel) 先做检查:

nil channel

默认读取一个关闭的 channel,会返回零值。但是读取一个 nil channel, *** 作将阻塞。

所以在有些场景下,可能需要手动修改 channel 为 nil,以实现阻塞的效果,比如在 select 语句中。

无缓冲 channel 的常见用途 🔥

Go 语言倡导:

Do not communicate by sharing memory; instead, share memory by communicating.

多 goroutine 通信:信号

基于无 buffer channel,可以实现一对一和一对多的信号传递。


type signal struct{}

//接收一个函数,在子 routine 里执行,然后返回一个 channel,用于主 routine 等待
func spawn(f func()) <-chan signal {
    c := make(chan signal)
    go func() {
        fmt.Println("exec f in child_routine");
        fmt.Println("f exec finished, write to channel")
        c<- signal{}
    return c

//测试使用无 buffer channel 实现信号
func testUseNonBufferChannelImplSignal() {
    //模拟主 routine 等待子 routine

    worker := func() {
        fmt.Println("do some work")
        time.Sleep(3 * time.Second)

    fmt.Println("start a worker...")
    c := spawn(worker)

    fmt.Println("spawn finished, read channel...")
    <-c //读取,阻塞等待

    fmt.Println("worker finished")

上面的代码中,主 routine 创建了一个函数,然后在子 routine 中执行,主 routine 阻塞在一个 channel 上,等待子 routine 完成后继续执行。


start a worker...
spawn finished, read channel...
exec f in child_routine
do some work
f exec finished, write to channel
worker finished

可以看到,这样的确实现了类似“信号”的机制:在一个 routine 中通知另一个 routine。
如果 channel 的类型复杂些,就可以传递任意数据了!

struct{} 大小是0,不占内存


关闭一个无 buffer channel 会让所有阻塞在这个 channel 上的 read *** 作返回,基于此我们可以实现 1 对 n 的“广播”机制。

var waitGroup sync.WaitGroup

func spawnGroup(f func(ind int), count int, groupSignal chan struct{}) <-chan signal {
	c := make(chan signal)	//用于让主 routine 阻塞的 channel
	waitGroup.Add(count)	//等待总数

	//创建 n 个 goroutine
	for i := 0; i < count; i++ {
		go func(index int) {
			<- groupSignal	//读取阻塞,等待通知执行

			//fmt.Println("exec f in child_routine, index: ", i);
			//⚠️注意上面注释的代码,这里不能直接访问 for 循环的 i,因为这个是复用的,会导致访问的值不是目标值

			fmt.Println("exec f in child_routine, index: ", index);
			fmt.Println(index , " exec finished, write to channel")

		}(i + 1)

	//创建通知主 routine 结束的 routine,不能阻塞当前函数
	go func() {
		//需要同步等待所有子 routine 执行完
		c <- signal{}	//写入数据
	return c

func testUseNonBufferChannelImplGroupSignal() {
	worker := func(i int) {
		fmt.Println("do some work, index ", i)
		time.Sleep(3 * time.Second)

	groupSignal := make(chan struct{})
	c := spawnGroup(worker, 5, groupSignal)

	fmt.Println("main routine: close channel")
	close(groupSignal)	//通知刚创建的所有 routine

	fmt.Println("main routine: read channel...")
	<- c	//阻塞在这里

	fmt.Println("main routine: all worker finished")


创建 channelA,传递给多个 goroutine子 routine 中读取等待这个 channelA主 routine 关闭 channel,然后阻塞在 channelB 上,此时所有子 routine 开始执行所有子 routine 执行完后,通过 channelB 唤醒主 routine


main routine: close channel
main routine: read channel
exec f in child_routine, index:  2
do some work, index  2
exec f in child_routine, index:  1
do some work, index  1
exec f in child_routine, index:  3
do some work, index  3
exec f in child_routine, index:  4
do some work, index  4
exec f in child_routine, index:  5
do some work, index  5
4  exec finished, write to channel
5  exec finished, write to channel
3  exec finished, write to channel
1  exec finished, write to channel
2  exec finished, write to channel
main routine: all worker finished

用 2 个 channel 实现了 【主 routine 通知所有子 routine 开始】 和【子 routine 通知主 routine 任务结束】。

多 goroutine 同步:通过阻塞,替代锁
type NewCounter struct {
	c chan int
	i int

func CreateNewCounter() *NewCounter {
	counter := &NewCounter{
		c: make(chan int),
		i: 0,

	go func() {
		for {
			counter.i ++
			counter.c <- counter.i		//每次加一,阻塞在这里

	return counter

func (c *NewCounter)Increase() int {
	return <- c.c		//读取到的值,是上一次加一

//多协程并发增加计数,通过 channel 写入阻塞,读取时加一
func testCounterWithChannel() {
	fmt.Println("\ntestCounterWithChannel ->>>")

	group := sync.WaitGroup{}
	counter := CreateNewCounter()

	for i:=0 ; i<10 ; i++ {

		go func(i int) {
			count := counter.Increase()
			fmt.Printf("Goroutine-%d, count %d \n", i, count)



上面的代码中,我们创建了一个单独的协程,在其中循环增加计数,但每次加一后,就会尝试写入 channel(无 buffer 的),在没有人读取时,会阻塞在这个方法上。

然后在 10 个协程里并发读取 channel,从而实现每次读取递增。

带缓冲 channel 的常见用途 🔥 消息队列

channel 的特性符合对消息队列的要求:

跨 goroutine 访问安全FIFO可设置容量异步收发

Go 支持 channel 的初衷是将它作为 Goroutine 间的通信手段,它并不是专门用于消息队列场景的。
如果你的项目需要专业消息队列的功能特性,比如支持优先级、支持权重、支持离线持久化等,那么 channel 就不合适了,可以使用第三方的专业的消息队列实现。


由于带 buffer channel 的特性(容量满时写入会阻塞),可以用它的容量表示同时最大并发数量。


var active = make(chan struct{}, 3)	//"信号量",最多 3 个
var jobs = make(chan int, 10)

//使用带缓存的 channel,容量就是信号量的大小
func testSemaphoreWithBufferChannel() {

	go func() {
		for i:= 0; i < 9; i++ {
			jobs <- i + 1

	var wg sync.WaitGroup

	for j := range jobs {

		go func(i int) {
			active <- struct{}{}

			//fmt.Println("exec job ", i)
			log.Printf("exec job: %d, length of active: %d \n", i, len(active))
			time.Sleep(2 * time.Second)

			<- active



上面的代码中,我们用 channel jobs 表示要执行的任务(这里为 8 个),然后用 channel active 表示信号量(最多三个)。

然后在 8 个 goroutine 里执行任务,每个任务耗时 2s。在每次执行任务前,先写入 channel 表示获取信号量;执行完后读取,表示释放信号量。

由于信号量最多三个,所以同一时刻最多能有 3 个任务得以执行。


2022/04/20 19:14:26 exec job: 1, length of active: 1 
2022/04/20 19:14:26 exec job: 9, length of active: 2 
2022/04/20 19:14:26 exec job: 5, length of active: 3 
2022/04/20 19:14:28 exec job: 6, length of active: 3 
2022/04/20 19:14:28 exec job: 7, length of active: 3 
2022/04/20 19:14:28 exec job: 8, length of active: 3 
2022/04/20 19:14:30 exec job: 3, length of active: 3 
2022/04/20 19:14:30 exec job: 2, length of active: 3 
2022/04/20 19:14:30 exec job: 4, length of active: 3 

当需要在一个 goroutine 同时读/写多个 channel 时,可以使用 select:

类似 Linux 的 I/O 多路复用思路,我们可以叫它:goroutine 多路复用。

func testSelect() {
    channelA := make(chan int)
    channelB := make(chan int)

    go func() {
        var readA bool
        var readB bool

        for {
            select {
            case x := <- channelA:
                fmt.Println("child_routine: read from channelA:", x)
                readA = true
            case y := <- channelB:
                fmt.Println("child_routine:  read from channelB:", y)
                readB = true
            //  //其他 case 阻塞,就执行 default
            //  fmt.Println("default")

            if readA && readB {

                fmt.Println("child_goroutine finish")
            } else {
                fmt.Println("child_goroutine still loop, ", readA, readB)

    fmt.Println("main goroutine")

    time.Sleep(2 * time.Second)

    fmt.Println("main goroutine, write to channelA")
    channelA <- 111
    fmt.Println("main goroutine, write to channelA finish")

    time.Sleep(1 * time.Second)

    fmt.Println("main goroutine, write to channelB")
    channelB <- 111
    fmt.Println("main goroutine, write to channelB finish")

    time.Sleep( 5 * time.Second)
    fmt.Println("main goroutinefinish")


main goroutine
main goroutine, write to channelA
main goroutine, write to channelA finish
child_routine: read from channelA: 111
child_goroutine still loop,  true false
main goroutine, write to channelB
main goroutine, write to channelB finish
child_routine:  read from channelB: 111
child_goroutine finish
main goroutinefinish


使用 select 在一个 goroutine 里读取了 2 个 channel这 2 个 case 里的 channel 都不可读时,select 阻塞,只会执行 default,不会执行 select 代码块以外的主 goroutine 写入数据后,select 的其中一个 case 返回,然后继续执行 select 后面的逻辑下一轮循环后 2 个 case 都不可读,继续阻塞然后主 goroutine 写入后,另外一个 case 也返回,循环结束 channel 与 select 结合的常见用途 🔥 利用 default 分支避免阻塞

select 的 default 分支语义:当所有 case 语句里读/写 channel 阻塞时,会执行 default!

无论 channel 是否有 buffer。

有些时候,我们可能不希望阻塞在写入 channel 上,那可以利用 select default 的特性,这样封装一个函数,当写入阻塞时,返回一个 false,让外界可以处理阻塞的情况:

func tryWriteChannel(c chan<- int, value int) bool {
	select {
	case c <- value
		return true
	default:	//其他没就绪时,会执行
		return false


			//active <- 1		//之前直接写 channel,如果满了,就会阻塞
			writed := tryWriteChannel(active, 1)	//改成这样,可以在阻塞时,处理相关逻辑
			if !writed {
				log.Println("failed to write channel")

假如我们想在一个 channel 的读/写 *** 作上加一个超时逻辑,可以通过这样实现:
在 select 代码块中,加一个 case,这个 case 会在超时后执行,这样会结束其他 case。


func tryGetSemaphore(c chan<- struct{}) bool {
	select {
	case c <- struct {}{}:
		return true
	case <- time.After(1 * time.Second):		//在写 channel 的基础上,额外加一个情况,超时情况
		//1s 后返回,可以在这里做超时处理
		return true

及时调用 timer 的 Stop 方法回收 Timer 资源。


循环执行一个额外的 case,这个 case 会定时返回。

func worker() {
  heartbeat := time.NewTicker(30 * time.Second)
  defer heartbeat.Stop()
  for {
    select {
    case <-c:
      // ... do some stuff
    case <- heartbeat.C:
      //... do heartbeat stuff

time.NewTicker 会创建一个定时执行的心跳,可以把这个 ticker channel 读取的 *** 作放到一个 case 里,这样 select 代码块就会定时执行一次。

ticker 也要及时 Stop。


本文介绍了 Golang 中通过 goroutine channel 和 select 实现并发 *** 作的一些典型场景。

可以看到,通过 goroutine 实现并发是如此的简单;通过 channel 无 buffer 和有 buffer,实现一些 goroutine 同步机制也比较方便;结合 select,实现 goroutine 的统一管理。


需要记住的是,Go 提倡通过 CSP 模型(communicating sequential processes 通信顺序进程)进行通信,而不是传统语言的共享内存方式。

CSP:两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。

我们在遇到多 goroutine 通信、同步的情况,可以尽量多使本文的内容进行处理。

不过对于某些情况,也可以使用 go 提供的 sync 包下的内容,进行局部同步。下篇文章我们就来看看这些内容。

对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据时,我们可以使用更为高效的低级同步原语(如 mutex),保证 goroutine 对数据的同步访问。


