Go select详解

Go select详解,第1张

Go select调度

Go中的select类似于Unix系统中的多路复用模型,当检测到有IO变化的时候,就会执行对应的case下的语句

select 基本功能,接收功能

下面的代码,我们创建了一个生成器,生成器随机睡眠1-2000ms并返回一个数值给C1和C2,然后channel有值以后,我们会把值写入到consumer(消费者)中,消费者会接收我们的数据并处理

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// 生成器随机睡眠1-2000ms并返回一个数值,供给select测试使用
func generator() chan int {
	seed := make(chan int)
	go func() {
		i := 0
		for {
			// 随机睡眠 1~2000ms
			time.Sleep(time.Duration(rand.Intn(2001)) * time.Millisecond)
			seed <- i
			i++
		}
	}()
	return seed
}

// 创建一个消费者
func createConsumer() chan int {
	c := make(chan int)
	go func() {
		for n := range c {
			fmt.Println("当前消费者收到数据:", n)
		}
	}()
	return c
}

func main() {

	c1, c2 := generator(), generator()
	consumer := createConsumer()

	for {
		select {
		case n := <-c1:
			consumer <- n
		case n := <-c2:
			consumer <- n
		}
	}
}
select 实现收发功能

上面的代码有个问题,每次收到c1或者c2的数据以后,再发送给consumer的时候,会有一个阻塞,

另外加一个case,把接受到的数据直接发送出去,这样就不会阻塞住了

func main() {
	c1, c2 := generator(), generator()
	consumer := createConsumer()

	n := 0
	hasValue := false
	for {
		var activeConsumer chan int
		if hasValue {
			activeConsumer = consumer
		}
		select {
		case n = <-c1:
			hasValue = true
		case n  = <-c2:
			hasValue = true
		case activeConsumer <- n: // 此处可以把接收的数据直接发送出去,也可以认为这里是把具体的任务投递给了消费者,然后hasValue置为false,可以继续接收新数据了
			hasValue = false
		}
	}
}
select 实现收发功能(优化版本)

上面的情况,只适用于消费者消费速度足够的情况,实际上我们在日常工作中,经常会出现消息投递过快,而消费者消费数据能力过慢导致消息堆积的问题,这时候我们要把接收的消息进行排队处理
我们可以修改consumer函数,适当的睡眠一段时间来模拟消费者速度过慢的场景,此时消费者最快十秒处理一条数据,消费者1-2000ms生产一条数据,这时候会出现消息阻塞

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// 生成器随机睡眠1-2000ms并返回一个数值,供给select测试使用
func generator() chan int {
	seed := make(chan int)
	go func() {
		i := 0
		for {
			// 随机睡眠 1~2000ms
			time.Sleep(time.Duration(rand.Intn(2001)) * time.Millisecond)
			seed <- i
			i++
		}
	}()
	return seed
}

// 创建一个消费者
func createConsumer() chan int {
	c := make(chan int)
	go func() {
		for n := range c {
			time.Sleep(time.Second * 10) // <<<<<<<<<<<<<<<<<<注意,我们增加了这里的睡眠时间来模拟消费数据延迟的场景
			fmt.Println("当前消费者收到数据:", n)
		}
	}()
	return c
}

func main() {
	c1, c2 := generator(), generator()
	consumer := createConsumer()

	var values []int
	n := 0
	for {
		var activeValue int
		var activeConsumer chan int
		if len(values) > 0 {
			activeConsumer = consumer
			activeValue = values[0]
		}
		select {
		case n = <-c1:
			// 接收到消息以后存到values队列中
			values = append(values, n)
		case n = <-c2:
			// 接收到消息以后存到values队列中
			values = append(values, n)
		case activeConsumer <- activeValue:
			values = values[1:] // 删除当前的元素
		}
	}
}

select 实现计时器功能

有时候我们希望在十秒后退出系统怎么处理呢,只需要增加case判断超时时间即可

func main() {
	c1, c2 := generator(), generator()
	consumer := createConsumer()
	// 十秒钟后这个函数会返回一个值
	timerPowerOff := time.After(time.Second * 10) // <<<<<<<<<<<<<<<<<注意看这里

	var values []int
	n := 0
	for {
		var activeValue int
		var activeConsumer chan int
		if len(values) > 0 {
			activeConsumer = consumer
			activeValue = values[0]
		}
		select {
		case n = <-c1:
			// 接收到消息以后存到values队列中
			values = append(values, n)
		case n = <-c2:
			// 接收到消息以后存到values队列中
			values = append(values, n)
		case activeConsumer <- activeValue:
			values = values[1:] // 删除当前的元素
		case <-timerPowerOff: // 定时关闭,<<<<<<<<<<<<<<<<<注意看这里
			fmt.Println("十秒钟到了,系统将退出,bye bye!!")
			return
		}
	}
}
select 实现单个请求超时检测功能

单个请求超时跟上面差不多,我们generator函数产生消息的时间是 1-2000ms,那么我们判断,如果消息投递时间间隔超过500ms,那么打印time out,代码如下

func main() {
	c1, c2 := generator(), generator()
	consumer := createConsumer()

	timerPowerOff := time.After(time.Second * 10)

	var values []int
	n := 0
	for {
		var activeValue int
		var activeConsumer chan int
		if len(values) > 0 {
			activeConsumer = consumer
			activeValue = values[0]
		}
		select {
		case n = <-c1:
			// 接收到消息以后存到values队列中
			values = append(values, n)
		case n = <-c2:
			// 接收到消息以后存到values队列中
			values = append(values, n)
		case activeConsumer <- activeValue:
			values = values[1:] // 删除当前的元素
		case <-timerPowerOff: // 定时关闭
			fmt.Println("十秒钟到了,系统将退出,bye bye!!")
			return
		case <-time.After(time.Millisecond * 500): // 五百秒接收不到消息打印下面的信息
			fmt.Println("当前消息接收超时, timeout!")
		}
	}
}

select 实现定时功能

有的时候可能做一些定时检查报告的内容,如果发现系统异常及时报告给管理员,可以使用下面增加的case tick 方法

func main() {
	c1, c2 := generator(), generator()
	consumer := createConsumer()
	timerPowerOff := time.After(time.Second * 10)
	tick := time.Tick(time.Second) //  <<<<<<<<<<<<<<<<<注意看这里,每隔一秒钟,系统会往这个channel写一条数据过来,这时候会被select捕获到
	var values []int
	n := 0
	for {
		var activeValue int
		var activeConsumer chan int
		if len(values) > 0 {
			activeConsumer = consumer
			activeValue = values[0]
		}
		select {
		case n = <-c1:
			// 接收到消息以后存到values队列中
			values = append(values, n)
		case n = <-c2:
			// 接收到消息以后存到values队列中
			values = append(values, n)
		case activeConsumer <- activeValue:
			values = values[1:] // 删除当前的元素
		case <-timerPowerOff: // 定时关闭
			fmt.Println("十秒钟到了,系统将退出,bye bye!!")
			return
		case <-time.After(time.Millisecond * 500): // 五百秒接收不到消息打印下面的信息
			fmt.Println("当前消息接收超时, timeout!")
		case <-tick: // 定时功能,一秒执行一次 <<<<<<<<<<<<<<<<<注意看这里
			fmt.Println("一秒钟时间到了,干点儿啥好呢")
		}
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/994129.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存