怎么使用golang的channel做广播

怎么使用golang的channel做广播,第1张

概述怎么使用golang的channel做广播 使用golang中的channel做广播需要使用到golang并发模式中的扇出模式,也就是说多个接入点监听一个输入源。这种模式的结果是,只要输入源输入一个消息,任何一个监听者都能获取到这个消息。这里仅有一个例外就是channel关闭。这个关闭将所有监听者都关闭,这就是扇出模式。删除模式简单定义为:多个函数可以从同一个channel读取数据,直到这个cha 怎么使用golang的channel做广播

使用golang中的channel做广播需要使用到golang并发模式中的扇出模式,也就是说多个接入点监听一个输入源。这种模式的结果是,只要输入源输入一个消息,任何一个监听者都能获取到这个消息。这里仅有一个例外就是channel关闭。这个关闭将所有监听者都关闭,这就是扇出模式。删除模式简单定义为:多个函数可以从同一个channel读取数据,直到这个channel关闭。

当监听者数量已知时

让每个worker监听专有的广播channel,并且从主channel中派发消息到每一个专有的广播channel中。

type worker struct {    name   string    source chan interface{}    quit   chan struct{}}func (w *worker) Start() {    w.source = make(chan interface{})    go func() {        for {            select {            case msg := <-w.source:                fmt.Println("==========>> ",w.name,msg)            case <-w.quit: // 后面解释                fmt.Println(w.name," quit!")                return            }        }    }()}

此时定义两个worker:

workers := []*worker{&worker{},&worker{}}for _,w := range workers { w.Start() }

派发消息:

go func() {        msg := "test"        count := 0        var sendMsg string        for {            select {            case <-globalQuit:                fmt.Println("Stop send message")                return            case <-time.Tick(500 * time.Millisecond):                count++                sendMsg = fmt.Sprintf("%s-%d",msg,count)                fmt.Println("Send message is ",sendMsg)                for _,wk := range workers {                    wk.source <- sendMsg                }            }        }    }()
当监听者数量为未知时:

在这种情况下,上述解决办法依然可行。唯一不同的地方在于,无论什么时候需要一个新的worker时,仅仅只需要新建一个worker,并开启它,然后push到workers的slice中。但这种方式需要一个线程安全的slice,需要一个同步锁。其实现如下:

type threadSafeSlice struct {    sync.Mutex    workers []*worker}func (slice *threadSafeSlice) Push(w *worker) {    slice.Lock()    defer slice.Unlock()    slice.workers = append(slice.workers,w)}func (slice *threadSafeSlice) Iter(routine func(*worker)) {    slice.Lock()    defer slice.Unlock()    for _,worker := range slice.workers {        routine(worker)    }}

任何时候,需要一个新的worker时:

w := &worker{}w.Start()threadSafeSlice.Push(w)

然后派发消息修改如下伪代码所示:

go func() {    for {        msg := <- ch        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })    }}()
最后一个问题:绝对不要让一个goroutine挂起

一个好的实践是:绝对不要让一个goroutine挂起。所以当完成监听后,必须关闭所有激活的goroutine。这将通过worker中的quitchannel进行:

首先,创建一个全局的quit信号channel:

globalQuit := make(chan struct{})

并且在任何一个新建的worker时,将globalQuit分配给这个worker的quit channel

worker.quit = globalQuit

然后当需要关闭所有的worker时,仅仅只需要这么做:

close(globalQuit)

因此close将被所有监听的goroutine所接受。所有的goroutine将被返回。

最后完善后的代码如下所示:

package mainimport (    "fmt"    "sync"    "time")import (    "os"    "os/signal"    "syscall")type threadSafeSlice struct {    sync.Mutex    workers []*worker}func (slice *threadSafeSlice) Push(w *worker) {    slice.Lock()    defer slice.Unlock()    slice.workers = append(slice.workers,worker := range slice.workers {        routine(worker)    }}type worker struct {    name   string    source chan interface{}    quit   chan struct{}}func (w *worker) Start() {    w.source = make(chan interface{})    go func() {        for {            select {            case msg := <-w.source:                fmt.Println("==========>> "," quit!")                return            }        }    }()}func main() {    globalQuit := make(chan struct{})    tss := &threadSafeSlice{}    // 1秒钟添加一个新的worker至slice中    go func() {        name := "worker"        for i := 0; i < 10; i++ {            time.Sleep(1 * time.Second)            w := &worker{                name: fmt.Sprintf("%s%d",name,i),quit: globalQuit,}            w.Start()            tss.Push(w)        }    }()    // 派发消息    go func() {        msg := "test"        count := 0        var sendMsg string        for {            select {            case <-globalQuit:                fmt.Println("Stop send message")                return            case <-time.Tick(500 * time.Millisecond):                count++                sendMsg = fmt.Sprintf("%s-%d",sendMsg)                tss.Iter(func(w *worker) { w.source <- sendMsg })            }        }    }()    // 截获退出信号    c := make(chan os.Signal, 1)    signal.Notify(c,syscall.SIGINT,syscall.SIGTERM)    for sig := range c {        switch sig {        case syscall.SIGINT,syscall.SIGTERM: // 获取退出信号时,关闭globalQuit,让所有监听者退出            close(globalQuit)            time.Sleep(1 * time.Second)            return        }    }}
总结

以上是内存溢出为你收集整理的怎么使用golang的channel做广播全部内容,希望文章能够帮你解决怎么使用golang的channel做广播所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1281344.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-09
下一篇 2022-06-09

发表评论

登录后才能评论

评论列表(0条)

保存