Go语言-高并发

Go语言-高并发,第1张

目录

1、基本概念

2、sync.WaitGroup

3、goroutine和线程

4、channel

5、无缓冲通道和缓冲通道

6、生产者和消费者模型

7、select 多路复用

8、单向通道

总结


1、基本概念

并发:是指一个时间段中几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,担任一个时刻点上只有一个程序在处理机上运行。同一时间段同时在做多个事情。

并行:在 *** 作系统中指,一组程序按独立异步的速度执行,无论从微观还是宏观,程序都是一起执行的。同一时刻同时在做多个事情。

进程:一个程序在启动之后创建一个进程。

线程: *** 作系统调度的最小单元。

协程:用户态的线程。

goroutine:go关键字为一个函数创建一个goroutine。一个函数可以被创建多个goroutine,一个goroutine必定对应一个函数。

实例:等待goroutine运行完main在结束。

package main

import (
	"fmt"
	"time"
)

func hello() {
	fmt.Println("hello函数")
}

func main() {
	go hello() //先创建一个goroutine,再goroutine中执行hello函数
	//等待1秒钟,再执行main函数
	time.Sleep(time.Second)
	fmt.Println("main函数")
}

//运行结果为:
hello函数
main函数

实例:加入defer语句。

package main

import (
	"fmt"
	"time"
)

func hello() {
	fmt.Println("hello函数")
}

func main() {
	//在程序执行完之前执行
	defer fmt.Println("defer函数")
	go hello() //先创建一个goroutine,再goroutine中执行hello函数
	//等待1秒钟,再执行main函数
	time.Sleep(time.Second)
	//再执行main函数
	fmt.Println("main函数")
}

//运行结果为:
hello函数
main函数
defer函数
2、sync.WaitGroup

使用sync.WaitGroup无需设置等待时间,它会自动等待所有goroutine执行完成后在结束main,效率提升。

实例:

package main

import (
	"fmt"
	"sync"
)

//计数器结构体实例
var sw sync.WaitGroup

func hello() {
	fmt.Println("hello函数")
	//告知计数器运行完毕,次数-1
	sw.Done()
}

func test() {
	fmt.Println("test函数")
	sw.Done()
}

func main() {
	defer fmt.Println("defer语句")
	//设置计数器次数
	sw.Add(2)
	go hello()
	go test()
	fmt.Println("main函数")
	//等待计数器归零,结束main。否则一直处于等待状态,不关闭main函数。
	sw.Wait()
}

//运行结果为:
main函数
hello函数
test函数
defer语句

实例:启动10个goroutine执行。

package main

import (
	"fmt"
	"sync"
)

//启用10个goroutine
var sw sync.WaitGroup

func hello(i int) {
	fmt.Println("hello函数", i)
	sw.Done()
}

func main() {
	sw.Add(10)
	//创建10个goroutine
	for i := 0; i < 10; i++ {
		go hello(i)
	}
	fmt.Println("main函数")
	sw.Wait()
}

//运行结果为:
hello函数 4
main函数
hello函数 6
hello函数 0
hello函数 2
hello函数 3
hello函数 9
hello函数 8
hello函数 7
hello函数 1
hello函数 5

示例:panic宕机前把错误信息发送到控制台上,程序结束,资源全部释放。

package main

import (
	"fmt"
	"sync"
)

//定义sync.WaitGroup结构体,内置计数器
var sw sync.WaitGroup

func hello(i int) {
	//计数器-1,goroutine会全部执行完成
	fmt.Println("hello函数", i)
	if i == 6 {
		panic("宕机报警")
	}
	//遇到panic不执行
	sw.Done()
}

func main() {
	defer fmt.Println("defer语句")
	//启用10个goroutine
	sw.Add(10)
	for i := 0; i < 10; i++ {
		go hello(i)
	}
	sw.Wait()
}

//运行结果为:
hello函数 9
hello函数 5
hello函数 6
hello函数 4
hello函数 1
panic: 宕机报警
3、goroutine和线程

