我如何修改上面的程序,以便生产者是可以与消费者/工人同时调度的协程?
可以对示例进行概括,而无需更改其基本逻辑:
- 将插入循环移到单独的生产者协程。
- 在后台启动消费者,让他们在生产物品时对其进行处理。
- 在消费者运行时,启动生产者并等待他们完成生产物品,例如
await producer()
或await gather(*producers)
,等等。 - 完成所有生产者后,请等待消费者使用来处理其余项目
await queue.join()
。 - 取消消费者,所有消费者现在都在等待队列中的下一个物品的交付,因为我们知道生产者已经完成,所以永远不会到达。
这是实现上述内容的示例:
import asyncio, randomasync def rnd_sleep(t): # sleep for T seconds on average await asyncio.sleep(t * random.random() * 2)async def producer(queue): while True: # produce a token and send it to a consumer token = random.random() print(f'produced {token}') if token < .05: break await queue.put(token) await rnd_sleep(.1)async def consumer(queue): while True: token = await queue.get() # process the token received from a producer await rnd_sleep(.3) queue.task_done() print(f'consumed {token}')async def main(): queue = asyncio.Queue() # fire up the both producers and consumers producers = [asyncio.create_task(producer(queue)) for _ in range(3)] consumers = [asyncio.create_task(consumer(queue)) for _ in range(10)] # with both producers and consumers running, wait for # the producers to finish await asyncio.gather(*producers) print('---- done producing') # wait for the remaining tasks to be processed await queue.join() # cancel the consumers, which are now idle for c in consumers: c.cancel()asyncio.run(main())
请注意,在现实生活中的生产者和消费者(尤其是涉及网络访问的生产者和消费者)中,您可能希望捕获处理期间发生的与IO相关的异常。如果异常是可恢复的(就像大多数与网络相关的异常一样),则只需捕获异常并记录错误即可。您仍应调用,
task_done()因为否则
queue.join()将由于未处理的项目而挂起。如果有必要重新尝试处理该项目,则可以在调用之前将其返回到队列中
task_done()。例如:
# like the above, but handling exceptions during processing:async def consumer(queue): while True: token = await queue.get() try: # this uses aiohttp or whatever await process(token) except aiohttp.ClientError as e: print(f"Error processing token {token}: {e}") # If it makes sense, return the token to the queue to be # processed again. (You can use a counter to avoid # processing a faulty token infinitely.) #await queue.put(token) queue.task_done() print(f'consumed {token}')
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)