如何使用asyncio和current.futures.ProcessPoolExecutor终止Python中长时间运行的计算(受CPU约束的任务)?

如何使用asyncio和current.futures.ProcessPoolExecutor终止Python中长时间运行的计算(受CPU约束的任务)?,第1张

如何使用asyncio和current.futures.ProcessPoolExecutor终止Python中长时间运行的计算(受CPU约束的任务)?

如何在方法中终止长时间运行的CPU绑定计算?

您尝试的方法不起作用,因为所返回的期货

ProcessPoolExecutor
无法取消。虽然ASYNCIO的
run_in_executor

尝试传播的取消,这只不过是忽略由
Future.cancel
一次任务开始执行。

没有任何根本原因。与线程不同,可以安全地终止进程,因此完全有可能

ProcessPoolExecutor.submit
返回
cancel
终止相应进程的将来。Asyncio协程定义了取消语义,并会自动使用它。不幸的是,
ProcessPoolExecutor.submit
返回一个常规
concurrent.futures.Future
,该常规假定最低公分母,并且将运行中的未来视为不可触及。

结果,要取消在子流程中执行的任务,必须

ProcessPoolExecutor
完全规避并管理自己的流程。面临的挑战是如何在不重新实现一半的情况下执行此 *** 作
multiprocessing
。标准库提供的一个选项就是
multiprocessing.Pool
为此目的(滥用),因为它支持可靠地关闭工作进程。A
CancellablePool
可以按以下方式工作:

  • 与其生成固定数量的进程,不如生成固定数量的1-worker池。
  • 从异步协程向池分配任务。如果在等待另一个进程中的任务完成时取消协程,请终止单进程池并创建一个新池。
  • 由于所有内容都是由单个asyncio线程协调的,因此不必担心争用情况,例如意外杀死已经开始执行另一任务的进程。(如果要支持中的取消,则需要防止这种情况
    ProcessPoolExecutor
    。)

这是该想法的示例实现:

import asyncioimport multiprocessingclass CancellablePool:    def __init__(self, max_workers=3):        self._free = {self._new_pool() for _ in range(max_workers)}        self._working = set()        self._change = asyncio.Event()    def _new_pool(self):        return multiprocessing.Pool(1)    async def apply(self, fn, *args):        """        Like multiprocessing.Pool.apply_async, but:         * is an asyncio coroutine         * terminates the process if cancelled        """        while not self._free: await self._change.wait() self._change.clear()        pool = usable_pool = self._free.pop()        self._working.add(pool)        loop = asyncio.get_event_loop()        fut = loop.create_future()        def _on_done(obj): loop.call_soon_threadsafe(fut.set_result, obj)        def _on_err(err): loop.call_soon_threadsafe(fut.set_exception, err)        pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)        try: return await fut        except asyncio.CancelledError: pool.terminate() usable_pool = self._new_pool()        finally: self._working.remove(pool) self._free.add(usable_pool) self._change.set()    def shutdown(self):        for p in self._working | self._free: p.terminate()        self._free.clear()

显示取消的简约测试用例:

def really_long_process():    print("I am a really long computation.....")    large_val = 9729379273492397293479237492734 ** 344323    print("I finally computed this large value: {}".format(large_val))async def main():    loop = asyncio.get_event_loop()    pool = CancellablePool()    tasks = [loop.create_task(pool.apply(really_long_process))  for _ in range(5)]    for t in tasks:        try: await asyncio.wait_for(t, 1)        except asyncio.TimeoutError: print('task timed out and cancelled')    pool.shutdown()asyncio.get_event_loop().run_until_complete(main())

请注意,CPU使用率从未超过3个内核,并且在测试即将结束时它如何开始下降,表明进程已按预期终止。

要将其应用于问题的代码,请创建

self._lmz_executor
一个实例
CancellablePool
并将其更改
self._loop.run_in_executor(...)
self._loop.create_task(self._lmz_executor.apply(...))



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5631190.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存