可增长的栈:OS线程( *** 作系统线程)一般都有固定的栈内存(2M),一个goroutine的栈在生命周期开始时只有很小的栈(2K),goroutine的栈是不固定的可以按需增大或缩小,goroutine的栈大小限制可达到1G,虽然这种情况不多见,所以一次性创建十万左右的goroutine是没问题的。

goroutine调度:OS线程由OS内核来调度,goroutine则是由Go运行时(runtime)自己的调度器来调度,这个调度器使用一个m:n调度的技术(复用/调度m个goroutine到n个OS线程),goroutine的调度不需要切换内核语境,所以调用一个goroutine比调用个线程的成本要低很多。

GOMAXPROCS:Go运行时的调度使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码,默认值是机器上的CPU核心数。例如:在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m.n调度中的n)。

Go可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。(Go1.5版本前默认是单核心执行,Go1.5版本后默认使用全部逻辑核心数)。

示例:通过将任务分配到不同的CPU逻辑核心上实现并行效果。

package main

import (
	"fmt"
	"runtime"
	"sync"
)

var sw sync.WaitGroup

func a() {
	defer sw.Done()
	for i := 0; i < 5; i++ {
		fmt.Println("A:", i)
	}
}

func b() {
	defer sw.Done()
	for i := 0; i < 5; i++ {
		fmt.Println("B:", i)
	}
}

func main() {
	//使用两个逻辑核心数跑go程序
	runtime.GOMAXPROCS(2)
	sw.Add(2)
	go a()
	go b()
	sw.Wait()
}

//运行结果为:
B: 0
A: 0
A: 1
A: 2
A: 3
A: 4
B: 1
B: 2
B: 3
B: 4
4、channel

单纯的将函数并发执行没有意义。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

go语言的并发模型是SCP,提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说goroutine是Go程序并发的执行体,channel就是它们之间连接。channel是可以让一个goroutine发送特定值到另个goroutine的通信机制。

go语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,在声明channel的时候需要为其指定元素类型。

声明语法:

var 变量 chan 元素类型

var ch1 chan int   //传递整型的通道
var ch2 chan bool  //传递布尔型的通道
var ch3 chan []int //传递整型切片的通道 

示例:使用channel传递数据。

package main

import "fmt"

//channel定义
func main() {
	//通道ch1传输int元素数据
	var ch1 chan int
	//通道ch2传输布尔元素数据
	var ch2 chan bool
	fmt.Println("ch1", ch1)
	fmt.Println("ch2", ch2)
	//使用make初始化,需要定义传入的值的个数
	ch3 := make(chan int, 2)
	//发送
	ch3 <- 10
	ch3 <- 20
	//接受
	result1 := <-ch3
	result2 := <-ch3
	fmt.Println("ch3:", result1, result2)
	//关闭通道
	close(ch3)
	//关闭通道后是否能传入值
	//ch3 <- 30
	result3 := <-ch3
	fmt.Println("关闭通道后传入的值ch3:", result3)
}

//运行结果为:
ch1 
ch2 
ch3: 10 20
关闭通道后传入的值ch3: 0

示例:传递并取出多个值。

package main

import "fmt"

func main() {
	var ch = make(chan int, 10)
	for i := 0; i < 10; i++ {
		ch <- i
		if i == 6 {
			close(ch)
			break
		}
	}
	//通道元素数量
	leng := len(ch)
	fmt.Printf("ch元素格式 %d, 容量 %d\n", leng, cap(ch)) //cap()取容量
	for i := 0; i < leng; i++ {
		result := <-ch
		fmt.Println(result)
	}

	//数据量不确定的情况下取数据使用for range
	// for result := range ch {
	// 	fmt.Println(result)
	// }
}

//运行结果为:
ch元素格式 7, 容量 10
0
1
2
3
4
5
6
5、无缓冲通道和缓冲通道

示例:无缓冲通道。

package main

import "fmt"

//通道取值
func Result(ch chan bool) {
	//取值,未取到会处于阻塞状态
	ret := <-ch
	fmt.Println(ret)
}

