目录
1、基本概念
2、sync.WaitGroup
3、goroutine和线程
4、channel
5、无缓冲通道和缓冲通道
6、生产者和消费者模型
7、select 多路复用
8、单向通道
总结
1、基本概念
并发:是指一个时间段中几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,担任一个时刻点上只有一个程序在处理机上运行。同一时间段同时在做多个事情。
并行:在 *** 作系统中指,一组程序按独立异步的速度执行,无论从微观还是宏观,程序都是一起执行的。同一时刻同时在做多个事情。
进程:一个程序在启动之后创建一个进程。
线程: *** 作系统调度的最小单元。
协程:用户态的线程。
goroutine:go关键字为一个函数创建一个goroutine。一个函数可以被创建多个goroutine,一个goroutine必定对应一个函数。
实例:等待goroutine运行完main在结束。
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("hello函数")
}
func main() {
go hello() //先创建一个goroutine,再goroutine中执行hello函数
//等待1秒钟,再执行main函数
time.Sleep(time.Second)
fmt.Println("main函数")
}
//运行结果为:
hello函数
main函数
实例:加入defer语句。
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("hello函数")
}
func main() {
//在程序执行完之前执行
defer fmt.Println("defer函数")
go hello() //先创建一个goroutine,再goroutine中执行hello函数
//等待1秒钟,再执行main函数
time.Sleep(time.Second)
//再执行main函数
fmt.Println("main函数")
}
//运行结果为:
hello函数
main函数
defer函数
2、sync.WaitGroup
使用sync.WaitGroup无需设置等待时间,它会自动等待所有goroutine执行完成后在结束main,效率提升。
实例:
package main
import (
"fmt"
"sync"
)
//计数器结构体实例
var sw sync.WaitGroup
func hello() {
fmt.Println("hello函数")
//告知计数器运行完毕,次数-1
sw.Done()
}
func test() {
fmt.Println("test函数")
sw.Done()
}
func main() {
defer fmt.Println("defer语句")
//设置计数器次数
sw.Add(2)
go hello()
go test()
fmt.Println("main函数")
//等待计数器归零,结束main。否则一直处于等待状态,不关闭main函数。
sw.Wait()
}
//运行结果为:
main函数
hello函数
test函数
defer语句
实例:启动10个goroutine执行。
package main
import (
"fmt"
"sync"
)
//启用10个goroutine
var sw sync.WaitGroup
func hello(i int) {
fmt.Println("hello函数", i)
sw.Done()
}
func main() {
sw.Add(10)
//创建10个goroutine
for i := 0; i < 10; i++ {
go hello(i)
}
fmt.Println("main函数")
sw.Wait()
}
//运行结果为:
hello函数 4
main函数
hello函数 6
hello函数 0
hello函数 2
hello函数 3
hello函数 9
hello函数 8
hello函数 7
hello函数 1
hello函数 5
示例:panic宕机前把错误信息发送到控制台上,程序结束,资源全部释放。
package main
import (
"fmt"
"sync"
)
//定义sync.WaitGroup结构体,内置计数器
var sw sync.WaitGroup
func hello(i int) {
//计数器-1,goroutine会全部执行完成
fmt.Println("hello函数", i)
if i == 6 {
panic("宕机报警")
}
//遇到panic不执行
sw.Done()
}
func main() {
defer fmt.Println("defer语句")
//启用10个goroutine
sw.Add(10)
for i := 0; i < 10; i++ {
go hello(i)
}
sw.Wait()
}
//运行结果为:
hello函数 9
hello函数 5
hello函数 6
hello函数 4
hello函数 1
panic: 宕机报警
3、goroutine和线程
可增长的栈:OS线程( *** 作系统线程)一般都有固定的栈内存(2M),一个goroutine的栈在生命周期开始时只有很小的栈(2K),goroutine的栈是不固定的可以按需增大或缩小,goroutine的栈大小限制可达到1G,虽然这种情况不多见,所以一次性创建十万左右的goroutine是没问题的。
goroutine调度:OS线程由OS内核来调度,goroutine则是由Go运行时(runtime)自己的调度器来调度,这个调度器使用一个m:n调度的技术(复用/调度m个goroutine到n个OS线程),goroutine的调度不需要切换内核语境,所以调用一个goroutine比调用个线程的成本要低很多。
GOMAXPROCS:Go运行时的调度使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码,默认值是机器上的CPU核心数。例如:在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m.n调度中的n)。
Go可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。(Go1.5版本前默认是单核心执行,Go1.5版本后默认使用全部逻辑核心数)。
示例:通过将任务分配到不同的CPU逻辑核心上实现并行效果。
package main
import (
"fmt"
"runtime"
"sync"
)
var sw sync.WaitGroup
func a() {
defer sw.Done()
for i := 0; i < 5; i++ {
fmt.Println("A:", i)
}
}
func b() {
defer sw.Done()
for i := 0; i < 5; i++ {
fmt.Println("B:", i)
}
}
func main() {
//使用两个逻辑核心数跑go程序
runtime.GOMAXPROCS(2)
sw.Add(2)
go a()
go b()
sw.Wait()
}
//运行结果为:
B: 0
A: 0
A: 1
A: 2
A: 3
A: 4
B: 1
B: 2
B: 3
B: 4
4、channel
单纯的将函数并发执行没有意义。函数与函数间需要交换数据才能体现并发执行函数的意义。
虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
go语言的并发模型是SCP,提倡通过通信共享内存而不是通过共享内存而实现通信。
如果说goroutine是Go程序并发的执行体,channel就是它们之间连接。channel是可以让一个goroutine发送特定值到另个goroutine的通信机制。
go语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,在声明channel的时候需要为其指定元素类型。
声明语法:
var 变量 chan 元素类型
var ch1 chan int //传递整型的通道
var ch2 chan bool //传递布尔型的通道
var ch3 chan []int //传递整型切片的通道
示例:使用channel传递数据。
package main
import "fmt"
//channel定义
func main() {
//通道ch1传输int元素数据
var ch1 chan int
//通道ch2传输布尔元素数据
var ch2 chan bool
fmt.Println("ch1", ch1)
fmt.Println("ch2", ch2)
//使用make初始化,需要定义传入的值的个数
ch3 := make(chan int, 2)
//发送
ch3 <- 10
ch3 <- 20
//接受
result1 := <-ch3
result2 := <-ch3
fmt.Println("ch3:", result1, result2)
//关闭通道
close(ch3)
//关闭通道后是否能传入值
//ch3 <- 30
result3 := <-ch3
fmt.Println("关闭通道后传入的值ch3:", result3)
}
//运行结果为:
ch1
ch2
ch3: 10 20
关闭通道后传入的值ch3: 0
示例:传递并取出多个值。
package main
import "fmt"
func main() {
var ch = make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
if i == 6 {
close(ch)
break
}
}
//通道元素数量
leng := len(ch)
fmt.Printf("ch元素格式 %d, 容量 %d\n", leng, cap(ch)) //cap()取容量
for i := 0; i < leng; i++ {
result := <-ch
fmt.Println(result)
}
//数据量不确定的情况下取数据使用for range
// for result := range ch {
// fmt.Println(result)
// }
}
//运行结果为:
ch元素格式 7, 容量 10
0
1
2
3
4
5
6
5、无缓冲通道和缓冲通道
示例:无缓冲通道。
package main
import "fmt"
//通道取值
func Result(ch chan bool) {
//取值,未取到会处于阻塞状态
ret := <-ch
fmt.Println(ret)
}
func main() {
var ch = make(chan bool)
go Result(ch)
//传递数据,同步执行
ch <- true
fmt.Println("main 函数")
}
//输出结果为:
true
main 函数
示例:缓冲区通道。
package main
import (
"fmt"
"time"
)
func Result(ch chan bool) {
ret := <-ch
fmt.Println(ret)
}
func main() {
//缓冲通道,可以异步执行
ch := make(chan bool, 10)
ch <- false
//获取数据量
fmt.Println(len(ch), cap(ch))
go Result(ch)
ch <- true
time.Sleep(time.Second)
fmt.Println("main函数")
}
//运行结果为:
1 10
false
main函数
示例:取值时判断通道是否关闭。
package main
import "fmt"
//产生数据输入通道,输入完即关闭
func send(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func main() {
var ch = make(chan int, 100)
go send(ch)
for {
result, ok := <-ch
if !ok {
break
}
fmt.Print(result)
}
}
//运行结果为:
0123456789
方式二:
package main
import "fmt"
//产生数据输入通道,输入完即关闭
func send(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
func main() {
var ch = make(chan int, 100)
go send(ch)
//获取值
for result := range ch {
fmt.Print(result)
}
}
//运行结果为:
0123456789
6、生产者和消费者模型
生产者——》产生随机数。
消费者——》计算每个随机数的每个位的数字的和。
package main
import (
"fmt"
"math/rand"
"time"
)
//随机数通道
var itemChan chan *item
//随机数结构体
type item struct {
id int64
num int64
}
//求和通道
var resultChan chan *result
//求和结构体
type result struct {
item *item
sum int64
}
//生产者
func producer(ch chan *item) {
//生成随机数
var id int64
for {
id++
number := rand.Int63() //随机正整数
//构造结构体
tmp := &item{
id: id,
num: number,
}
//随机数发送到通道中
ch <- tmp
time.Sleep(time.Millisecond * 100)
}
}
//计算求和
func calc(i int64) int64 {
//求和变量
var sum int64
for i > 0 {
//得到每一个位数进行累加
sum = sum + i%10
i = i / 10
}
return sum
}
//消费者
func consumer(ch chan *item, resultChan chan *result) {
for {
tmp := <-ch
//数据运算
sum := calc(tmp.num) //tmp.num 就是(*tmp).name
//构造result结构体
resObj := &result{
item: tmp,
sum: sum,
}
//结果传递给通道result等待进行输出
resultChan <- resObj
}
}
//打印结果
func printResult(resultChan chan *result) {
for ret := range resultChan {
fmt.Printf("id:%v,num:%v,sum:%v\n", ret.item.id, ret.item.num, ret.sum)
time.Sleep(time.Second)
}
}
//指定数量的goroutine
func startWorker(n int, ch chan *item, resultChan chan *result) {
for i := 0; i < n; i++ {
go consumer(ch, resultChan)
}
}
func main() {
//通道初始化,结构体指针类型
itemChan = make(chan *item, 100)
resultChan = make(chan *result, 100)
//启用生产者goroutine
go producer(itemChan)
//消费者gotoutine
startWorker(30, itemChan, resultChan)
printResult(resultChan)
}
//输出结果如下
id:1,num:5577006791947779410,sum:95
id:2,num:8674665223082153551,sum:79
id:3,num:6129484611666145821,sum:81
id:4,num:4037200794235010051,sum:53
id:5,num:3916589616287113937,sum:95
id:6,num:6334824724549167320,sum:80
id:7,num:605394647632969758,sum:99
id:8,num:1443635317331776148,sum:77
id:9,num:894385949183117216,sum:89
id:10,num:2775422040480279449,sum:80
id:11,num:4751997750760398084,sum:99
...
...
...
示例:方法一,产生固定数量数据(10000),消费并输出。
package main
import (
"fmt"
"math/rand"
"sync"
)
//计数器处理消费者goroutine
var sw sync.WaitGroup
//随机数通道
var itemChan chan *item
//随机数结构体
type item struct {
id int64
num int64
}
//求和通道
var resultChan chan *result
//求和结构体
type result struct {
item *item
sum int64
}
//生产者
func producer(itemCh chan *item) {
//生成随机数
var id int64
for i := 0; i < 10000; i++ {
id++
number := rand.Int63() //随机正整数
//构造结构体
tmp := &item{
id: id,
num: number,
}
//随机数发送到通道中
itemCh <- tmp
}
//生产完成关闭itemChan通道
close(itemCh)
}
//计算求和
func calc(i int64) int64 {
//求和变量
var sum int64
for i > 0 {
//得到每一个位数进行累加
sum = sum + i%10
i = i / 10
}
return sum
}
//消费者
func consumer(itemch chan *item, resultChan chan *result) {
defer sw.Done()
for tmp := range itemch {
//数据运算
sum := calc(tmp.num) //tmp.num 就是(*tmp).name
//构造result结构体
resObj := &result{
item: tmp,
sum: sum,
}
//结果传递给通道result等待进行输出
resultChan <- resObj
}
}
//打印结果
func printResult(resultChan chan *result) {
for ret := range resultChan {
fmt.Printf("id:%v,num:%v,sum:%v\n", ret.item.id, ret.item.num, ret.sum)
}
}
//指定数量的goroutine
func startWorker(n int, ch chan *item, resultChan chan *result) {
for i := 0; i < n; i++ {
go consumer(ch, resultChan)
}
}
func main() {
//通道初始化,结构体指针类型
itemChan = make(chan *item, 10000)
resultChan = make(chan *result, 10000)
//启用生产者goroutine
go producer(itemChan)
//启用计数器
sw.Add(30)
//消费者gotoutine
startWorker(30, itemChan, resultChan)
//等待goroutine结束
sw.Wait()
//关闭resultChan通道
close(resultChan)
printResult(resultChan)
}
//输出结果如下
id:1,num:5577006791947779410,sum:95
id:2,num:8674665223082153551,sum:79
id:3,num:6129484611666145821,sum:81
id:4,num:4037200794235010051,sum:53
...
id:9997,num:661484091736918950,sum:87
id:9998,num:5665788214883279410,sum:94
id:9999,num:3873652276866279948,sum:108
id:10000,num:8455728612988973956,sum:112
示例:方法二,通道和goroutine配合处理指定数量数据。
package main
import (
"fmt"
"math/rand"
)
//随机数通道
var itemChan chan *item
//求和管道
var resultChan chan *result
//空结构体通道
var doneChan chan struct{}
//随机数结构体
type item struct {
id int64
num int64
}
//求和结构体
type result struct {
item *item
sum int64
}
//生产者
func producer(itemCh chan *item) {
var id int64
//指定数量数据
for i := 0; i < 10000; i++ {
//序列号自增
id++
number := rand.Int63() //随机正整数
//随机数结构体封装
tmp := &item{
id: id,
num: number,
}
//随机数放入通道
itemCh <- tmp
// time.Sleep(time.Millisecond*100)
}
//itemCh通道关闭
close(itemCh)
}
//求和运算
func calc(i int64) int64 {
//求和变量
var sum int64
for i > 0 {
//每位数字求和
sum = sum + i%10
i = i / 10
}
return sum
}
//消费者
func consumer(itemCh chan *item, resultCh chan *result) {
for tmp := range itemCh {
//数据运算
sum := calc(tmp.num)
//求和结构体封装
result := &result{
item: tmp,
sum: sum,
}
//求和数放入resultChan通道
resultCh <- result
}
//传递空结构体进通道
doneChan <- struct{}{}
}
//输出遍历
func printResult(resultChan chan *result) {
for ret := range resultChan {
fmt.Printf("id:%v;num:%v;sum:%v\n", ret.item.id, ret.item.num, ret.sum)
//节奏输出控制
// time.Sleep(time.Second)
}
}
//消费者goroutine数量控制
func startWorker(n int, ch chan *item, resultCh chan *result) {
//开启n数量的goroutine
for i := 0; i < n; i++ {
//goroutine调用函数
go consumer(ch, resultCh)
}
}
//监控goroutine即时关闭通道
func closeChan(n int, doneChan chan struct{}, resultChan chan *result) {
//监控goroutine次数
for i := 0; i < n; i++ {
<-doneChan
}
//计数通道关闭
close(doneChan)
//求和通道关闭
close(resultChan)
}
func main() {
//通道初始化,结构体指针类型
itemChan = make(chan *item, 10000)
resultChan = make(chan *result, 10000)
doneChan = make(chan struct{}, 30)
//启用生产者goroutine
go producer(itemChan)
//消费者goroutine,高并发处理
startWorker(30, itemChan, resultChan)
//启用goroutine监控流程是否结束
go closeChan(30, doneChan, resultChan)
//调用输出函数
printResult(resultChan)
}
7、select 多路复用
类似于用于通信的switch语句。每个case必须是一个通信 *** 作,要么是发送要么是接收。
select 随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行。一个默认的子句应该总是可运行的。
某些场景下,我们需要同事从多个通道接受数据,没有数据发生接收就会阻塞,我们往往可以使用一下方式解决。
语法格式:
select {
case communication clause:
statement(s);
case communication clause:
statement(s);
/* 你可以定义任何数量的case*/
default : /* 可选 */’
statement(s);
}
每个case 都必须是一个通信。
所有channel表达式都会被求值。
所有被发送的表达式都会被求值。
如果任意某个通信可以进行,它就执行,其他被忽略。
示例:
package main
import (
"fmt"
"math"
"time"
)
//select语句
var ch1 = make(chan string, 100)
var ch2 = make(chan string, 100)
//发送数据k1
func sendK1(ch chan string) {
//产生数据
for i := 0; i < math.MaxInt64; i++ {
//像ch中发送数据
ch <- fmt.Sprintf("k1:%d\n", i)
//50毫秒放入一个数据
time.Sleep(time.Millisecond * 50)
}
}
//发送数据k2
func sendK2(ch chan string) {
//产生数据
for i := 0; i < math.MaxInt64; i++ {
//像ch中发送数据
ch <- fmt.Sprintf("k2:%d\n", i)
//100毫秒放入一个数据
time.Sleep(time.Millisecond * 100)
}
}
func main() {
go sendK1(ch1)
go sendK2(ch2)
//取值
for {
//如果通道都可通信,是随机公平执行其中一条,忽略其他;
//如果通道都不可通信,且没有default语句时候,处于阻塞状态,直到可以通信为止
select {
case ret := <-ch1:
fmt.Println(ret)
case ret := <-ch2:
fmt.Println(ret)
default:
fmt.Println("没有数据可取")
time.Sleep(time.Millisecond * 500)
}
}
}
//输出结果如下
没有数据可取
k2:0
k2:1
k2:2
k1:0
k1:1
k2:3
k2:4
k1:2
...
示例:
package main
import "fmt"
//select case通信原理
func main() {
var ch = make(chan int, 1)
for i := 0; i < 10; i++ {
//解决死锁问题
select {
case ch <- i:
case ret := <-ch:
fmt.Println(ret)
}
}
}
//输出结果如下
0
2
4
6
8
示例:使用select 完善生产者和消费者模型,键盘输入回车终止数据。
package main
import (
"fmt"
"math/rand"
"os"
"time"
)
// 传送随机数通道(使用指针传递)
var itemChan chan *item
// 传送求和值通道
var resultChan chan *result
// 空结构体传输通道
var doneChan chan struct{}
//随机数字结构体
type item struct {
id int64
num int64
}
//求和结构体
type result struct {
item *item
sum int64
}
//生产者函数
func producer(ch chan *item) {
// 1.生成随机数
var id int64
for {
// 序列号自增
id++
number := rand.Int63() // 随机生成正整数
// 随机数结构体封装
tmp := &item{
id: id,
num: number,
}
// 2. 随机数发送到通道中
ch <- tmp
}
}
//计算求和函数
func calc(num int64) int64 {
// 和:值
var sum int64
for num > 0 {
// 得到每一个位数进行累加
sum = sum + num%10
num = num / 10
}
return sum
}
//消费者函数
func consumer(ch chan *item, resultChan chan *result) {
// 从 itemChan 通道中取随机数结构体指针
for tmp := range ch {
sum := calc(tmp.num) // tmp.num 就是 (*tmp).num,会自动识别指针
// 构造 result 结构体
resObj := &result{
item: tmp,
sum: sum,
}
// 结果传递通道 resultChan 等待进行输出
resultChan <- resObj
}
}
// 打印结果
func printResult(doneChan chan struct{}, resultChan chan *result) {
for {
// 等待 doneChan 输入退出输出信息
select {
case ret := <-resultChan:
fmt.Printf("id:%v,num:%v,sum:%v\n", ret.item.id, ret.item.num, ret.sum)
time.Sleep(time.Second)
case <-doneChan:
return
}
}
}
// 监听键盘输入字符传递给 doneChan 通道
func inputChan(doneChan chan struct{}) {
// 一个字符的输入
tmp := [1]byte{}
// 从标准输入获取值,未输入一直处于等待状态
os.Stdin.Read(tmp[:])
doneChan <- struct{}{}
}
// 灵活启用指定数量的 goroutine
// n 为要开启的 goroutine 数量
func startWorker(n int, ch chan *item, resultChan chan *result) {
for i := 0; i < n; i++ {
go consumer(ch, resultChan)
}
}
func main() {
// 通道初始化,结构体指针类型
itemChan = make(chan *item, 100)
resultChan = make(chan *result, 100)
doneChan = make(chan struct{}, 1)
// 启用生产者 goroutine
go producer(itemChan)
// 消费者 goroutine,高并发处理
startWorker(30, itemChan, resultChan)
go inputChan(doneChan)
// 打印结果
printResult(doneChan, resultChan)
}
8、单向通道
在函数中稚嫩那个发送值而不能接收值为只写通道。
只能接收不能发送值称为只读通道。
可以让代码意向更明确,更清晰。
示例:
package main
import "fmt"
var ch chan int
//只写通道,针对一个函数中实现
func writeCh(ch chan<- int) {
ch <- 10
}
//只读通道
func read(ch <-chan int) int {
ret := <-ch
return ret
}
func main() {
ch = make(chan int, 1)
writeCh(ch)
fmt.Println(read(ch))
}
//输出结果如下
10
总结
出现panic情况:
通道关闭以后,可以取值,但是不能传递值,会出现panic。
通道关闭后,如果再次关闭通道,会panic。
出现死锁的情况:
通道中元素的容量超出也会死锁。
没有传递元素到通道里,直接取值,会死锁。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)