使用多处理池的apply_async方法时,谁运行回调?

使用多处理池的apply_async方法时,谁运行回调?,第1张

使用多处理池的apply_async方法时,谁运行回调?

确实在文档中有一个提示:

回调应该立即完成,因为 否则处理结果的线程 将被阻塞。

回调是在主进程中处理的,但是 它们在自己的单独线程中运行 。创建a时,

Pool
它实际上会在
Thread
内部创建一些对象

class Pool(object):    Process = Process    def __init__(self, processes=None, initializer=None, initargs=(),      maxtasksperchild=None):        self._setup_queues()        self._taskqueue = Queue.Queue()        self._cache = {}        ... # stuff we don't care about        self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self, ) )        self._worker_handler.daemon = True        self._worker_handler._state = RUN         self._worker_handler.start()        self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue,       self._pool, self._cache) )        self._task_handler.daemon = True        self._task_handler._state = RUN         self._task_handler.start()        self._result_handler = threading.Thread( target=Pool._handle_results, args=(self._outqueue, self._quick_get, self._cache) )        self._result_handler.daemon = True        self._result_handler._state = RUN        self._result_handler.start()

对我们来说有趣的线索是

_result_handler
; 我们很快就会找到原因。

切换齿轮一秒钟,当您运行时

apply_async
,它会在
ApplyResult
内部创建一个对象来管理从孩子那里获得结果:

def apply_async(self, func, args=(), kwds={}, callback=None):    assert self._state == RUN    result = ApplyResult(self._cache, callback)    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))    return resultclass ApplyResult(object):    def __init__(self, cache, callback):        self._cond = threading.Condition(threading.Lock())        self._job = job_counter.next()        self._cache = cache        self._ready = False        self._callback = callback        cache[self._job] = self    def _set(self, i, obj):        self._success, self._value = obj        if self._callback and self._success: self._callback(self._value)        self._cond.acquire()        try: self._ready = True self._cond.notify()        finally: self._cond.release()        del self._cache[self._job]

如您所见,如果任务成功,则该

_set
方法最终将实际执行
callback
传入的方法。另请注意,它会
cache
在末尾将自身添加到全局字典中
__init__

现在,回到

_result_handler
线程对象。该对象调用该
_handle_results
函数,如下所示:

    while 1:        try: task = get()        except (IOError, EOFError): debug('result handler got EOFError/IOError -- exiting') return        if thread._state: assert thread._state == TERMINATE debug('result handler found thread._state=TERMINATE') break        if task is None: debug('result handler got sentinel') break        job, i, obj = task        try: cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!        except KeyError: pass        # More stuff

这是一个循环,只是将子级结果从队列中拉出,在中找到它的条目

cache
,然后调用
_set
,执行我们的回调。即使您处于循环中,它也可以运行,因为它不在主线程中运行。



欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5647503.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存