func main() {
	var ch = make(chan bool)
	go Result(ch)
	//传递数据,同步执行
	ch <- true
	fmt.Println("main 函数")
}

//输出结果为:
true
main 函数

示例:缓冲区通道。

package main

import (
	"fmt"
	"time"
)

func Result(ch chan bool) {
	ret := <-ch
	fmt.Println(ret)
}

func main() {
	//缓冲通道,可以异步执行
	ch := make(chan bool, 10)
	ch <- false
	//获取数据量
	fmt.Println(len(ch), cap(ch))
	go Result(ch)
	ch <- true
	time.Sleep(time.Second)
	fmt.Println("main函数")
}

//运行结果为:
1 10
false
main函数

示例:取值时判断通道是否关闭。

package main

import "fmt"

//产生数据输入通道,输入完即关闭
func send(ch chan int) {
	for i := 0; i < 10; i++ {
		ch <- i
	}
	close(ch)
}

func main() {
	var ch = make(chan int, 100)
	go send(ch)
	for {
		result, ok := <-ch
		if !ok {
			break
		}
		fmt.Print(result)
	}
}

//运行结果为:
0123456789

方式二:

package main

import "fmt"

//产生数据输入通道,输入完即关闭
func send(ch chan int) {
	for i := 0; i < 10; i++ {
		ch <- i
	}
	close(ch)
}

func main() {
	var ch = make(chan int, 100)
	go send(ch)
	//获取值
	for result := range ch {
		fmt.Print(result)
	}
}

//运行结果为:
0123456789
6、生产者和消费者模型

生产者——》产生随机数。
消费者——》计算每个随机数的每个位的数字的和。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

//随机数通道
var itemChan chan *item

//随机数结构体
type item struct {
	id  int64
	num int64
}

//求和通道
var resultChan chan *result

//求和结构体
type result struct {
	item *item
	sum  int64
}

//生产者
func producer(ch chan *item) {
	//生成随机数
	var id int64
	for {
		id++
		number := rand.Int63() //随机正整数
		//构造结构体
		tmp := &item{
			id:  id,
			num: number,
		}
		//随机数发送到通道中
		ch <- tmp
		time.Sleep(time.Millisecond * 100)
	}
}

//计算求和
func calc(i int64) int64 {
	//求和变量
	var sum int64
	for i > 0 {
		//得到每一个位数进行累加
		sum = sum + i%10
		i = i / 10
	}
	return sum
}

//消费者
func consumer(ch chan *item, resultChan chan *result) {
	for {
		tmp := <-ch
		//数据运算
		sum := calc(tmp.num) //tmp.num 就是(*tmp).name
		//构造result结构体
		resObj := &result{
			item: tmp,
			sum:  sum,
		}
		//结果传递给通道result等待进行输出
		resultChan <- resObj
	}
}

//打印结果
func printResult(resultChan chan *result) {
	for ret := range resultChan {
		fmt.Printf("id:%v,num:%v,sum:%v\n", ret.item.id, ret.item.num, ret.sum)
		time.Sleep(time.Second)
	}
}

//指定数量的goroutine
func startWorker(n int, ch chan *item, resultChan chan *result) {
	for i := 0; i < n; i++ {
		go consumer(ch, resultChan)
	}
}

func main() {
	//通道初始化,结构体指针类型
	itemChan = make(chan *item, 100)
	resultChan = make(chan *result, 100)
	//启用生产者goroutine
	go producer(itemChan)
	//消费者gotoutine
	startWorker(30, itemChan, resultChan)
	printResult(resultChan)
}



//输出结果如下
id:1,num:5577006791947779410,sum:95
id:2,num:8674665223082153551,sum:79
id:3,num:6129484611666145821,sum:81
id:4,num:4037200794235010051,sum:53
id:5,num:3916589616287113937,sum:95
id:6,num:6334824724549167320,sum:80
id:7,num:605394647632969758,sum:99
id:8,num:1443635317331776148,sum:77
id:9,num:894385949183117216,sum:89
id:10,num:2775422040480279449,sum:80
id:11,num:4751997750760398084,sum:99
...
...
...

