Goroutine:
goroutine是Go并行设计的核心。goroutine说到底其实就是协程,它比线程更小,十几个goroutine可能体现在底层就是五六个线程,
Go语言内部帮你实现了这些goroutine之间的内存共享
。执行goroutine只需极少的栈内存(大概是4~5KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更轻便。一般情况下,一个普通计算机跑几十个线程就有点负载过大了,但是同样的机器却可以轻松地让成百上千个goroutine进行资源竞争。
特点:
有独立的栈空间共享程序堆内存调度由用于控制协程是轻量级的线程Goroutine的创建:
只需在函数调⽤语句前添加 go 关键字
,就可创建并发执⾏单元。开发⼈员无需了解任何执⾏细节,调度器会自动将其安排到合适的系统线程上执行。在并发编程中,我们通常想将一个过程切分成几块,然后让每个goroutine各自负责一块工作,当一个程序启动时,主函数在一个单独的goroutine中运行,我们叫它main goroutine。新的goroutine会用go语句来创建。而go语言的并发设计,让我们很轻松就可以达成这一目的。
Goroutine格式:
go 函数名( 参数列表 )
演示:
主goroutine退出后,其它的工作goroutine也会自动退出:
func main() {
// 如果不加go执行顺序是:先执行test1再执行test2,是有顺序的,但是如果有go关键字就是同时在执行了
go goroutineTest01()
go goroutineTest02()
for {
}
}
func goroutineTest01() {
for i := 0; i < 10; i++ {
fmt.Println("goroutineTest01执行")
time.Sleep(1000 * time.Millisecond)
}
}
func goroutineTest02() {
for i := 0; i < 10; i++ {
fmt.Println("goroutineTest02执行")
time.Sleep(1000 * time.Millisecond)
}
}
runtime包:
runtime.Gosched()
用于让出CPU时间片,让出当前goroutine的执行权限,调度器安排其他等待的任务运行,并在下次再获得cpu时间轮片的时候,从该出让cpu的位置恢复执行。有点像跑接力赛,A跑了一会碰到代码runtime.Gosched() 就把接力棒交给B了,A歇着了,B继续跑。
func main() {
go func() {
for {
fmt.Println("我不让出时间片")
}
}()
for {
runtime.Gosched() // 让出当前时间片
fmt.Println("我让出时间片")
}
}
runtime.Goexit()
将立即终止当前 goroutine 执⾏,调度器确保所有已注册 defer延迟调用被执行。Goexit之前注册的defer会生效,之后不会。
func main() {
go func() { // Goexit直接退出func
fmt.Println("走我吗——1")
goexit()
fmt.Println("走我吗——2")
}()
for {
}
}
func goexit() {
//return
fmt.Println("走我吗——3")
runtime.Goexit() // 退出当前go程
defer fmt.Println("走我吗——4")
}
runtime.GOMAXPROCS()
用来设置可以并行计算的CPU核数的最大值,并返回上一次的核心数,如果是第一次调用就返回默认值。
func main() {
// func GOMAXPROCS(n int) int {} 参数是要设置的核心数,返回值是上一次设置的核心数
num := runtime.GOMAXPROCS(1)
fmt.Println("上一次设置核心数为:", num)
for {
// 0和1会一直交替打印,如果用GOMAXPROCS限制1个核心,那么谁抢到谁就一直跑
go fmt.Println(0)
fmt.Println(1)
}
}
channel:
channel可以建立goroutine之间的通信连接,channel的特点是:先进先出、线程安全不需要加锁
,channel是Go语言中的一个核心类型,可以把它看成管道。并发核心单元通过它就可以发送或者接收数据通讯,这在一定程度上又进一步降低了编程的难度。channel是一个数据类型,主要用来解决协程的同步问题以及协程之间数据共享(数据传递)的问题。goroutine运行在相同的地址空间,因此访问共享内存必须做好同步。goroutine 奉行通过通信来共享内存,而不是共享内存来通信。
引⽤类型 channel可用于多个 goroutine 通讯。其内部实现了同步,确保并发安全。channel分为有缓冲和无缓冲
channel分为两个端:
传入端负责写的 *** 作,输出端负责读的 *** 作
读和写必须同时满足条件,才在会进行数据流动,否则会阻塞。
例:channel就是一个外卖小哥,传入端是卖家,输出端是买家,必须保证卖家把商品给外卖小哥以后,买家正在准备拿,否则外卖小哥就会懵逼了。
无缓冲的channel:
无缓冲的通道(unbuffered channel)指在接收前不会保存任何数据的一个通道。通道容量为0,可以实现同步的 *** 作, *** 作前提是读和写必须同时 *** 作,否则会阻塞。
缓冲:中间加了个存放数据的区域,然后缓存区慢了才会写入,就像Java的缓冲流一样这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收 *** 作。否则,通道会导致先执行发送或接收 *** 作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个 *** 作都无法离开另一个 *** 作单独存在。阻塞:由于某种原因数据没有到达,当前协程(线程)持续处于等待状态,直到条件满足,才解除阻塞。
同步:在两个或多个协程(线程)间,保持数据内容一致性的机制。
无缓冲创建:
chan
是创建channel所需使用的关键字。Type
代表指定channel收发数据的类型。
make(chan Type) //等价于make(chan Type, 0)
make(chan Type, capacity)
当参数capacity= 0 时,channel 是无缓冲阻塞读写的
当capacity > 0 时,channel 有缓冲、是非阻塞的,直到写满 capacity个元素才阻塞写入。
channel通过 *** 作符<-
来发送和接收数据
发送和接收数据语法:
默认情况下,channel接收和发送数据都是阻塞的,除非另一端已经准备好,这样就使得goroutine同步变的更加的简单,而不需要显式的lock。
channel <- value //发送value到channel
<-channel //接收并将其丢弃
x := <-channel //从channel中接收数据,并赋值给x
x, ok := <-channel //功能同上,同时检查通道是否已关闭或者是否为空
演示:
func main() {
go channelTest01()
go channelTest02()
for {
}
}
// 定义channel
var channel = make(chan int)
// 定义一个公共 *** 作类
func print(s string) {
for _, ch := range s {
fmt.Printf("%c", ch)
time.Sleep(300 * time.Millisecond)
}
}
// 定义两个人使用打印机
func channelTest01() {
print("person01")
channel <- 1 // person01负责写的 *** 作,随便写的数字都行,相当于规定了两个方法的执行顺序
}
func channelTest02() {
<-channel // person02负责读channel中的数据,也就是先把person01的数据读出来才会继续执行person02的任务
//num := <-channel // person02负责读channel中的数据,也就是先把person01的数据读出来才会继续执行person02的任务
// 如果一个写,一个没读,或者是一个读一个没写就会阻塞
print("person02")
//fmt.Println(num) //也可以定义一个变量存起来
}
演示:
func main() {
// 创建无缓冲通道,长度默认为0
ch := make(chan string)
// 验证长度和容量 len(ch):channel中数据剩余未读取的个数 cap(ch):channel的容量
fmt.Println("channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))
// 定义匿名go程
go func() {
for i := 0; i < 3; i++ {
fmt.Println("匿名go程循环:", i, "channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))
}
ch <- "匿名go程执行完毕"
}()
// 主函数读取channel中的数据
result := <-ch
fmt.Println("result", result)
}
演示:
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 3; i++ {
fmt.Println("匿名channel在写:", i)
ch <- i
}
}()
// time.Sleep(300 * time.Millisecond)
for i := 0; i < 3; i++ {
result := <-ch
fmt.Println("main函数channel在读:", result)
}
}
打印结果:
可以看到结果并不是一个写一个读的情况,原因是这样的
Main函数先执行,然后通过make创建了channel下面是两个方法,因为没有指定先后顺序,所以具体执行哪一个是靠抢占线程的然后匿名channel抢到执行权,写了个i=0,然后写到channel里面然后又要抢下一次执行权,还是匿名的channel抢到,又写了个i=1然后main的抢到了,写了两次0、1然后下面一人一次总结就是不管是有缓冲还是无缓冲,都要调用硬件写到屏幕,所以到底谁先谁后,是看谁能抢到CPU的执行权,只要涉及到IO的 *** 作都是有延迟的,打印的数据出现顺序问题都是正常的,
匿名channel在写: 0
匿名channel在写: 1
main函数channel在读: 0
main函数channel在读: 1
匿名channel在写: 2
main函数channel在读: 2
有缓冲的channel:
有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个数据值的通道。
这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也不同。无缓冲的是同步 *** 作,有缓冲是异步 *** 作
只有通道中没有要接收的值时,接收动作才会阻塞。只有通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。
有缓冲的channel创建格式:
如果给定了一个缓冲区容量,通道就是异步的。只要缓冲区有未使用空间用于发送数据,或还包含可以接收的数据,那么其通信就会无阻塞地进行
make(chan Type, capacity)
演示:
有可能会出现
func main() {
ch := make(chan int, 3)
fmt.Println("初始数据:", "channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))
go func() {
for i := 0; i < 3; i++ {
fmt.Println("匿名go程循环:", i, "channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))
ch <- i
}
}()
time.Sleep(300 * time.Millisecond)
for i := 0; i < 3; i++ {
result := <-ch
fmt.Println("main函数channel在读:", result, "channel中未读取的个数:", len(ch), "channel的容量:", cap(ch))
}
}
关闭channel:
如果发送者知道,没有更多的值需要发送到channel的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,因为接收者可以停止不必要的接收等待。这可以通过内置的close函数来关闭channel实现。
channel不像文件一样需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束range循环之类的,才去关闭channel关闭channel后,无法向channel 再发送数据(引发 panic 错误后导致接收立即返回零值)闭channel后,可以继续从channel接收数据对于nil channel,无论收发都会被阻塞。如果不知道发送端要发多少次可以使用 range 来迭代channel
判断channel是否关闭的两种方式
if num , ok := <- ch; ok == true{}
for num := range ch {}
如果已经关闭,ok为false,num无数据如果没有关闭,ok为true,num保存读取的数据golang中还是引用了管道的特性,关闭后就会返回0,但是不需要需判断是否为0,而是用ok判断
func main() {
ch := make(chan int, 3)
go func() {
for i := 0; i < 3; i++ {
ch <- i
}
// 写入端关闭channel
close(ch)
}()
/*for {
if num, ok := <-ch; ok == true {
fmt.Println("读到的数据:", num)
} else {
fmt.Println("关闭后:", num)
break
}
}*/
for num := range ch {
fmt.Println("读到的数据:", num)
}
}
单向channel:
默认情况下,通道channel是双向的,也就是,既可以往里面发送数据也可以往里面接收数据。但是,我们经常见一个通道作为参数进行传递而值希望对方是单向使用的,要么只让它发送数据,要么只让它接收数据,这时候我们可以指定通道的方向。
单向channel变量声明:
var ch1 chan int // ch1是一个正常的channel,是双向的
var ch2 chan<- float64 // ch2是单向channel,只用于写float64数据
var ch3 <-chan int // ch3是单向channel,只用于读int数据
chan<- 表示数据进入管道,要把数据写进管道,对于调用者就是输出。<-chan 表示数据从管道出来,对于调用者就是得到管道的数据,当然就是输入。双向channel可以隐式转换为任意一种单向channel,单向channel不可以转换为双向channel
channel作为函数参数:
channel传参是引用,好处就是多个Goroutine通信的时候会共用一个channel
演示:
func main() {
ch := make(chan int, 3)
go func() {
send(ch) // 相当于双向转为单向写的 *** 作
}()
read(ch)
}
// 读
func read(in <-chan int) {
n := <-in
fmt.Println("读到:", n)
}
// 写
func send(out chan<- int) {
out <- 24
close(out)
}
生产者消费者模型:
单向channel最典型的应用是生产者消费者模型
生产者消费者模型
: 某个模块(函数等)负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、协程、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。单单抽象出生产者和消费者,还够不上是生产者/消费者模型。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。例:小明负责蒸包子,小红负责吃包子,桌子用来放包子
小明蒸好包子——相当于生产者制造数据把包子放在桌子上——相当于生产者把数据放入缓冲区小红从桌子上拿走包子——相当于消费者把数据取出缓冲区小红吃包子——相当于消费者处理数据
这个缓冲区有什么用呢?为什么不让生产者直接调用消费者的某个函数,直接把数据传递过去,而画蛇添足般的设置一个缓冲区呢?
1、解耦
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会直接影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合度也就相应降低了。
2、处理并发
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者只能无端浪费时间。使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实最当初这个生产者消费者模式,主要就是用来处理并发问题的。
3、缓存
如果生产者制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
演示:
type OrderInfo struct {
orderId int // 订单id
}
func product(out chan<- OrderInfo) {
for i := 0; i < 10; i++ {
order := OrderInfo{orderId: i + 1}
out <- order
}
close(out)
}
func consumer(in <-chan OrderInfo) {
for order := range in {
fmt.Println("订单id为:", order.orderId)
}
}
func main() {
ch := make(chan OrderInfo)
go product(ch)
consumer(ch)
}
在上面的代码中,加了一个消费者,同时在consumer方法中,将数据取出来后,又进行了一组运算。这时可能会出现一个协程从管道中取出数据,参与加法运算,但是还没有算完另外一个协程又从管道中取出一个数据赋值给了num变量。所以这样累加计算,很有可能出现问题。当然,按照前面的知识,解决这个问题的方法很简单,就是通过加锁的方式来解决。增加生产者也是一样的道理。另外一个问题,如果消费者比生产者多,仓库中就会出现没有数据的情况。我们需要不断的通过循环来判断仓库队列中是否有数据,这样会造成cpu的浪费。反之,如果生产者比较多,仓库很容易满,满了就不能继续添加数据,也需要循环判断仓库满这一事件,同样也会造成CPU的浪费。我们希望当仓库满时,生产者停止生产,等待消费者消费;同理,如果仓库空了,我们希望消费者停下来等待生产者生产。为了达到这个目的,这里引入条件变量。(需要注意:如果仓库队列用channel,是不存在以上情况的,因为channel被填满后就阻塞了,或者channel中没有数据也会阻塞)。
条件变量:
条件变量的作用并不保证在同一时刻仅有一个协程(线程)访问某个共享的数据资源,而是在对应的共享数据的状态发生变化时,通知阻塞在某个条件上的协程(线程)。条件变量不是锁,在并发中不能达到同步的目的,因此条件变量总是与锁一块使用。例如,上面说的,如果仓库队列满了,我们可以使用条件变量让生产者对应的goroutine暂停(阻塞),但是当消费者消费了某个产品后,仓库就不再满了,应该唤醒(发送通知给)阻塞的生产者goroutine继续生产产品。GO标准库中的sys.Cond类型代表了条件变量。条件变量要与锁(互斥锁,或者读写锁)一起使用。成员变量L代表与条件变量搭配使用的锁。
对应的有3个常用方法,Wait、Signal、Broadcast
func (c *Cond) Wait()
该函数的作用可归纳为如下三点:
func (c *Cond) Signal()
func (c *Cond) Broadcast()
演示:
var cond sync.Cond // 创建全局条件变量
// 生产者
func producer(out chan<- int, idx int) {
for {
cond.L.Lock() // 条件变量对应互斥锁加锁
for len(out) == 3 { // 产品区满 等待消费者消费
cond.Wait() // 挂起当前协程, 等待条件变量满足,被消费者唤醒
}
num := rand.Intn(1000) // 产生一个随机数
out <- num // 写入到 channel 中 (生产)
fmt.Printf("%dth 生产者,产生数据 %3d, 公共区剩余%d个数据\n", idx, num, len(out))
cond.L.Unlock() // 生产结束,解锁互斥锁
cond.Signal() // 唤醒 阻塞的 消费者
time.Sleep(time.Second) // 生产完休息一会,给其他协程执行机会
}
}
//消费者
func consumer(in <-chan int, idx int) {
for {
cond.L.Lock() // 条件变量对应互斥锁加锁(与生产者是同一个)
for len(in) == 0 { // 产品区为空 等待生产者生产
cond.Wait() // 挂起当前协程, 等待条件变量满足,被生产者唤醒
}
num := <-in // 将 channel 中的数据读走 (消费)
fmt.Printf("---- %dth 消费者, 消费数据 %3d,公共区剩余%d个数据\n", idx, num, len(in))
cond.L.Unlock() // 消费结束,解锁互斥锁
cond.Signal() // 唤醒 阻塞的 生产者
time.Sleep(time.Millisecond * 500) //消费完 休息一会,给其他协程执行机会
}
}
func main() {
rand.Seed(time.Now().UnixNano()) // 设置随机数种子
quit := make(chan bool) // 创建用于结束通信的 channel
product := make(chan int, 3) // 产品区(公共区)使用channel 模拟
cond.L = new(sync.Mutex) // 创建互斥锁和条件变量
for i := 0; i < 5; i++ { // 5个消费者
go producer(product, i+1)
}
for i := 0; i < 3; i++ { // 3个生产者
go consumer(product, i+1)
}
<-quit // 主协程阻塞 不结束
}
/*
1. main函数中定义quit,其作用是让主协程阻塞。
2. 定义product作为队列,生产者产生数据保存至队列中,最多存储3个数据,消费者从中取出数据模拟消费
3. 条件变量要与锁一起使用,这里定义全局条件变量cond,它有一个属性:L Locker。是一个互斥锁。
4. 开启5个消费者协程,开启3个生产者协程。
5. producer生产者,在该方法中开启互斥锁,保证数据完整性。并且判断队列是否满,如果已满,调用wait()让该goroutine阻塞。当消费者取出数后执行cond.Signal(),会唤醒该goroutine,继续生产数据。
6. consumer消费者,同样开启互斥锁,保证数据完整性。判断队列是否为空,如果为空,调用wait()使得当前goroutine阻塞。当生产者产生数据并添加到队列,执行cond.Signal() 唤醒该goroutine。
*/
定时器:
ime.Timer是一个定时器。代表未来的一个单一事件,你可以告诉timer你要等待多长时间。它提供一个channel,在定时时间到达之前,没有数据写入timer.C会一直阻塞。直到定时时间到,向channel写入值,阻塞解除,可以从中读取数据。
创建:
func main() {
// 三种方法完成定时、NewTimer、After
// 当前时间
fmt.Printf("当前时间:%v\n", time.Now())
// 创建定时器,2秒后,定时器向定时器的C发送time.Timer类型的元素值
timer := time.NewTimer(time.Second * 2)
nowTimer := <-timer.C
fmt.Println("nowTimer", nowTimer)
fmt.Printf("当前时间:%v\n", time.Now())
after := <-time.After(time.Second * 2)
fmt.Println("after", after)
timeNow02 := <-timer.C
fmt.Printf("timeNow02:%v\n", timeNow02) // 当前时间
// time.Sleep
timer2 := time.NewTimer(time.Second * 2)
<-timer2.C
fmt.Println("可以实现单纯的等待2秒")
time.Sleep(time.Second * 2)
fmt.Println("再一次2s后")
}
重置和关闭:
func main() {
// 定时停止和重置
timer3 := time.NewTimer(time.Second * 3)
go func() {
<-timer3.C
fmt.Println("timer3运行完毕")
}()
stop := timer3.Stop() // 设置定时器停止
if stop {
fmt.Println("已经停止")
}
timer4 := time.NewTimer(time.Second * 3) // 原设置时间
timer4.Reset(time.Second * 1) // 重新设置时间
<-timer4.C
fmt.Println("after")
}
定时器周期定时:
func main() {
quit := make(chan bool)
i := 0
fmt.Println("当前时间:", time.Now())
// NewTicker:周期定时器
ticker := time.NewTicker(time.Second)
go func() {
for {
i++
nowTime := <-ticker.C
fmt.Println("nowTime", nowTime)
if i == 5 {
quit <- true
}
}
}()
<-quit
}
select:
通过select可以监听channel上的数据流动
select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。与switch语句相比, select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO *** 作(读写),大致的结构如下:
select {
case <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:
如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去,但是会产生忙轮询
演示:
func main() {
ch := make(chan int) // 用来进行数据通信的channel
quit := make(chan bool) // 用来判断是否退出的channel
go func() {
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
close(ch)
quit <- true // 通知主go程退出
runtime.Goexit()
}()
// 监听channel,读取数据
for {
select {
case num := <-ch:
fmt.Println("读到的数据为:", num)
case <-quit:
return
}
}
}
斐波那契数列:
func main() {
ch := make(chan int) // 用来进行数据通信的channel
quit := make(chan bool) // 用来判断是否退出的channel
go f(ch, quit)
x, y := 1, 1
for i := 0; i < 20; i++ {
ch <- x
x, y = y, x+y
}
quit <- true
}
func f(ch <-chan int, quit <-chan bool) {
for {
select {
case num := <-ch:
fmt.Print(num, " ")
case <-quit:
//return
runtime.Goexit()
}
}
}
超时:
有时候会出现goroutine阻塞的情况,使用select我们如何避免整个程序进入阻塞的情况
演示:
func main() {
ch := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case v := <-ch:
fmt.Println(v)
// 设置5秒读取不到数据就退出,避免阻塞
case <-time.After(5 * time.Second):
fmt.Println("timeout")
quit <- true
break
}
}
}()
ch <- 666 // 写完5秒后退出
<-ch
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)