python下 多进程间的通信--生产者消费者模式

python下 多进程间的通信--生产者消费者模式,第1张

概述#@File:07生产者消费者模型.py#@Information:#版本一:#缺点:程序会一直阻塞在q.get(),会一直等待从消息队列取数据frommultiprocessingimportQueue,Processimporttimeimportrandomdefproducer(name,food,q):foriinrange(4):#模拟生产data

# @file : 07生产者消费者模型.py
# @information:


# 版本一:
# 缺点:程序会一直阻塞在 q.get(),会一直等待从消息队列取数据

from multiprocessing import Queue, Process
import time
import random


def producer(name, food, q):
for i in range(4):
# 模拟生产
data = '{}做了{}{}'.format(name, food, i)
# 模拟延迟,生产时间
time.sleep(3)
print(data)
# 模拟装菜,将数据放进消息队列中
q.put(data)


def consumer(name, q):
# 一直消费,光盘行动
while True:
# 模拟消费
food = q.get(timeout=5)
time.sleep(random.randint(1,3))
print('{}吃了{}'.format(name, food))


if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=('大厨egon', '包子', q))
p2 = Process(target=producer, args=('马叉虫tank', '白菜', q))
c1 = Process(target=consumer, args=('春哥', q))

p1.start()
p2.start()
c1.start()


# 版本二:
# 优化:
# 1.通过p1.join() p2.join() 让主进程等待两个生产者全部生产结束
# 2.再利用q.get(timeout=5) 等待取值,超过5秒报错,+ 异常捕获,使得生产者退出取值循环。
# 缺陷:
# 1.不能确定具体等多少秒,q.get(timeout=5),不知道生产者需要多少时间。

from multiprocessing import Queue, Process
import time
import random


def producer(name, food, q):
for i in range(4):
# 模拟生产
data = '{}做了{}{}'.format(name, food, i)
# 模拟延迟,生产时间
time.sleep(3)
print(data)
# 模拟装菜,将数据放进消息队列中
q.put(data)


def consumer(name, q):
# 一直消费,光盘行动
while True:
# 模拟消费
try:
food = q.get(timeout=5)
except Exception:
print('没有菜了')
break
time.sleep(random.randint(1, 3))
print('{}吃了{}'.format(name, food))


if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=('大厨egon', '包子', q))
p2 = Process(target=producer, args=('马叉虫tank', '白菜', q))
c1 = Process(target=consumer, args=('春哥', q))
c2 = Process(target=consumer, args=('熊哥', q))

p1.start()
p2.start()
c1.start()
c2.start()
p1.join()
p2.join()


# 版本三:
# 优化:
# 1.通过利用 JoinableQueue 类 调用 join() 和 task_done() 方法 来自动判断 队列中是否还有数据

# JoinableQueue 这个类内部,每当你往队列中存入数据时,内部有一个计数器 +1
# q.task_done() 调用时(常用在 get 取数据之后),计数器 -1
# q.join() 调用时,会等到计数器为0时,才往后运行。

# 2.将消费者进程设置成守护进程,等到主进程中执行完毕后,直接终止消费者进程。
# (因为主进程最后调用q.join,说明队列中没有数据了,且主进程后续没有执行代码,主进程结束),


from multiprocessing import JoinableQueue, Process
import time
import random


def producer(name, food, q):
for i in range(1, 4):
# 模拟生产
data = '{}做了{}{}'.format(name, food, i)
# 模拟延迟,生产时间
time.sleep(3)
print(data)
# 模拟装菜,将数据放进消息队列中
q.put(data)


def consumer(name, q):
# 一直消费,光盘行动
while True:
# 模拟消费
food = q.get()
time.sleep(random.randint(1, 3))
print('{}吃了{}'.format(name, food))
q.task_done() # 告诉当前队列 前一步的 *** 作处理完毕(这里是取出了一个数据,且处理完毕)


if __name__ == '__main__':
q = JoinableQueue()
# # 设置生产者子进程,并开启
p1 = Process(target=producer, args=('大厨egon', '包子', q))
p2 = Process(target=producer, args=('马叉虫tank', '白菜', q))
p1.start()
p2.start()

# 设置消费者子进程
c1 = Process(target=consumer, args=('春哥', q))
c2 = Process(target=consumer, args=('熊哥', q))

# 并设置其为守护进程,开启
c1.daemon = True
c1.start()
c2.daemon = True
c2.start()

# 主进程等待生产者子进程,全部生产完
p1.join()
p2.join()

# 等待队列中所有数据被处理完,再往下执行代码
q.join()

# 只要q.join()执行完毕后,说明消费者已经处理完数据了,那么消费者的进程就没有必要存在了。
# 由此可以将 消费者的进程 设置成守护进程

 

# 版本四:原理同版本三一样,测试queue模块下的Queue,
# 测试发现:
# q = queue.Queue() 实例化对象,返回的结果,作为进程 p = multiprocessing.Process() 的参数时,
# 不能以 .start() 方法启动,(会报 can't pickle _thread.lock objects 的错误,Process 内部会涉及到序列化pickle,其不支持 queue.Queue()的返回值 )
# 只能.run() 调用函数方式,启动进程。

from multiprocessing import Process
import queue
import time
import random
import pickle


def producer(name, food, q):
for i in range(1, 4):
data = '{}做了{}{}'.format(name, food, i)
time.sleep(3)
print(data)
q.put(data)


def consumer(name, q):
while True:
food = q.get()
time.sleep(random.randint(1, 3))
print('{}吃了{}'.format(name, food))
q.task_done()


if __name__ == '__main__':
q = queue.Queue()
p1 = Process(target=producer, args=('大厨egon', '包子', q))
p2 = Process(target=producer, args=('马叉虫tank', '白菜', q))

c1 = Process(target=consumer, args=('春哥', q))
c2 = Process(target=consumer, args=('熊哥', q))

# p1.start() # 会报 can't pickle _thread.lock objects 的错误
# p2.start()
p1.run()
p2.run()

c1.daemon = True
# c1.start()
c1.run()
c2.daemon = True
# c2.start()
c2.run()

p1.join()
p2.join()

q.join()

总结

以上是内存溢出为你收集整理的python下 多进程间的通信--生产者消费者模式全部内容,希望文章能够帮你解决python下 多进程间的通信--生产者消费者模式所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1188062.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-03
下一篇 2022-06-03

发表评论

登录后才能评论

评论列表(0条)

保存