示例:方法一,产生固定数量数据(10000),消费并输出。

package main

import (
	"fmt"
	"math/rand"
	"sync"
)

//计数器处理消费者goroutine
var sw sync.WaitGroup

//随机数通道
var itemChan chan *item

//随机数结构体
type item struct {
	id  int64
	num int64
}

//求和通道
var resultChan chan *result

//求和结构体
type result struct {
	item *item
	sum  int64
}

//生产者
func producer(itemCh chan *item) {
	//生成随机数
	var id int64
	for i := 0; i < 10000; i++ {
		id++
		number := rand.Int63() //随机正整数
		//构造结构体
		tmp := &item{
			id:  id,
			num: number,
		}
		//随机数发送到通道中
		itemCh <- tmp
	}
	//生产完成关闭itemChan通道
	close(itemCh)
}

//计算求和
func calc(i int64) int64 {
	//求和变量
	var sum int64
	for i > 0 {
		//得到每一个位数进行累加
		sum = sum + i%10
		i = i / 10
	}
	return sum
}

//消费者
func consumer(itemch chan *item, resultChan chan *result) {
	defer sw.Done()
	for tmp := range itemch {
		//数据运算
		sum := calc(tmp.num) //tmp.num 就是(*tmp).name
		//构造result结构体
		resObj := &result{
			item: tmp,
			sum:  sum,
		}
		//结果传递给通道result等待进行输出
		resultChan <- resObj
	}
}

//打印结果
func printResult(resultChan chan *result) {
	for ret := range resultChan {
		fmt.Printf("id:%v,num:%v,sum:%v\n", ret.item.id, ret.item.num, ret.sum)
	}
}

//指定数量的goroutine
func startWorker(n int, ch chan *item, resultChan chan *result) {
	for i := 0; i < n; i++ {
		go consumer(ch, resultChan)
	}
}

func main() {
	//通道初始化,结构体指针类型
	itemChan = make(chan *item, 10000)
	resultChan = make(chan *result, 10000)
	//启用生产者goroutine
	go producer(itemChan)
	//启用计数器
	sw.Add(30)
	//消费者gotoutine
	startWorker(30, itemChan, resultChan)
	//等待goroutine结束
	sw.Wait()
	//关闭resultChan通道
	close(resultChan)
	printResult(resultChan)
}


//输出结果如下
id:1,num:5577006791947779410,sum:95
id:2,num:8674665223082153551,sum:79
id:3,num:6129484611666145821,sum:81
id:4,num:4037200794235010051,sum:53
...
id:9997,num:661484091736918950,sum:87
id:9998,num:5665788214883279410,sum:94
id:9999,num:3873652276866279948,sum:108
id:10000,num:8455728612988973956,sum:112

示例:方法二,通道和goroutine配合处理指定数量数据。

package main

import (
	"fmt"
	"math/rand"
)

//随机数通道
var itemChan chan *item

//求和管道
var resultChan chan *result

//空结构体通道
var doneChan chan struct{}

//随机数结构体
type item struct {
	id  int64
	num int64
}

//求和结构体
type result struct {
	item *item
	sum  int64
}

//生产者
func producer(itemCh chan *item) {
	var id int64
	//指定数量数据
	for i := 0; i < 10000; i++ {
		//序列号自增
		id++
		number := rand.Int63() //随机正整数
		//随机数结构体封装
		tmp := &item{
			id:  id,
			num: number,
		}
		//随机数放入通道
		itemCh <- tmp
		// time.Sleep(time.Millisecond*100)
	}
	//itemCh通道关闭
	close(itemCh)
}

//求和运算
func calc(i int64) int64 {
	//求和变量
	var sum int64
	for i > 0 {
		//每位数字求和
		sum = sum + i%10
		i = i / 10
	}
	return sum
}

