使用asyncio.Queue进行生产者-消费者流

使用asyncio.Queue进行生产者-消费者流,第1张

使用asyncio.Queue进行生产者-消费者

我如何修改上面的程序,以便生产者是可以与消费者/工人同时调度的协程?

可以对示例进行概括,而无需更改其基本逻辑:

  • 将插入循环移到单独的生产者协程。
  • 在后台启动消费者,让他们在生产物品时对其进行处理。
  • 在消费者运行时,启动生产者并等待他们完成生产物品,例如
    await producer()
    await gather(*producers)
    ,等等。
  • 完成所有生产者后,请等待消费者使用来处理其余项目
    await queue.join()
  • 取消消费者,所有消费者现在都在等待队列中的下一个物品的交付,因为我们知道生产者已经完成,所以永远不会到达。

这是实现上述内容的示例:

import asyncio, randomasync def rnd_sleep(t):    # sleep for T seconds on average    await asyncio.sleep(t * random.random() * 2)async def producer(queue):    while True:        # produce a token and send it to a consumer        token = random.random()        print(f'produced {token}')        if token < .05: break        await queue.put(token)        await rnd_sleep(.1)async def consumer(queue):    while True:        token = await queue.get()        # process the token received from a producer        await rnd_sleep(.3)        queue.task_done()        print(f'consumed {token}')async def main():    queue = asyncio.Queue()    # fire up the both producers and consumers    producers = [asyncio.create_task(producer(queue))      for _ in range(3)]    consumers = [asyncio.create_task(consumer(queue))      for _ in range(10)]    # with both producers and consumers running, wait for    # the producers to finish    await asyncio.gather(*producers)    print('---- done producing')    # wait for the remaining tasks to be processed    await queue.join()    # cancel the consumers, which are now idle    for c in consumers:        c.cancel()asyncio.run(main())

请注意,在现实生活中的生产者和消费者(尤其是涉及网络访问的生产者和消费者)中,您可能希望捕获处理期间发生的与IO相关的异常。如果异常是可恢复的(就像大多数与网络相关的异常一样),则只需捕获异常并记录错误即可。您仍应调用,

task_done()
因为否则
queue.join()
将由于未处理的项目而挂起。如果有必要重新尝试处理该项目,则可以在调用之前将其返回到队列
task_done()
。例如:

# like the above, but handling exceptions during processing:async def consumer(queue):    while True:        token = await queue.get()        try: # this uses aiohttp or whatever await process(token)        except aiohttp.ClientError as e: print(f"Error processing token {token}: {e}") # If it makes sense, return the token to the queue to be # processed again. (You can use a counter to avoid # processing a faulty token infinitely.) #await queue.put(token)        queue.task_done()        print(f'consumed {token}')


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5654222.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存