Go 语言入门:并发编程1(Goroutine与Channel的入门)

Go 语言入门:并发编程1(Goroutine与Channel的入门),第1张

Goroutine
  • goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。
  • Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。
使用goroutine

Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。

  • 一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

启动单个goroutine

  • 启动goroutine的方式非常简单,只需要在调用的函数(普通函数和匿名函数)前面加上一个go关键字。

    func hello() {
        fmt.Println("Hello Goroutine!")
    }
    func main() {
        hello()
        fmt.Println("main goroutine done!")
    }
    
    • 执行的结果是打印完 Hello Goroutine!后打印main goroutine done!
  • 接下来我们在调用hello函数前面加上关键字go,也就是启动一个goroutine去执行hello这个函数。

    func main() {
        go hello() // 启动另外一个goroutine去执行hello函数
        fmt.Println("main goroutine done!")
    }
    
    • 这一次的执行结果只打印了main goroutine done!,并没有打印Hello Goroutine!
    • 在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main() 函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束
  • 使用time.Sleep 对 子进程 grouptine 进行等待

    func main() {
        go hello() // 启动另外一个goroutine去执行hello函数
        fmt.Println("main goroutine done!")
        time.Sleep(time.Second)
    }
    
    • 执行上面的代码你会发现,这一次先打印main goroutine done!,然后紧接着打印Hello Goroutine!。

启动多个 goroutine

  • 这里使用了sync.WaitGroup来实现goroutine的同步, 有点像java里面的Countdown

    var wg sync.WaitGroup
    
    func hello(i int) {
        defer wg.Done() // goroutine结束就登记-1
        fmt.Println("Hello Goroutine!", i)
    }
    func main() {
    
        for i := 0; i < 10; i++ {
            wg.Add(1) // 启动一个goroutine就登记+1
            go hello(i)
        }
        wg.Wait() // 等待所有登记的goroutine都结束
    }
    
    • 多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为10个goroutine是并发执行的,而goroutine的调度是随机的。
worker pool(goroutine 池)
  • 本质上是生产者消费者模型
  • 可以有效控制goroutine数量,防止暴涨

例子:

  • 计算一个数字的各个位数之和,例如数字123,结果为1+2+3=6
  • 随机生成数字进行计算
    控制台输出结果如下:
    package main
    
    import (
    	"fmt"
    	"math/rand"
    )
    
    type Job struct {
    	// id
    	Id int
    	// 需要计算的随机数
    	RandNum int
    }
    
    type Result struct {
    	// 这里必须传对象实例
    	job *Job
    	// 求和
    	sum int
    }
    
    func main() {
    	// 需要2个管道
    	// 1.job管道
    	jobChan := make(chan *Job, 128)
    	// 2.结果管道
    	resultChan := make(chan *Result, 128)
    	// 3.创建工作池
    	createPool(64, jobChan, resultChan)
    	// 4.开个打印的协程
    	go func(resultChan chan *Result) {
    		// 遍历结果管道打印
    		for result := range resultChan {
    			fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
    				result.job.RandNum, result.sum)
    		}
    	}(resultChan)
    	var id int
    	// 循环创建job,输入到管道
    	for {
    		id++
    		// 生成随机数
    		r_num := rand.Int()
    		job := &Job{
    			Id:      id,
    			RandNum: r_num,
    		}
    		jobChan <- job
    	}
    }
    
    // 创建工作池
    // 参数1:开几个协程
    func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
    	// 根据开协程个数,去跑运行
    	//多个线程一起跑,计算jobChan中的任务
    	for i := 0; i < num; i++ {
    		go func(jobChan chan *Job, resultChan chan *Result) {
    			// 执行运算
    			// 遍历job管道所有数据,进行相加
    			for job := range jobChan {
    				// 随机数接过来
    				r_num := job.RandNum
    				// 随机数每一位相加
    				// 定义返回值
    				var sum int
    				for r_num != 0 {
    					tmp := r_num % 10
    					sum += tmp
    					r_num /= 10
    				}
    				// 想要的结果是Result
    				r := &Result{
    					job: job,
    					sum: sum,
    				}
    				//运算结果扔到管道
    				resultChan <- r
    			}
    		}(jobChan, resultChan)
    	}
    }
    