//消费者
func consumer(itemCh chan *item, resultCh chan *result) {
	for tmp := range itemCh {
		//数据运算
		sum := calc(tmp.num)
		//求和结构体封装
		result := &result{
			item: tmp,
			sum:  sum,
		}
		//求和数放入resultChan通道
		resultCh <- result
	}
	//传递空结构体进通道
	doneChan <- struct{}{}
}

//输出遍历
func printResult(resultChan chan *result) {
	for ret := range resultChan {
		fmt.Printf("id:%v;num:%v;sum:%v\n", ret.item.id, ret.item.num, ret.sum)
		//节奏输出控制
		// time.Sleep(time.Second)
	}
}

//消费者goroutine数量控制
func startWorker(n int, ch chan *item, resultCh chan *result) {
	//开启n数量的goroutine
	for i := 0; i < n; i++ {
		//goroutine调用函数
		go consumer(ch, resultCh)
	}
}

//监控goroutine即时关闭通道
func closeChan(n int, doneChan chan struct{}, resultChan chan *result) {
	//监控goroutine次数
	for i := 0; i < n; i++ {
		<-doneChan
	}
	//计数通道关闭
	close(doneChan)
	//求和通道关闭
	close(resultChan)
}

func main() {
	//通道初始化,结构体指针类型
	itemChan = make(chan *item, 10000)
	resultChan = make(chan *result, 10000)
	doneChan = make(chan struct{}, 30)
	//启用生产者goroutine
	go producer(itemChan)

	//消费者goroutine,高并发处理
	startWorker(30, itemChan, resultChan)
	//启用goroutine监控流程是否结束
	go closeChan(30, doneChan, resultChan)

	//调用输出函数
	printResult(resultChan)
}
7、select 多路复用

类似于用于通信的switch语句。每个case必须是一个通信 *** 作,要么是发送要么是接收。

select 随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行。一个默认的子句应该总是可运行的。

某些场景下,我们需要同事从多个通道接受数据,没有数据发生接收就会阻塞,我们往往可以使用一下方式解决。

语法格式:

select {
case communication clause:
	statement(s);
case communication clause:
	statement(s);
/* 你可以定义任何数量的case*/
default : /* 可选 */’
	statement(s);
}

每个case 都必须是一个通信。

所有channel表达式都会被求值。

所有被发送的表达式都会被求值。

如果任意某个通信可以进行,它就执行,其他被忽略。

示例:

package main

import (
	"fmt"
	"math"
	"time"
)

//select语句
var ch1 = make(chan string, 100)
var ch2 = make(chan string, 100)

//发送数据k1
func sendK1(ch chan string) {
	//产生数据
	for i := 0; i < math.MaxInt64; i++ {
		//像ch中发送数据
		ch <- fmt.Sprintf("k1:%d\n", i)
		//50毫秒放入一个数据
		time.Sleep(time.Millisecond * 50)
	}
}

//发送数据k2
func sendK2(ch chan string) {
	//产生数据
	for i := 0; i < math.MaxInt64; i++ {
		//像ch中发送数据
		ch <- fmt.Sprintf("k2:%d\n", i)
		//100毫秒放入一个数据
		time.Sleep(time.Millisecond * 100)
	}
}

func main() {
	go sendK1(ch1)
	go sendK2(ch2)
	//取值
	for {
		//如果通道都可通信,是随机公平执行其中一条,忽略其他;
		//如果通道都不可通信,且没有default语句时候,处于阻塞状态,直到可以通信为止
		select {
		case ret := <-ch1:
			fmt.Println(ret)
		case ret := <-ch2:
			fmt.Println(ret)
		default:
			fmt.Println("没有数据可取")
			time.Sleep(time.Millisecond * 500)
		}
	}
}


//输出结果如下

没有数据可取
k2:0

k2:1

k2:2

k1:0

k1:1

k2:3

k2:4

k1:2
...

示例:

package main

import "fmt"

