第十章 Channel--第四天 完结

第十章 Channel--第四天 完结,第1张

概述channel用于goroutine之间的通讯. 其内部实现了同步, 确保并发安全, 多个goroutine同时访问, 不需要加锁. 对channel *** 作行为做如下总结: 1. ch <- :

channel用于goroutine之间的通讯. 其内部实现了同步,确保并发安全,多个goroutine同时访问,不需要加锁.

对channel *** 作行为做如下总结:

1. ch <- : 写入channel

2. ch -> :读出channel

3. clouse : 关闭channel

 

golang 中大部分类型都是值类型,只有 slice / channel / map 是引用类型

 

一. channel的语法1. channel的定义方法
var c chan int //定义了一个int类型的chan  此时c=nil

定义一个有初始值的channel.

c := make(chan int)

  

2. channel是goroutine之间的通讯. 所有有一个goroutine发数据,就要有一个goroutine收数据
package mainimport (    "fmt"    time")func main() {    c := make(chan int)    go func() {        for  {
       // 取出channel n :
= <- c fmt.Println(n) } }() // 放入channel c <- 1 c <- 2 // 让主进程停留1s,保证channel中的数据全部取出后,主协程再关闭 time.Sleep(time.Second)}

有一个goroutine发数据,必须有一个goroutine收数据,不然就不能放进去了,会报异常deadlock

Fatal error: all goroutines are asleep - deadlock!

 

3. 函数是一等公民,channel也是,channel也能作为参数,也能作为返回值

a. channel作为参数传入

) chan也可以作为参数传入. 也可以作为返回值func getChan(c chan int) {      {        n := <- c        fmt.Println(n)    }}func chanDemo () {    c := make(chan )    go getChan(c)
    c <- 1 c <- 2  
time.Sleep(time.Second)
}

func main() {
  chanDemo()
}

