Go中的select类似于Unix系统中的多路复用模型,当检测到有IO变化的时候,就会执行对应的case下的语句
select 基本功能,接收功能下面的代码,我们创建了一个生成器,生成器随机睡眠1-2000ms并返回一个数值给C1和C2,然后channel有值以后,我们会把值写入到consumer(消费者)中,消费者会接收我们的数据并处理
package main
import (
"fmt"
"math/rand"
"time"
)
// 生成器随机睡眠1-2000ms并返回一个数值,供给select测试使用
func generator() chan int {
seed := make(chan int)
go func() {
i := 0
for {
// 随机睡眠 1~2000ms
time.Sleep(time.Duration(rand.Intn(2001)) * time.Millisecond)
seed <- i
i++
}
}()
return seed
}
// 创建一个消费者
func createConsumer() chan int {
c := make(chan int)
go func() {
for n := range c {
fmt.Println("当前消费者收到数据:", n)
}
}()
return c
}
func main() {
c1, c2 := generator(), generator()
consumer := createConsumer()
for {
select {
case n := <-c1:
consumer <- n
case n := <-c2:
consumer <- n
}
}
}
select 实现收发功能
上面的代码有个问题,每次收到c1或者c2的数据以后,再发送给consumer的时候,会有一个阻塞,
另外加一个case,把接受到的数据直接发送出去,这样就不会阻塞住了
func main() {
c1, c2 := generator(), generator()
consumer := createConsumer()
n := 0
hasValue := false
for {
var activeConsumer chan int
if hasValue {
activeConsumer = consumer
}
select {
case n = <-c1:
hasValue = true
case n = <-c2:
hasValue = true
case activeConsumer <- n: // 此处可以把接收的数据直接发送出去,也可以认为这里是把具体的任务投递给了消费者,然后hasValue置为false,可以继续接收新数据了
hasValue = false
}
}
}
select 实现收发功能(优化版本)
上面的情况,只适用于消费者消费速度足够的情况,实际上我们在日常工作中,经常会出现消息投递过快,而消费者消费数据能力过慢导致消息堆积的问题,这时候我们要把接收的消息进行排队处理
我们可以修改consumer函数,适当的睡眠一段时间来模拟消费者速度过慢的场景,此时消费者最快十秒处理一条数据,消费者1-2000ms生产一条数据,这时候会出现消息阻塞
package main
import (
"fmt"
"math/rand"
"time"
)
// 生成器随机睡眠1-2000ms并返回一个数值,供给select测试使用
func generator() chan int {
seed := make(chan int)
go func() {
i := 0
for {
// 随机睡眠 1~2000ms
time.Sleep(time.Duration(rand.Intn(2001)) * time.Millisecond)
seed <- i
i++
}
}()
return seed
}
// 创建一个消费者
func createConsumer() chan int {
c := make(chan int)
go func() {
for n := range c {
time.Sleep(time.Second * 10) // <<<<<<<<<<<<<<<<<<注意,我们增加了这里的睡眠时间来模拟消费数据延迟的场景
fmt.Println("当前消费者收到数据:", n)
}
}()
return c
}
func main() {
c1, c2 := generator(), generator()
consumer := createConsumer()
var values []int
n := 0
for {
var activeValue int
var activeConsumer chan int
if len(values) > 0 {
activeConsumer = consumer
activeValue = values[0]
}
select {
case n = <-c1:
// 接收到消息以后存到values队列中
values = append(values, n)
case n = <-c2:
// 接收到消息以后存到values队列中
values = append(values, n)
case activeConsumer <- activeValue:
values = values[1:] // 删除当前的元素
}
}
}
select 实现计时器功能
有时候我们希望在十秒后退出系统怎么处理呢,只需要增加case判断超时时间即可
func main() {
c1, c2 := generator(), generator()
consumer := createConsumer()
// 十秒钟后这个函数会返回一个值
timerPowerOff := time.After(time.Second * 10) // <<<<<<<<<<<<<<<<<注意看这里
var values []int
n := 0
for {
var activeValue int
var activeConsumer chan int
if len(values) > 0 {
activeConsumer = consumer
activeValue = values[0]
}
select {
case n = <-c1:
// 接收到消息以后存到values队列中
values = append(values, n)
case n = <-c2:
// 接收到消息以后存到values队列中
values = append(values, n)
case activeConsumer <- activeValue:
values = values[1:] // 删除当前的元素
case <-timerPowerOff: // 定时关闭,<<<<<<<<<<<<<<<<<注意看这里
fmt.Println("十秒钟到了,系统将退出,bye bye!!")
return
}
}
}
select 实现单个请求超时检测功能
单个请求超时跟上面差不多,我们generator
函数产生消息的时间是 1-2000ms,那么我们判断,如果消息投递时间间隔超过500ms,那么打印time out,代码如下
func main() {
c1, c2 := generator(), generator()
consumer := createConsumer()
timerPowerOff := time.After(time.Second * 10)
var values []int
n := 0
for {
var activeValue int
var activeConsumer chan int
if len(values) > 0 {
activeConsumer = consumer
activeValue = values[0]
}
select {
case n = <-c1:
// 接收到消息以后存到values队列中
values = append(values, n)
case n = <-c2:
// 接收到消息以后存到values队列中
values = append(values, n)
case activeConsumer <- activeValue:
values = values[1:] // 删除当前的元素
case <-timerPowerOff: // 定时关闭
fmt.Println("十秒钟到了,系统将退出,bye bye!!")
return
case <-time.After(time.Millisecond * 500): // 五百秒接收不到消息打印下面的信息
fmt.Println("当前消息接收超时, timeout!")
}
}
}
select 实现定时功能
有的时候可能做一些定时检查报告的内容,如果发现系统异常及时报告给管理员,可以使用下面增加的case tick 方法
func main() {
c1, c2 := generator(), generator()
consumer := createConsumer()
timerPowerOff := time.After(time.Second * 10)
tick := time.Tick(time.Second) // <<<<<<<<<<<<<<<<<注意看这里,每隔一秒钟,系统会往这个channel写一条数据过来,这时候会被select捕获到
var values []int
n := 0
for {
var activeValue int
var activeConsumer chan int
if len(values) > 0 {
activeConsumer = consumer
activeValue = values[0]
}
select {
case n = <-c1:
// 接收到消息以后存到values队列中
values = append(values, n)
case n = <-c2:
// 接收到消息以后存到values队列中
values = append(values, n)
case activeConsumer <- activeValue:
values = values[1:] // 删除当前的元素
case <-timerPowerOff: // 定时关闭
fmt.Println("十秒钟到了,系统将退出,bye bye!!")
return
case <-time.After(time.Millisecond * 500): // 五百秒接收不到消息打印下面的信息
fmt.Println("当前消息接收超时, timeout!")
case <-tick: // 定时功能,一秒执行一次 <<<<<<<<<<<<<<<<<注意看这里
fmt.Println("一秒钟时间到了,干点儿啥好呢")
}
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)