//select case通信原理
func main() {
	var ch = make(chan int, 1)
	for i := 0; i < 10; i++ {
		//解决死锁问题
		select {
		case ch <- i:
		case ret := <-ch:
			fmt.Println(ret)
		}
	}
}


//输出结果如下
0
2
4
6
8

示例:使用select 完善生产者和消费者模型,键盘输入回车终止数据。

package main

import (
	"fmt"
	"math/rand"
	"os"
	"time"
)

// 传送随机数通道(使用指针传递)
var itemChan chan *item

// 传送求和值通道
var resultChan chan *result

// 空结构体传输通道
var doneChan chan struct{}

//随机数字结构体
type item struct {
	id  int64
	num int64
}

//求和结构体
type result struct {
	item *item
	sum  int64
}

//生产者函数
func producer(ch chan *item) {
	// 1.生成随机数
	var id int64
	for {
		// 序列号自增
		id++
		number := rand.Int63() // 随机生成正整数
		// 随机数结构体封装
		tmp := &item{
			id:  id,
			num: number,
		}
		// 2. 随机数发送到通道中
		ch <- tmp
	}
}

//计算求和函数
func calc(num int64) int64 {
	// 和:值
	var sum int64
	for num > 0 {
		// 得到每一个位数进行累加
		sum = sum + num%10
		num = num / 10
	}
	return sum
}

//消费者函数
func consumer(ch chan *item, resultChan chan *result) {
	// 从 itemChan 通道中取随机数结构体指针
	for tmp := range ch {
		sum := calc(tmp.num) // tmp.num 就是 (*tmp).num,会自动识别指针
		// 构造 result 结构体
		resObj := &result{
			item: tmp,
			sum:  sum,
		}
		// 结果传递通道 resultChan 等待进行输出
		resultChan <- resObj
	}
}

// 打印结果
func printResult(doneChan chan struct{}, resultChan chan *result) {
	for {
		// 等待 doneChan 输入退出输出信息
		select {
		case ret := <-resultChan:
			fmt.Printf("id:%v,num:%v,sum:%v\n", ret.item.id, ret.item.num, ret.sum)
			time.Sleep(time.Second)
		case <-doneChan:
			return
		}
	}
}

// 监听键盘输入字符传递给 doneChan 通道
func inputChan(doneChan chan struct{}) {
	// 一个字符的输入
	tmp := [1]byte{}
	// 从标准输入获取值,未输入一直处于等待状态
	os.Stdin.Read(tmp[:])
	doneChan <- struct{}{}
}

// 灵活启用指定数量的 goroutine
// n 为要开启的 goroutine 数量
func startWorker(n int, ch chan *item, resultChan chan *result) {
	for i := 0; i < n; i++ {
		go consumer(ch, resultChan)
	}
}

func main() {
	// 通道初始化,结构体指针类型
	itemChan = make(chan *item, 100)
	resultChan = make(chan *result, 100)
	doneChan = make(chan struct{}, 1)
	// 启用生产者 goroutine
	go producer(itemChan)
	// 消费者 goroutine,高并发处理
	startWorker(30, itemChan, resultChan)
	go inputChan(doneChan)
	// 打印结果
	printResult(doneChan, resultChan)
}
8、单向通道

在函数中稚嫩那个发送值而不能接收值为只写通道。

只能接收不能发送值称为只读通道。

可以让代码意向更明确,更清晰。

示例:

package main

import "fmt"

var ch chan int

//只写通道,针对一个函数中实现
func writeCh(ch chan<- int) {
	ch <- 10
}

//只读通道
func read(ch <-chan int) int {
	ret := <-ch
	return ret
}

func main() {
	ch = make(chan int, 1)
	writeCh(ch)
	fmt.Println(read(ch))
}


//输出结果如下
10
总结

出现panic情况:
通道关闭以后,可以取值,但是不能传递值,会出现panic。
通道关闭后,如果再次关闭通道,会panic。
出现死锁的情况:
通道中元素的容量超出也会死锁。
没有传递元素到通道里,直接取值,会死锁。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/995681.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存