定时器
  • Timer:时间到了,执行只执行1次

    package main
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	// 1.timer基本使用
    	timer1 := time.NewTimer(2 * time.Second)
    	t1 := time.Now()
    	fmt.Printf("t1:%v\n", t1)
    	t2 := <-timer1.C
    	fmt.Printf("t2:%v\n", t2)
    
    	// 2.验证timer只能响应1次
    	//timer2 := time.NewTimer(time.Second)
    	//for {
    	//<-timer2.C
    	//fmt.Println("时间到")
    	//}
    
    	// 3.timer实现延时的功能
    	//(1)
    	//time.Sleep(time.Second)
    	//(2)
    	timer3 := time.NewTimer(2 * time.Second)
    	<-timer3.C
    	fmt.Println("2秒到")
    	//(3)
    	<-time.After(2*time.Second)
    	fmt.Println("2秒到")
    
    	// 4.停止定时器
    	timer4 := time.NewTimer(2 * time.Second)
    	go func() {
    	<-timer4.C
    	fmt.Println("定时器执行了")
    	}()
    	b := timer4.Stop()
    	if b {
    	fmt.Println("timer4已经关闭")
    	}
    
    	// 5.重置定时器
    	timer5 := time.NewTimer(3 * time.Second)
    	timer5.Reset(1 * time.Second)
    	fmt.Println(time.Now())
    	fmt.Println(<-timer5.C)
    
    	for {
    	}
    }
    
  • Ticker:时间到了,多次执行

    package main
    import (
        "fmt"
        "time"
    )	
    func main() {
        // 1.获取ticker对象
        ticker := time.NewTicker(1 * time.Second)
        i := 0
        // 子协程
        go func() {
            for {
                //<-ticker.C
                i++
                fmt.Println(<-ticker.C)
                if i == 5 {
                    //停止
                    ticker.Stop()
                }
            }
        }()
        for {
        }
    }
    
runtime包 runtime.Gosched(): 让出时间片
package main

import (
    "fmt"
    "runtime"
)

func main() {
    go func(s string) {
        for i := 0; i < 2; i++ {
            fmt.Println(s)
        }
    }("world")
    // 主协程
    for i := 0; i < 2; i++ {
        // 切一下,再次分配任务
        runtime.Gosched()
        fmt.Println("hello")
    }
}
runtime.Goexit():退出当前协程序(直接退出,不会恢复)
package main

import (
	"fmt"
	"runtime"
)

func main() {
	go func() {
		defer fmt.Println("A.defer")
		func() {
			defer fmt.Println("B.defer")
			// 结束协程
			fmt.Println("f.ok")
			runtime.Goexit()
			defer fmt.Println("C.defer")
			fmt.Println("B")
		}()
		fmt.Println("A")
	}()
	for {
	}
}

结果如下:

f.ok
B.defer
A.defer
runtime.GOMAXPROCS: 确定os线程数
  • Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。
  • 默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。
  • Go语言中可以通过 runtime.GOMAXPROCS() 函数设置当前程序并发时占用的CPU逻辑核心数。

我们可以通过将任务分配到不同的 CPU 逻辑核心上实现并行的效果,这里举个例子:

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(time.Second)
}

两个任务只有一个逻辑核心,此时是做完一个任务再做另一个任务。 将逻辑核心数设为2,此时两个任务并行执行,代码如下。

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(2)
    go a()
    go b()
    time.Sleep(time.Second)
}

Go语言中的 *** 作系统线程和goroutine的关系:

  • 1.一个 *** 作系统线程对应用户态多个goroutine。
  • 2.go程序可以同时使用多个 *** 作系统线程。
  • 3.goroutine和OS线程是多对多的关系,即m:n。
Channel *** 作

虽然 groutine 之间可以使用共享内存进行数据交换,但是共享内存在不同的goroutine 中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

  • Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

channel类型
  • channel 是一种类型,一种引用类型。声明通道类型的格式如下:

    var 变量 chan 元素类型
    
    var ch1 chan int   // 声明一个传递整型的通道
    var ch2 chan bool  // 声明一个传递布尔型的通道
    var ch3 chan []int // 声明一个传递int切片的通道
    
