确实在文档中有一个提示:
回调应该立即完成,因为 否则处理结果的线程 将被阻塞。
回调是在主进程中处理的,但是 它们在自己的单独线程中运行 。创建a时,
Pool它实际上会在
Thread内部创建一些对象:
class Pool(object): Process = Process def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None): self._setup_queues() self._taskqueue = Queue.Queue() self._cache = {} ... # stuff we don't care about self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self, ) ) self._worker_handler.daemon = True self._worker_handler._state = RUN self._worker_handler.start() self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue, self._pool, self._cache) ) self._task_handler.daemon = True self._task_handler._state = RUN self._task_handler.start() self._result_handler = threading.Thread( target=Pool._handle_results, args=(self._outqueue, self._quick_get, self._cache) ) self._result_handler.daemon = True self._result_handler._state = RUN self._result_handler.start()
对我们来说有趣的线索是
_result_handler; 我们很快就会找到原因。
切换齿轮一秒钟,当您运行时
apply_async,它会在
ApplyResult内部创建一个对象来管理从孩子那里获得结果:
def apply_async(self, func, args=(), kwds={}, callback=None): assert self._state == RUN result = ApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return resultclass ApplyResult(object): def __init__(self, cache, callback): self._cond = threading.Condition(threading.Lock()) self._job = job_counter.next() self._cache = cache self._ready = False self._callback = callback cache[self._job] = self def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job]
如您所见,如果任务成功,则该
_set方法最终将实际执行
callback传入的方法。另请注意,它会
cache在末尾将自身添加到全局字典中
__init__。
现在,回到
_result_handler线程对象。该对象调用该
_handle_results函数,如下所示:
while 1: try: task = get() except (IOError, EOFError): debug('result handler got EOFError/IOError -- exiting') return if thread._state: assert thread._state == TERMINATE debug('result handler found thread._state=TERMINATE') break if task is None: debug('result handler got sentinel') break job, i, obj = task try: cache[job]._set(i, obj) # Here is _set (and therefore our callback) being called! except KeyError: pass # More stuff
这是一个循环,只是将子级结果从队列中拉出,在中找到它的条目
cache,然后调用
_set,执行我们的回调。即使您处于循环中,它也可以运行,因为它不在主线程中运行。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)