b. channel用作数组

 chan也可以作为参数传入. 也可以作为返回值func worker(i int,c chan ) {     c        fmt.Printf(number: %d,worker,%c \n,i,n)    }}func chanDemo () {     开了10个协程,让10个协程分别取各自的数据    var channel [10]chan int    for i := 0; i<10; i++ {        channel[i] = make(chan )        go func(c chan  ) {            worker(00; i <  {        channel[i] <- 'a' + i    }    A    time.Sleep(time.Second)}func main() {    chanDemo()}

分配一个管道数组,里面有10个管道. 向每个管道里放两个数据. 打印. 

 c. channel作为返回值

 chan也可以作为参数传入. 也可以作为返回值func createWorker(i int) chan int {    c := make(chan )    go func() {          {            n := <- c            fmt.Printf(return c}func chanDemo () {    var channel [10]chan int     {        channel[i] = createWorker(i)    }        time.Sleep(time.Second)}func main() {    chanDemo()}

 

4 给channel定义方向.

channel可以收数据也可以发数据. 那么我们返回的channel到底是收数据的还是发数据的呢? 我们可以告诉调用者. 

 chan<- int)    go func() {        // goroutine里,channel是发数据的. 那么我们定义返回的channel只能是收数据的        return c}func chanDemo () {    var channel [10]chan<- int     {        channel[i] = createWorker(i)    }        time.Sleep(time.Second)}func main() {    chanDemo()}

 

5. bufferedChannel 带有缓冲区的channel

我们知道如果创建了一个channel,往里面放了数据,但是没有人接收,那么就会deadlock死锁. 也就是必须要有人立刻能够接收走. 这就是要求,要求立刻被收走. 如果没有,就报错. 我们可以给他一个缓冲,让他在几个范围内可以不被立刻收走,给收数据方一个缓冲的时间

func bufferedChannel() {    c := make(chan int,3)    c <- 23    c <- 4}func main() {    bufferedChannel()

上面的demo,定义了带有三个缓冲的channel. 里面3个以内数据不会报deadlock. 超过三个还没有被取走,才会包deadlock

)func worker(i int,c chan int) {    for  {        n := <- c        fmt.Printf("number: %d,%d \n",n)    }}func bufferedChannel() {    c := make(chan 3)    go worker(0    time.Sleep(time.Second)}func main() {    bufferedChannel()}
6. channel Close()channel 发完了,我没有数据可以再发了,是可以close的. channel 的close是有发数据方close.channel的发送方close了,但是接收方依然是可以接收到数据的. 接收数据的返回值跟channel的类型有关系接收方可以通过ok判断是否接收到数据了
)func worker(i  goroutine里,channel是发数据的. 那么我们定义返回的channel只能是收数据的      {        n,ok := <- c        if !ok {            // 如果没有数据了,就返回            break        }        fmt.Printf( channel发数据方,数据发完了,是可以close的func channelClose() {    c := make(chan )    go worker('bcd'     调用close 告诉接收方,数据已经发完了.    close(c)    time.Sleep(time.Second)}func main() {    bufferedChannel()    channelClose()}

另一种判断是否有数据的方法,使用range来判断.

    close(c)    time.Sleep(time.Second)}func main() {        channelClose()}

 

 

二. Channel的应用: 不要通过共享内存来通信,通过通信来共享内存

通常,比如java是通过共享内存来通信. 比如定义一个公共的flag,这个flag就是共享的一块内存空间. 他的值变了,通知调用者. 这就是通过共享内存来通信. 

 

使用共享内存的话在多线程的场景下为了处理竞态,需要加锁,使用起来比较麻烦。另外使用过多的锁,容易使得程序的代码逻辑坚涩难懂,并且容易使程序死锁,死锁了以后排查问题相当困难,特别是很多锁同时存在的时候。

go语言的channel保证同一个时间只有一个goroutine能够访问里面的数据,为开发者提供了一种优雅简单的工具,所以go原生的做法就是使用channle来通信,而不是使用共享内存来通信。

 

下面来感受一下go的通过通信来共享内存. 

三.  使用channel等待任务结束

我们在上面的demo中,都会有一句话

time.Sleep(time.Second)

让主程序休眠1秒钟.  否则,协程还没执行完,主程序就退出了

再来分析,等待的目的是什么呢? 那就是等协程都执行完了,  主程序再退出. 换个思路,我不用time.sleep,能不能协程执行完了,主动告诉主线程,我执行完了,退出吧呢?

可以的. 

1. 使用通信来共享内存,用channel实现.
go的原则: 不要通过共享内存来通信,通信来共享内存. 

 

go中通信用什么呢? 使用channel

 定义一个bool类型的chan,用来和外部通信. 事情做完了,给外面回复一个donefunc work(i for n := range w.c {        fmt.Printf( w.done <- true    }}type worker struct {    c chan int        // 传递字母的管道    done chan bool        // 标记字母传递完成的管道}w = worker{        c : make(chan int),done : make(chan bool),}    go work(i,w)     w}func chanDemo () {    var wo [10]worker 
  
// 第一步,第二步
wo[i] createWorker(i,wo[i]) } // 第三步 for i := 0; i < 10; i++ { wo[i].c <- 'a' + i // 这里有一个通道,一直等着. 等着取数据. <- wo[i].done } { wo[i].c <- i <- wo[i].done } time.Sleep(time.Second)}func main() { chanDemo()}

分析:

1. 定义了一个worker类型,里面有两个管道. 第一个管道是用来传输字母的. 第二个管道用来标记传送行为是否完成2. 接下来看chanDemo,chanDemo是一个主goroutine. 在这里:  第一步: 创建了10个工作者.   第二步: 开了10个goroutine 用来传输字母. 开goroutine调用createWorker,然后worker的工作是work  第三步: for循环为每一个goroutine添加字母. 然后等待.......直到对应的done channel完成,取出结果. 继续往下执行.
 {   wo[i].c <-  i    这里有一个通道,一直等着. 等着取数据.   <- wo[i].done}

  放数据: wo[i].c <- 'a' + i . 然后就等待......等待到什么时候呢? wo[i].done中有数据可以取出.

  这就让主goroutine保证了10个goroutine都执行完以后,在继续往后执行.

  这解释了为什么goroutine可以让主goroutine等待的原因

输出结果

number: 4567899,J 

这样打印的结果是按照顺序执行的. 一个goroutine执行完了,才能往另一个goroutine中放数据,这样效率太低了. 我们换一种方式,让他不停的打印. 最后一起等待执行完成.

go func() {            w.done <- true        }()    }}type worker struct {    c chan int         传递字母的管道    done chan bool         标记字母传递完成的管道} worker{        c : make(chan ),done : make(chan boolvar work []worker     {        work[i] = createWorker(i,work[i])    }    for i,w := range work {        w.c <-  i    }    for _,w := range work  {       放了两次,所以要等待两次都处理完,在执行后面的结果        <-w.done      <-w.done   }       chanDemo()}

这里有两个变化,

1. 我在取结果done的时候,没有在线等执行完. 而是,你们去执行吧,我最后来收结果,收的顺序也是从1-9的顺序收的. 每个goroutine要有两个结果.

2. 在work具体执行完成的地方,要定义一个协程. 这样程序才能正常运行,否则会报deadline异常. 为什么会报异常呢?

  因为,有两次放数据,第一次放完了,往管道done里写了一个数据,结果没有被收走,又放了一次..... 所以就发生死锁了.

 

新开一个goroutine,让他并行的发done. 就可以了

疑问: 为什么定义成goroutine,他就不会deadline了呢? goroutine还有什么其他的含义?

比如下面这段程序

func main() {    c := make(chan )    c <- }

这么写汇报deadline,因为管道只有发送方,没有接收方. 要求必须既有发送方又有接收方

但是这么写就没问题:

func main() {    c := make(chan )    go func() {        c <-         c <-     }()}

为啥呢?

经过一番研究过终于明白了,看下面这段程序

func main() {    bufferedChannel()    channelClose()    c := make(chan )    c <- 0    go func() {        log.Info("11")        c <- 1        log.Info("22")        c <- 2        log.Info("33")        c <- 3        log.Info("44")        c <- 4    }()    log.Info("1,",<- c)    log.Info("2,<- c )    log.Info("3,1)">log.Info("4,<- c )    time.Sleep(time.Second)}

你运行执行一下,看看结果,只打印出了11

[2020/02/21 07:09:58] channelDemo.go:83 [Info] 11Process finished with exit code 0

原来goroutine中的代码一直在等待,知道有人要收数据了,他才会发

c <- 0    go func() {        log.Info(11)        c <-         log.Info(223344    }()    log.Info("1,<- c)        time.Sleep(time.Second)}

这时的打印结果是

[12:08] channelDemo.go:[85 [Info] 2293 [Info] 1,1)">1

看到了吧,有人收,goroutine才会发,收几个,发几个. 其他的保留待发.

 

2. 使用WaitGroup等待任务的结束

sync. WaitGroup()是系统自带的一个等待任务全部完成的工具

Add: 添加的任务个数Wait: 等待全部goroutine完成Done: 某一个goroutine完成

我们第一步: 定义一个WaitGroup

 开启了一个等待任务,我们通过waitGroup来等待任务的完成var wg sync.WaitGroup

第二步: 添加20个任务,因为我们知道有20个任务就直接添加20就好了,如果不知道,可以在for循环里一个个添加

wg.Add(20)

第三步: 等待20个任务全部结束

wg.Wait()

第四步: 完成了一个任务,就标记他为done

sync w.wg.Done()        }()    }}type worker int              传递字母的管道    wg *sync.WaitGroup        // 标使用waitgroup来标记同步完成sync.WaitGroup) worker {    w :=]worker    var wg sync.WaitGroup     work[i] = createWorker(i,&wg)    }    wg.Add(20 i    }    wg.Wait()        chanDemo()}

 

3. 在go里面函数是一等公民,其实等待任务结束的过程中,或者结束时要做很多事情. 我们要是定义成某一个参数,那就只能接收这个参数了,其他都参数不行. 

如何能够达到扩展的目的呢?定义成函数

w.wg()    }}type worker  wg func()         func() {            fmt.Println("事情1")            wg.Done()            fmt.Println("事情2")        },    }    go work(i,我们通过waitGroup来等待任务的完成    var wg sync.WaitGroup     {        work[i] = createWorker(i,&wg)    }    wg.Add( i    }    wg.Wait()        chanDemo()}
四. 使用channel进行树的遍历

这里使用channel进行树的遍历,是对channel的一个应用.

之前我们对树遍历后处理使用的是函数.我们也可以用channel

比如: 查找树中最大的值

第一步: 循环遍历获取树,然后将所有树节点放入到channel中. 返回一个管道

第二步: 从管道中取出树的节点,进行计算

第三步: 在第一步中,把所有节点都添加到管道中以后,一定要close

 定一个管道,循环遍历,把遍历后的节点添加到管道中func (n *TreeNode) TraveresForChannel() chan *TreeNode {    out := make(chan *TreeNode)    go func() {        n.TraveresFunc(func(node *TreeNode) {            out <- node        })        close(out)    }()    return }
 从管道中取出所有节点,取最大值    max := 0    for c := range root.TraveresForChannel() {        if c.Value > max {            max = c.Value        }    }    fmt.Println(max)

这个逻辑比较清晰. 

五. 用select进行调度

 1. 首先,我们来从官方文档看一下有关select的描述:

A select" statement chooses which of a set of possible send or receive operations will proceed. 
It looks similar to a switch statement but with the cases all referring to communication operations.一个select语句用来选择哪个case中的发送或接收 *** 作可以被立即执行。它类似于switch语句,但是它的case涉及到channel有关的I/O *** 作。

或者换一种说法,select就是用来监听和channel有关的IO *** 作,当 IO *** 作发生时,触发相应的动作。

比如: 现在有两个channel  A 和B,我要从A 和 B中取值,谁先来,就取出谁. 怎么做呢? 

package mainimport func main() {    var A,B chan int    select {    case n := <- A:        fmt.Println(select from A: case n := <- B:        fmt.Println(select from A:default:        fmt.Println(select from default)    }}

这里定义了A和B两个channel,两个channel都是nil. 下面通过select来选择执行,会走一个默认的default.

输出结果:

select from default

这里A和B都不会有输出,所以,就走默认的default,这是非阻塞的方式输出内容. channel是阻塞的,如果实现非阻塞的呢? 那就是使用select.....default实现

如果没有default又不断的从A和B中取值,就会deadlock

B:        fmt.Println(

 

 我们生成一个channelA和B, 让A和B不停的产生数据,然后看看select中是否能取出来

math/rand 生成A 和 Bfunc generator() chan int {    out := make(chan int)    go func() {        i := 0        for  {            // 随机的在1.5s以内休眠            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)            out <- i            i ++        }    }()    return out}func main() {     generator(),generator()     不停的从A和B中收数据      {         {        A:            fmt.Println(B:            fmt.Println(select from B:如果有default会一直执行default

生成的A和B,取出来的值,交给工作者,让工作者打印出来. 工作者是个管道,他会一直等着,有工作来了,就工作,没有的时候,等待

 生成A 和 Bfunc generator() chan out := make(chan )    go func() {        i := 0          {             随机的在1.5s以内休眠            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)             i            i ++        }    }()    }// 工作者做的具体的工作内容--工作内容,从管道里不停的取数据func doWork(i int,c chan int) {    for cc := range c  {        fmt.Printf("取出的i:%d,取出的值时:%d\n",cc)    }}// 创建工作者func createWorker(i int) chan int {    in := make(chan int)    // 开始工作    go doWork(i,in)    return inA:            w <- n        B:            w <- n        }    }}

有一个生成数据的管道,然后从生成数据的管道里取出数据,将其交给工作者,让工作者开始工作

 goroutine是非抢占式的,一个通道打开了会一直占有,直到主动释放. 这里有个问题: 那就是管道取出的一直是第一个的

下面我们在select中既可以向管道中存数据,又可以取数据

 

} 工作者做的具体的工作内容--工作内容,从管道里不停的取数据func doWork(i for cc := range c  {        fmt.Printf(取出的i:%d,取出的值时:%d\n 创建工作者func createWorker(i int) chan in := make(chan  开始工作    go doWork(i,in}func main() {    )    n :=     hasValue := false    var activeWorker chan int        if hasValue {            activeWorker = w        }        select {        case n = <-A:            hasValue = true        case n = <-B:            hasValue = true        case activeWorker <- n:            hasValue = false        }    }}

结果:

 

 依然有问题: 这里生成数据的速度是在1.5秒以内随机的. 如果生成数据的速度快,消费数据的速度慢. 那么,就有可能有数据打印不出来. 被跳过了 

我们让工作者取数据的速度慢一些

 range c  {        time.Sleep(time.Second * 5)        fmt.Printf(0,取出的值时:取出的i:122729364856

速度一慢下来,我们发现就丢数据了,有些数据被跳过了. 那怎么办呢? 我们用一个数组来接收

 range c  {        time.Sleep(time.Second * )        fmt.Printf()    // 不停的从A和B中收数据    var values []int     {        var activeWorker chan int        var activeValue int        if len(values)  > 0 {            activeValue = values[0]            activeWorker = w        }        case n := <-A:         生成数据的channel            values = append(values,1)">case n := <-B:        case activeWorker <- activeValue:         消费数据的channel            /*             * 取出saveData中的第一个数据,放入到channel中             * 将第一个数据删除             */            values = values[1:]        }    }}

这样就不会丢数据了,5秒打印一个数据. 那么我们想知道,现在管道里积压了多少数据,怎么处理呢?

 做这件事之前,我们先来做一件事,这个程序是一直运行的,他不会停,我们设定一个10秒钟. 让程序10秒以后自动退出

使用time.After(10)

var values []int    // time.After返回的是一个管道. 也就是10秒以后,往管道里放一个数据    tm := time.After(10 * time.Second)    var activeValue if len(values)  >  {            activeValue = values[]            activeWorker = w        }         消费数据的channel            /*             * 取出saveData中的第一个数据,放入到channel中             * 将第一个数据删除             */            values = values[:]                    case <- tm: // 如果从管道中取出来值. 那么执行这个case            fmt.Println("bye")            return        }    }}

10后自动结束了

 

 接下来的一个需求,如果数据生成的速度太慢了,怎么办呢? 我们增加一个判断,如果数据生成的速度<800毫秒,打印一个timeout

 time.After返回的是一个管道. 也就是10秒以后,往管道里放一个数据    tm := time.After(10 * time.Second)    :]        case <- time.After(800 * time.Millisecond):  // 连续打印的两个数据之间时间间隔>800毫秒                fmt.Println("timeout")        case <- tm:  如果从管道中取出来值. 那么执行这个case            fmt.Println(bye)            return            /**             * 上面这两个case有什么区别呢?             * tm: 整体select运行的时间,超过10秒退出             * time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒             */        }    }}

 打印结果

 

2. 定时器的使用

 我们最后来做这件事: 每秒钟打印出已经积压的数据,使用time.tick(1s) 这是一个定时器. 返回的也是一个channel,1秒钟往channel中放一个数据

 time.Second)    // 定义一个定时器,每秒钟执行定时任务    tick := time.Tick(time.Second)    :]        case <- time.After(800 * time.Millisecond):   连续打印的两个数据之间时间间隔>800毫秒                fmt.Println(timeout)        return            *             * 上面这两个case有什么区别呢?             * tm: 整体select运行的时间,超过10秒退出             * time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒             */         case <- tick:            fmt.Println("积压数据个数:"

 感觉select不是很好理解,还需要在查询资料,学习一遍

完整代码

 range c  {        time.Sleep(time.Second )        fmt.Printf( 定义一个定时器,每秒钟执行定时任务    tick := time.Tick(time.Second)    */         case <- tick:            fmt.Println(积压数据个数:go通过通信来共享内存,而不是通过共享内存来通信. go通过channel实现了这样的一个特性

看上面的demo. 我们定义了两个channel来存入数据,定义了一个channel来取出数据. 然后对数据进行处理. 在通信的过程中进行了存和取数据的过程. 这就是通过通信来共享内存.

 六. 传统的同步机制

go建议通过通信来共享内存,但go本身也是支持传统的同步机制的,比如锁, 

go使用csp模型来实现同步,建议少使用锁来同步,锁是通过共享内存来通讯. 但也是可以用的.

下面我们来看如何使用锁. 

1. Mutex

 我们定义两个同时存数据的场景,一个取数据的场景. 并发进行,先不加锁,看看有什么问题?

 定义一个自定义的类型AtomicInttype AtomicInt int 增加方法func (a *AtomicInt) increase() {    *a ++ 取数据的方法func (a *AtomicInt) get() {    int(*a)}func main() {     a AtomicInt     存数据    a.increase()     定义了一个单独的协程去存数据,这样就有两个协程同时存数据    go func() {        a.increase()    }()    time.Sleep(time.Second)     取数据---这时取的是1还是2 呢?    fmt.Println(a.get())}

这个程序短期看运行不会有什么问题,但这里确实是有冲突的. 我们用-race来看一下

go run -race automicInt.go

 

 可以看出发生冲突了,什么冲突呢? 在写数据之前,发现有个地方在读数据. 阿欧....这可不好. 有同步问题. 

下面我们使用Mutex来加锁

 定义一个自定义的类型AtomicInttype AtomicInt struct {    value int    lock sync.Mutex}AtomicInt) increase() {    a.lock.Lock()    defer a.lock.Unlock()    a.value ++}{    lock.Lock()    defer a.lock.Unlock()    (a.value)}func main() {    ())}

其实这了展示了lock的用法还是挺简单的. 在结构体里,就是对结构体中的数据进行加锁. 加在哪里,就是对谁加的.

 

这一章学完了,感受是: 挺难的,尤其是select. 又要通过管道读数据,又要通过管道写数据,挺费事. 再学一遍. 眼睛透彻了

 

 

参考资料:

1. https://www.cnblogs.com/tobycnblogs/p/9935465.html

2. 

总结

以上是内存溢出为你收集整理的第十章 Channel--第四天 完结全部内容,希望文章能够帮你解决第十章 Channel--第四天 完结所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1254747.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-07
下一篇 2022-06-07

发表评论

登录后才能评论

评论列表(0条)

保存