创建channel
  • 声明通道: 通道是引用类型,通道类型的空值是nil

    var ch chan int
    fmt.Println(ch) // 
    

    声明的通道后需要使用make函数初始化之后才能使用。

  • 实例化通道: 创建channel的格式如下:

    make(chan 元素类型, [缓冲大小])
    

    channel的缓冲大小是可选的。

    ch4 := make(chan int)
    ch5 := make(chan bool)
    ch6 := make(chan []int)
    
channel *** 作

通道有发送(send)、接收(receive)和关闭(close)三种 *** 作。

  • 发送和接收都使用<-符号。

现在我们先使用以下语句定义一个通道:

ch := make(chan int)
//发送
// 将一个值发送到通道中。
ch <- 10 // 把10发送到ch中

//接收
//从一个通道中接收值。
x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

//关闭
//我们通过调用内置的close函数来关闭通道。
close(ch)

关于关闭通道需要注意的事情是,

  • 只有在通知接收方 goroutine 所有的数据都发送完毕的时候才需要关闭通道。
  • 通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束 *** 作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  • 1.对一个关闭的通道再发送值就会导致panic。
  • 2.对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 3.对一个关闭的并且没有值的通道执行接收 *** 作会得到对应类型的零值。
  • 4.关闭一个已经关闭的通道会导致panic。
无缓冲的通道: 同步通道

无缓冲的通道又称为阻塞的通道。我们来看一下下面的代码:

func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../src/github.com/pprof/studygo/day06/channel02/main.go:8 +0x54
  • 因为我们使用 ch := make(chan int) 创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。

上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?

  • 一种方法是启用一个goroutine去接收值,例如:

    func recv(c chan int) {
        ret := <-c
        fmt.Println("接收成功", ret)
    }
    func main() {
        ch := make(chan int)
        go recv(ch) // 启用goroutine从通道接收值
        ch <- 10
        fmt.Println("发送成功")
    }
    
  • 无缓冲通道上的发送 *** 作会阻塞,直到另一个goroutine在该通道上执行接收 *** 作,这时值才能发送成功,两个goroutine将继续执行

  • 相反,如果接收 *** 作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

有缓冲的通道

解决上面问题的方法还有一种就是使用有缓冲区的通道。我们可以在使用make函数初始化通道的时候为其指定通道的容量,例如:

func main() {
    ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
    ch <- 10
    fmt.Println("发送成功")
}
  • 只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。
  • 我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。
close()

通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)

package main

import "fmt"

func main() {
    c := make(chan int)
    go func() {
        for i := 0; i < 5; i++ {
            c <- i
        }
        close(c)
    }()
    for {
        if data, ok := <-c; ok {
            fmt.Println(data)
        } else {
            break
        }
    }
    fmt.Println("main结束")
}

注意: 关闭已经关闭的channel也会引发panic。

如何优雅的从通道循环取值
  • 当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。
  • 当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?

我们来看下面这个例子:

// channel 练习
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    // 开启goroutine将0~100的数发送到ch1中
    go func() {
        for i := 0; i < 100; i++ {
            ch1 <- i
        }
        close(ch1)
    }()
    // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
    go func() {
        for {
            i, ok := <-ch1 // 通道关闭后再取值ok=false
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()
    // 在主goroutine中从ch2中接收值打印
    for i := range ch2 { // 通道关闭后会退出for range循环
        fmt.Println(i)
    }
}

从上面可以看到从判断通道是否关闭的方式有两种:

  • !ok
  • for range
单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。

  • 方法:直接在参数传入是时就指定 xxx chan<- xx (只准输入) 还是 yyy<-chan xx (只准输出)
    Go语言中提供了单向通道来处理这种情况。例如,我们把上面的例子改造如下:
func counter(out chan<- int) {
    for i := 0; i < 100; i++ {
        out <- i
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中,

  • 1.chan<- int是一个只能发送的通道,可以发送但是不能接收;
  • 2.<-chan int是一个只能接收的通道,可以接收但是不能发送。

1.1.10. 通道总结
channel常见的异常总结,如下图:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/799578.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存