Go 语言入门:并发编程1(Goroutine与Channel的入门)

Go 语言入门:并发编程1(Goroutine与Channel的入门),第1张

Goroutine goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。 使用goroutine

Go语言中使用goroutine非常简单,只需要在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。

一个goroutine必定对应一个函数,可以创建多个goroutine去执行相同的函数。

启动单个goroutine

启动goroutine的方式非常简单,只需要在调用的函数(普通函数和匿名函数)前面加上一个go关键字。

func hello() {
    fmt.Println("Hello Goroutine!")
}
func main() {
    hello()
    fmt.Println("main goroutine done!")
}
执行的结果是打印完 Hello Goroutine!后打印main goroutine done!

接下来我们在调用hello函数前面加上关键字go,也就是启动一个goroutine去执行hello这个函数。

func main() {
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
}
这一次的执行结果只打印了main goroutine done!,并没有打印Hello Goroutine!。在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main() 函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束

使用time.Sleep 对 子进程 grouptine 进行等待

func main() {
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}
执行上面的代码你会发现,这一次先打印main goroutine done!,然后紧接着打印Hello Goroutine!。

启动多个 goroutine

这里使用了sync.WaitGroup来实现goroutine的同步, 有点像java里面的Countdown

var wg sync.WaitGroup

func hello(i int) {
    defer wg.Done() // goroutine结束就登记-1
    fmt.Println("Hello Goroutine!", i)
}
func main() {

    for i := 0; i < 10; i++ {
        wg.Add(1) // 启动一个goroutine就登记+1
        go hello(i)
    }
    wg.Wait() // 等待所有登记的goroutine都结束
}
多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为10个goroutine是并发执行的,而goroutine的调度是随机的。 worker pool(goroutine 池) 本质上是生产者消费者模型可以有效控制goroutine数量,防止暴涨

例子:

计算一个数字的各个位数之和,例如数字123,结果为1+2+3=6随机生成数字进行计算
控制台输出结果如下:
package main

import (
	"fmt"
	"math/rand"
)

type Job struct {
	// id
	Id int
	// 需要计算的随机数
	RandNum int
}

type Result struct {
	// 这里必须传对象实例
	job *Job
	// 求和
	sum int
}

func main() {
	// 需要2个管道
	// 1.job管道
	jobChan := make(chan *Job, 128)
	// 2.结果管道
	resultChan := make(chan *Result, 128)
	// 3.创建工作池
	createPool(64, jobChan, resultChan)
	// 4.开个打印的协程
	go func(resultChan chan *Result) {
		// 遍历结果管道打印
		for result := range resultChan {
			fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
				result.job.RandNum, result.sum)
		}
	}(resultChan)
	var id int
	// 循环创建job,输入到管道
	for {
		id++
		// 生成随机数
		r_num := rand.Int()
		job := &Job{
			Id:      id,
			RandNum: r_num,
		}
		jobChan <- job
	}
}

// 创建工作池
// 参数1:开几个协程
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
	// 根据开协程个数,去跑运行
	//多个线程一起跑,计算jobChan中的任务
	for i := 0; i < num; i++ {
		go func(jobChan chan *Job, resultChan chan *Result) {
			// 执行运算
			// 遍历job管道所有数据,进行相加
			for job := range jobChan {
				// 随机数接过来
				r_num := job.RandNum
				// 随机数每一位相加
				// 定义返回值
				var sum int
				for r_num != 0 {
					tmp := r_num % 10
					sum += tmp
					r_num /= 10
				}
				// 想要的结果是Result
				r := &Result{
					job: job,
					sum: sum,
				}
				//运算结果扔到管道
				resultChan <- r
			}
		}(jobChan, resultChan)
	}
}
定时器

Timer:时间到了,执行只执行1次

package main
import (
	"fmt"
	"time"
)

func main() {
	// 1.timer基本使用
	timer1 := time.NewTimer(2 * time.Second)
	t1 := time.Now()
	fmt.Printf("t1:%v\n", t1)
	t2 := <-timer1.C
	fmt.Printf("t2:%v\n", t2)

	// 2.验证timer只能响应1次
	//timer2 := time.NewTimer(time.Second)
	//for {
	//<-timer2.C
	//fmt.Println("时间到")
	//}

	// 3.timer实现延时的功能
	//(1)
	//time.Sleep(time.Second)
	//(2)
	timer3 := time.NewTimer(2 * time.Second)
	<-timer3.C
	fmt.Println("2秒到")
	//(3)
	<-time.After(2*time.Second)
	fmt.Println("2秒到")

	// 4.停止定时器
	timer4 := time.NewTimer(2 * time.Second)
	go func() {
	<-timer4.C
	fmt.Println("定时器执行了")
	}()
	b := timer4.Stop()
	if b {
	fmt.Println("timer4已经关闭")
	}

	// 5.重置定时器
	timer5 := time.NewTimer(3 * time.Second)
	timer5.Reset(1 * time.Second)
	fmt.Println(time.Now())
	fmt.Println(<-timer5.C)

	for {
	}
}

Ticker:时间到了,多次执行

package main
import (
    "fmt"
    "time"
)	
func main() {
    // 1.获取ticker对象
    ticker := time.NewTicker(1 * time.Second)
    i := 0
    // 子协程
    go func() {
        for {
            //<-ticker.C
            i++
            fmt.Println(<-ticker.C)
            if i == 5 {
                //停止
                ticker.Stop()
            }
        }
    }()
    for {
    }
}
runtime包 runtime.Gosched(): 让出时间片
package main

import (
    "fmt"
    "runtime"
)

