生产者-消费者模式golang实现

生产者-消费者模式golang实现,第1张

生产者-消费者模式golang实现

文章目录

生产者-消费者模式其它并发模式

生产者-消费者模式

生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。

基本工作原理

生产者线程增加资源数,如果资源数大于最大值则生产者线程挂起等待,当收到消费者线程的通知后继续生产。
消费者线程减少资源数,如果资源数为0,则消费者线程挂起,等待生产者通知后继续生产。


举例

package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	PRODUCERS  = 5
	ConSUMERS = 2
	PRODUCTS  = 20
)

// productCount为资源的数量,需要互斥处理
var productCount = 0
var lock sync.Mutex
var wg sync.WaitGroup

// 控制生产者阻塞等待
var produceWait chan struct{}
// 控制消费者阻塞等待
var consumeWait chan struct{}

// 当资源达到上限或下限时,挂起单个协程,通过这两个变量休眠同类协程
var stopProduce = false
var stopConsume = false

// Produce 生产者
func Produce(index int, wg *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Producer ", index, " panic")
		}
		wg.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		// 生产者协程发现stopProduce为true,则睡眠5秒
		if stopProduce {
			fmt.Println("Producer ", index, " stop produce, sleep 5 seconds")
			lock.Unlock()
			time.Sleep(time.Second * 5)
			continue
		}
		fmt.Println("Producer ", index, " begin produce")
		if productCount >= PRODUCTS {
			fmt.Println("Products are full")
			// 如果产品满了就停止生产
			stopProduce = true
			lock.Unlock()
			//将当前生产者挂起,等待
			<-produceWait
			lock.Lock()
			stopProduce = false
			lock.Unlock()
			continue
		}
		productCount++
		fmt.Println("Products count is ", productCount)
		// 产品从空的状态到被生产了一个,激活消费者
		if stopConsume {
			var consumerActive struct{}
			consumeWait <- consumerActive
		}
		lock.Unlock()
	}
}

// Consume 消费者
func Consume(index int, wg *sync.WaitGroup) {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("Consumer ", index, " panic")
		}
		wg.Done()
	}()

	for {
		time.Sleep(time.Second)
		lock.Lock()
		if stopConsume {
			fmt.Println("Consumer ", index, " stop consume, sleep 5 seconds")
			lock.Unlock()
			time.Sleep(time.Second * 5)
			continue
		}
		fmt.Println("Consumer ", index, " begin consume")
		if productCount <= 0 {
			fmt.Println("Products are empty")
			stopConsume = true
			lock.Unlock()
			//产品空了,将当前消费者挂起
			<-consumeWait
			lock.Lock()
			stopConsume = false
			lock.Unlock()
			continue
		}
		productCount--
		fmt.Println("Products count is ", productCount)

		// 产品从满的状态被消费了一个,激活生产者
		if stopProduce {
			var producerActive struct{}
			produceWait <- producerActive
		}

		lock.Unlock()
	}
}

func main() {
	wg.Add(PRODUCERS + CONSUMERS)
	produceWait = make(chan struct{})
	consumeWait = make(chan struct{})
	for i := 0; i < CONSUMERS; i++ {
		go Consume(i, &wg)
	}
	for i := 0; i < PRODUCERS; i++ {
		go Produce(i, &wg)
	}

	wg.Wait()
}


在程序中可以发现,在produce函数中,当我们发现系统资源到达临界值时,使用channel将当前的协程阻塞了,然后将bool变量stopProduce置为true,让其它协程进入休眠。为什么要这样做呢?
因为程序中的produceWait是非缓冲的,所以一次只能激活一个生产者,这么做在一定程度限制了生产者,所以我们引入bool变量通知其他协程睡眠,避免此问题,consume函数中也是同样的道理。


其它并发模式

参考资料:

https://www.cnblogs.com/secondtonone1/p/11843269.html

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5712586.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存