如何在方法中终止长时间运行的CPU绑定计算?
您尝试的方法不起作用,因为所返回的期货
ProcessPoolExecutor无法取消。虽然ASYNCIO的
run_in_executor
尝试传播的取消,这只不过是忽略由
Future.cancel一次任务开始执行。
没有任何根本原因。与线程不同,可以安全地终止进程,因此完全有可能
ProcessPoolExecutor.submit返回
cancel终止相应进程的将来。Asyncio协程定义了取消语义,并会自动使用它。不幸的是,
ProcessPoolExecutor.submit返回一个常规
concurrent.futures.Future,该常规假定最低公分母,并且将运行中的未来视为不可触及。
结果,要取消在子流程中执行的任务,必须
ProcessPoolExecutor完全规避并管理自己的流程。面临的挑战是如何在不重新实现一半的情况下执行此 *** 作
multiprocessing。标准库提供的一个选项就是
multiprocessing.Pool为此目的(滥用),因为它支持可靠地关闭工作进程。A
CancellablePool可以按以下方式工作:
- 与其生成固定数量的进程,不如生成固定数量的1-worker池。
- 从异步协程向池分配任务。如果在等待另一个进程中的任务完成时取消协程,请终止单进程池并创建一个新池。
- 由于所有内容都是由单个asyncio线程协调的,因此不必担心争用情况,例如意外杀死已经开始执行另一任务的进程。(如果要支持中的取消,则需要防止这种情况
ProcessPoolExecutor
。)
这是该想法的示例实现:
import asyncioimport multiprocessingclass CancellablePool: def __init__(self, max_workers=3): self._free = {self._new_pool() for _ in range(max_workers)} self._working = set() self._change = asyncio.Event() def _new_pool(self): return multiprocessing.Pool(1) async def apply(self, fn, *args): """ Like multiprocessing.Pool.apply_async, but: * is an asyncio coroutine * terminates the process if cancelled """ while not self._free: await self._change.wait() self._change.clear() pool = usable_pool = self._free.pop() self._working.add(pool) loop = asyncio.get_event_loop() fut = loop.create_future() def _on_done(obj): loop.call_soon_threadsafe(fut.set_result, obj) def _on_err(err): loop.call_soon_threadsafe(fut.set_exception, err) pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err) try: return await fut except asyncio.CancelledError: pool.terminate() usable_pool = self._new_pool() finally: self._working.remove(pool) self._free.add(usable_pool) self._change.set() def shutdown(self): for p in self._working | self._free: p.terminate() self._free.clear()
显示取消的简约测试用例:
def really_long_process(): print("I am a really long computation.....") large_val = 9729379273492397293479237492734 ** 344323 print("I finally computed this large value: {}".format(large_val))async def main(): loop = asyncio.get_event_loop() pool = CancellablePool() tasks = [loop.create_task(pool.apply(really_long_process)) for _ in range(5)] for t in tasks: try: await asyncio.wait_for(t, 1) except asyncio.TimeoutError: print('task timed out and cancelled') pool.shutdown()asyncio.get_event_loop().run_until_complete(main())
请注意,CPU使用率从未超过3个内核,并且在测试即将结束时它如何开始下降,表明进程已按预期终止。
要将其应用于问题的代码,请创建
self._lmz_executor一个实例
CancellablePool并将其更改
self._loop.run_in_executor(...)为
self._loop.create_task(self._lmz_executor.apply(...))。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)