python– 在主进程中异步等待多处理队列

python– 在主进程中异步等待多处理队列,第1张

概述我有以下场景:多个工作进程将有关其当前状态的事件发送到事件调度程序.如果我们在主进程中,则此事件调度程序需要处理所有事件,或者如果我们处于工作进程中,则表示主进程的事件调度程序处理这些事件.这里的主要关键是事件处理也必须在主进程的主线程中,所以我不能在线程内运行一段时间的True循环并等待来自那里的工作进程的消息.所以我拥有的是:import asyncio

我有以下场景:多个工作进程将有关其当前状态的事件发送到事件调度程序.如果我们在主进程中,则此事件调度程序需要处理所有事件,或者如果我们处于工作进程中,则表示主进程的事件调度程序处理这些事件.

这里的主要关键是事件处理也必须在主进程的主线程中,所以我不能在线程内运行一段时间的True循环并等待来自那里的工作进程的消息.

所以我拥有的是:

import asynciofrom concurrent.futures import ThreadPoolExecutorfrom multiprocessing import current_process,Process,Queuefrom threading import current_threadfrom time import sleepdef get_q(q):    print("Waiting for the queue ({} / {})\n".format(current_thread().@R_404_6889@,current_process().@R_404_6889@))    return q.get()async def message_q(q):    while True:        f = loop.run_in_executor(None,get_q,q)        await f        if f.result() is None:            print("Done")            return;        print("Got the result ({} / {})".format(current_thread().@R_404_6889@,current_process().@R_404_6889@))        print("Result is: {}\n".format(f.result()))async def something_else():    while True:        print("Something else\n")        await asyncio.sleep(2)def other_process(q):    for i in range(5):        print("Putting something in the queue ({})".format(current_process().@R_404_6889@))        q.put(i)        sleep(1)    q.put(None)q = Queue()Process(target=other_process,args=(q,),daemon=True).start()loop = asyncio.get_event_loop()loop.set_default_executor(ThreadPoolExecutor(max_workers=1))asyncio.ensure_future(message_q(q))asyncio.ensure_future(something_else())loop.run_until_complete(asyncio.sleep(6))

other_process()是一个示例性的工作进程,它使用队列来发信号通知主进程,该进程运行事件循环来处理东西并等待队列上的任何数据.在实际情况中,此过程将向事件调度程序发出信号,该事件调度程序随后将处理队列消息传递,将消息传递给主进程事件调度程序,但在此我稍微简化了一下.

但是,我对此并不十分满意.一次又一次地将get_q()提交给ThreadPoolExecutor会产生更多的开销,并且不像一个长时间运行的线程那样干净.此外,只要队列中没有其他数据,就会等待f没有最佳和阻塞,这会阻止事件循环退出.我的解决方法是在worker完成后发送None,如果队列中没有None,则退出message_q().

有没有更好的方法来实现这个?性能非常关键,我想将Queue对象保持在事件调度程序的本地,而不是将其传递给管理工作进程的代码(或者需要调用某种finalize()方法).

最佳答案我现在将其实现为异步上下文管理器.上下文管理器调用

asyncio.ensure_future(message_q())

在其__aenter __()方法中,在其__aexit __()方法中将None添加到队列中以关闭message_q()中的无限循环.

然后,可以在process-spawning代码部分的async with语句中使用上下文管理器,从而无需手动调用shutdown方法.但是,在确保message_q()协程允许上下文管理器初始化队列侦听器之后,建议在__aenter __()方法中调用await asyncio.sleep(0).否则,不会立即调用message_q().这本身不是问题(因为队列无论如何都被填充),但是它会延迟事件转发,直到代码中出现下一个等待.

应该使用ProcesspoolExecutor和loop.run_in_executor()生成进程,因此等待进程不会阻止事件循环.

您可能还希望使用JoinableQueue来确保在退出上下文管理器之前处理了所有事件,而不是使用Queue.

总结

以上是内存溢出为你收集整理的python – 在主进程中异步等待多处理队列全部内容,希望文章能够帮你解决python – 在主进程中异步等待多处理队列所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1206214.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存