func main() {
    go func(s string) {
        for i := 0; i < 2; i++ {
            fmt.Println(s)
        }
    }("world")
    // 主协程
    for i := 0; i < 2; i++ {
        // 切一下,再次分配任务
        runtime.Gosched()
        fmt.Println("hello")
    }
}
runtime.Goexit():退出当前协程序(直接退出,不会恢复)
package main

import (
	"fmt"
	"runtime"
)

func main() {
	go func() {
		defer fmt.Println("A.defer")
		func() {
			defer fmt.Println("B.defer")
			// 结束协程
			fmt.Println("f.ok")
			runtime.Goexit()
			defer fmt.Println("C.defer")
			fmt.Println("B")
		}()
		fmt.Println("A")
	}()
	for {
	}
}

结果如下:

f.ok
B.defer
A.defer
runtime.GOMAXPROCS: 确定os线程数 Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。Go语言中可以通过 runtime.GOMAXPROCS() 函数设置当前程序并发时占用的CPU逻辑核心数。

我们可以通过将任务分配到不同的 CPU 逻辑核心上实现并行的效果,这里举个例子:

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(1)
    go a()
    go b()
    time.Sleep(time.Second)
}

两个任务只有一个逻辑核心,此时是做完一个任务再做另一个任务。 将逻辑核心数设为2,此时两个任务并行执行,代码如下。

func a() {
    for i := 1; i < 10; i++ {
        fmt.Println("A:", i)
    }
}

func b() {
    for i := 1; i < 10; i++ {
        fmt.Println("B:", i)
    }
}

func main() {
    runtime.GOMAXPROCS(2)
    go a()
    go b()
    time.Sleep(time.Second)
}

Go语言中的 *** 作系统线程和goroutine的关系:

1.一个 *** 作系统线程对应用户态多个goroutine。2.go程序可以同时使用多个 *** 作系统线程。3.goroutine和OS线程是多对多的关系,即m:n。 Channel *** 作

虽然 groutine 之间可以使用共享内存进行数据交换,但是共享内存在不同的goroutine 中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

channel类型

channel 是一种类型,一种引用类型。声明通道类型的格式如下:

var 变量 chan 元素类型
var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道
创建channel

声明通道: 通道是引用类型,通道类型的空值是nil

var ch chan int
fmt.Println(ch) // 

声明的通道后需要使用make函数初始化之后才能使用。

实例化通道: 创建channel的格式如下:

make(chan 元素类型, [缓冲大小])

channel的缓冲大小是可选的。

ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)
channel *** 作

通道有发送(send)、接收(receive)和关闭(close)三种 *** 作。

发送和接收都使用<-符号。

现在我们先使用以下语句定义一个通道:

ch := make(chan int)
//发送
// 将一个值发送到通道中。
ch <- 10 // 把10发送到ch中

//接收
//从一个通道中接收值。
x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

//关闭
//我们通过调用内置的close函数来关闭通道。
close(ch)

关于关闭通道需要注意的事情是,

只有在通知接收方 goroutine 所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束 *** 作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

1.对一个关闭的通道再发送值就会导致panic。2.对一个关闭的通道进行接收会一直获取值直到通道为空。3.对一个关闭的并且没有值的通道执行接收 *** 作会得到对应类型的零值。4.关闭一个已经关闭的通道会导致panic。 无缓冲的通道: 同步通道

无缓冲的通道又称为阻塞的通道。我们来看一下下面的代码:

func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        .../src/github.com/pprof/studygo/day06/channel02/main.go:8 +0x54
因为我们使用 ch := make(chan int) 创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。

上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?

一种方法是启用一个goroutine去接收值,例如:

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) // 启用goroutine从通道接收值
    ch <- 10
    fmt.Println("发送成功")
}

无缓冲通道上的发送 *** 作会阻塞,直到另一个goroutine在该通道上执行接收 *** 作,这时值才能发送成功,两个goroutine将继续执行

相反,如果接收 *** 作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

有缓冲的通道

解决上面问题的方法还有一种就是使用有缓冲区的通道。我们可以在使用make函数初始化通道的时候为其指定通道的容量,例如:

func main() {
    ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
    ch <- 10
    fmt.Println("发送成功")
}
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。 close()

通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)

package main

import "fmt"

func main() {
    c := make(chan int)
    go func() {
        for i := 0; i < 5; i++ {
            c <- i
        }
        close(c)
    }()
    for {
        if data, ok := <-c; ok {
            fmt.Println(data)
        } else {
            break
        }
    }
    fmt.Println("main结束")
}

注意: 关闭已经关闭的channel也会引发panic。

如何优雅的从通道循环取值 当通过通道发送有限的数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待。当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?

我们来看下面这个例子:

// channel 练习
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    // 开启goroutine将0~100的数发送到ch1中
    go func() {
        for i := 0; i < 100; i++ {
            ch1 <- i
        }
        close(ch1)
    }()
    // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
    go func() {
        for {
            i, ok := <-ch1 // 通道关闭后再取值ok=false
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()
    // 在主goroutine中从ch2中接收值打印
    for i := range ch2 { // 通道关闭后会退出for range循环
        fmt.Println(i)
    }
}

从上面可以看到从判断通道是否关闭的方式有两种:

!okfor range 单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。

方法:直接在参数传入是时就指定 xxx chan<- xx (只准输入) 还是 yyy<-chan xx (只准输出)
Go语言中提供了单向通道来处理这种情况。例如,我们把上面的例子改造如下:
func counter(out chan<- int) {
    for i := 0; i < 100; i++ {
        out <- i
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中,

1.chan<- int是一个只能发送的通道,可以发送但是不能接收;2.<-chan int是一个只能接收的通道,可以接收但是不能发送。

1.1.10. 通道总结
channel常见的异常总结,如下图:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/996318.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存