python asyncio,如何从另一个线程创建和取消任务

python asyncio,如何从另一个线程创建和取消任务,第1张

python asyncio,如何从另一个线程创建和取消任务

我认为您可能需要使您的

add_task
方法知道是否从事件循环以外的线程中调用了它。这样,如果从同一个线程中调用
asyncio.async
它,则可以直接调用它,否则,它可以做一些额外的工作来将任务从循环线程传递到调用线程。这是一个例子:

import timeimport asyncioimport functoolsfrom threading import Thread, current_thread, Eventfrom concurrent.futures import Futureclass B(Thread):    def __init__(self, start_event):        Thread.__init__(self)        self.loop = None        self.tid = None        self.event = start_event    def run(self):        self.loop = asyncio.new_event_loop()        asyncio.set_event_loop(self.loop)        self.tid = current_thread()        self.loop.call_soon(self.event.set)        self.loop.run_forever()    def stop(self):        self.loop.call_soon_threadsafe(self.loop.stop)    def add_task(self, coro):        """this method should return a task object, that I          can cancel, not a handle"""        def _async_add(func, fut): try:     ret = func()     fut.set_result(ret) except Exception as e:     fut.set_exception(e)        f = functools.partial(asyncio.async, coro, loop=self.loop)        if current_thread() == self.tid: return f() # We can call directly if we're not going between threads.        else: # We're in a non-event loop thread so we use a Future # to get the task from the event loop thread once # it's ready. fut = Future() self.loop.call_soon_threadsafe(_async_add, f, fut) return fut.result()    def cancel_task(self, task):        self.loop.call_soon_threadsafe(task.cancel)@asyncio.coroutinedef test():    while True:        print("running")        yield from asyncio.sleep(1)event = Event()b = B(event)b.start()event.wait() # Let the loop's thread signal us, rather than sleepingt = b.add_task(test()) # This is a real tasktime.sleep(10)b.stop()

首先,我们将事件循环的线程ID保存在该

run
方法中,以便
add_task
稍后可以确定是否来自其他线程的调用。如果
add_task
从非事件循环线程
call_soon_threadsafe
调用了if
,则我们使用来调用一个将调度协程的函数,然后使用a
concurrent.futures.Future
将任务传递回调用线程,该线程等待的结果
Future

取消任务的注意事项:调用

cancel
a时
Task
CancelledError
下次事件循环运行时,协程中将出现a
。这意味着,任务包装的协程将在下一次达到屈服点时因异常而中止-
除非协程捕获到
CancelledError
并阻止自身中止。另请注意,这仅在要包装的函数实际上是可中断的协程时才有效;例如,不能真正取消
asyncio.Future
由的返回值
baseEventLoop.run_in_executor
,因为它实际上包裹在周围
concurrent.futures.Future
,并且一旦其基础函数实际开始执行,就不能取消它们。在那种情况下,
asyncio.Future
将说它已取消,但是实际在执行程序中运行的功能将继续运行。

编辑: 根据Andrew
Svetlov的建议,将第一个示例更新为使用

concurrent.futures.Future
,而不是
queue.Queue

注意:

asyncio.async
不推荐使用,因为使用3.4.4版本
asyncio.ensure_future
代替。



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5662798.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存