我认为您可能需要使您的
add_task方法知道是否从事件循环以外的线程中调用了它。这样,如果从同一个线程中调用
asyncio.async它,则可以直接调用它,否则,它可以做一些额外的工作来将任务从循环线程传递到调用线程。这是一个例子:
import timeimport asyncioimport functoolsfrom threading import Thread, current_thread, Eventfrom concurrent.futures import Futureclass B(Thread): def __init__(self, start_event): Thread.__init__(self) self.loop = None self.tid = None self.event = start_event def run(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self.tid = current_thread() self.loop.call_soon(self.event.set) self.loop.run_forever() def stop(self): self.loop.call_soon_threadsafe(self.loop.stop) def add_task(self, coro): """this method should return a task object, that I can cancel, not a handle""" def _async_add(func, fut): try: ret = func() fut.set_result(ret) except Exception as e: fut.set_exception(e) f = functools.partial(asyncio.async, coro, loop=self.loop) if current_thread() == self.tid: return f() # We can call directly if we're not going between threads. else: # We're in a non-event loop thread so we use a Future # to get the task from the event loop thread once # it's ready. fut = Future() self.loop.call_soon_threadsafe(_async_add, f, fut) return fut.result() def cancel_task(self, task): self.loop.call_soon_threadsafe(task.cancel)@asyncio.coroutinedef test(): while True: print("running") yield from asyncio.sleep(1)event = Event()b = B(event)b.start()event.wait() # Let the loop's thread signal us, rather than sleepingt = b.add_task(test()) # This is a real tasktime.sleep(10)b.stop()
首先,我们将事件循环的线程ID保存在该
run方法中,以便
add_task稍后可以确定是否来自其他线程的调用。如果
add_task从非事件循环线程
call_soon_threadsafe调用了if
,则我们使用来调用一个将调度协程的函数,然后使用a
concurrent.futures.Future将任务传递回调用线程,该线程等待的结果
Future。
取消任务的注意事项:调用
cancela时
Task,
CancelledError下次事件循环运行时,协程中将出现a
。这意味着,任务包装的协程将在下一次达到屈服点时因异常而中止-
除非协程捕获到
CancelledError并阻止自身中止。另请注意,这仅在要包装的函数实际上是可中断的协程时才有效;例如,不能真正取消
asyncio.Future由的返回值
baseEventLoop.run_in_executor,因为它实际上包裹在周围
concurrent.futures.Future,并且一旦其基础函数实际开始执行,就不能取消它们。在那种情况下,
asyncio.Future将说它已取消,但是实际在执行程序中运行的功能将继续运行。
编辑: 根据Andrew
Svetlov的建议,将第一个示例更新为使用
concurrent.futures.Future,而不是
queue.Queue。
注意:
asyncio.async不推荐使用,因为使用3.4.4版本
asyncio.ensure_future代替。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)