生产者-消费者模式其它并发模式
生产者-消费者模式生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。
基本工作原理
生产者线程增加资源数,如果资源数大于最大值则生产者线程挂起等待,当收到消费者线程的通知后继续生产。
消费者线程减少资源数,如果资源数为0,则消费者线程挂起,等待生产者通知后继续生产。
举例
package main import ( "fmt" "sync" "time" ) const ( PRODUCERS = 5 ConSUMERS = 2 PRODUCTS = 20 ) // productCount为资源的数量,需要互斥处理 var productCount = 0 var lock sync.Mutex var wg sync.WaitGroup // 控制生产者阻塞等待 var produceWait chan struct{} // 控制消费者阻塞等待 var consumeWait chan struct{} // 当资源达到上限或下限时,挂起单个协程,通过这两个变量休眠同类协程 var stopProduce = false var stopConsume = false // Produce 生产者 func Produce(index int, wg *sync.WaitGroup) { defer func() { if err := recover(); err != nil { fmt.Println("Producer ", index, " panic") } wg.Done() }() for { time.Sleep(time.Second) lock.Lock() // 生产者协程发现stopProduce为true,则睡眠5秒 if stopProduce { fmt.Println("Producer ", index, " stop produce, sleep 5 seconds") lock.Unlock() time.Sleep(time.Second * 5) continue } fmt.Println("Producer ", index, " begin produce") if productCount >= PRODUCTS { fmt.Println("Products are full") // 如果产品满了就停止生产 stopProduce = true lock.Unlock() //将当前生产者挂起,等待 <-produceWait lock.Lock() stopProduce = false lock.Unlock() continue } productCount++ fmt.Println("Products count is ", productCount) // 产品从空的状态到被生产了一个,激活消费者 if stopConsume { var consumerActive struct{} consumeWait <- consumerActive } lock.Unlock() } } // Consume 消费者 func Consume(index int, wg *sync.WaitGroup) { defer func() { if err := recover(); err != nil { fmt.Println("Consumer ", index, " panic") } wg.Done() }() for { time.Sleep(time.Second) lock.Lock() if stopConsume { fmt.Println("Consumer ", index, " stop consume, sleep 5 seconds") lock.Unlock() time.Sleep(time.Second * 5) continue } fmt.Println("Consumer ", index, " begin consume") if productCount <= 0 { fmt.Println("Products are empty") stopConsume = true lock.Unlock() //产品空了,将当前消费者挂起 <-consumeWait lock.Lock() stopConsume = false lock.Unlock() continue } productCount-- fmt.Println("Products count is ", productCount) // 产品从满的状态被消费了一个,激活生产者 if stopProduce { var producerActive struct{} produceWait <- producerActive } lock.Unlock() } } func main() { wg.Add(PRODUCERS + CONSUMERS) produceWait = make(chan struct{}) consumeWait = make(chan struct{}) for i := 0; i < CONSUMERS; i++ { go Consume(i, &wg) } for i := 0; i < PRODUCERS; i++ { go Produce(i, &wg) } wg.Wait() }
在程序中可以发现,在produce函数中,当我们发现系统资源到达临界值时,使用channel将当前的协程阻塞了,然后将bool变量stopProduce置为true,让其它协程进入休眠。为什么要这样做呢?
因为程序中的produceWait是非缓冲的,所以一次只能激活一个生产者,这么做在一定程度限制了生产者,所以我们引入bool变量通知其他协程睡眠,避免此问题,consume函数中也是同样的道理。
其它并发模式
参考资料:
https://www.cnblogs.com/secondtonone1/p/11843269.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)