扭曲:等待子任务完成

扭曲:等待子任务完成,第1张

扭曲:等待子任务完成

就目前而言,在我看来,这段代码的并行下载数量有限,但是并行解析作业的数量却不受限制。那是故意的吗?我将假设为“
no”,因为如果URL的数量接近无穷大,而您的网络恰好快而解析器却很慢,那么您的内存使用量也会:)。

因此,这将具有有限的并行性,但将通过下载顺序执行解析,而不是:

from twisted.internet import defer, taskfrom twisted.web.client import getPageBATCH_SIZE = 5def main_task(reactor):    def fetch_urls():        for url in get_urls(): yield getPage(url).addCallback(parse)    coop = task.Cooperator()    urls = fetch_urls()    return (defer.DeferredList([coop.coiterate(urls)         for _ in xrange(BATCH_SIZE)]) .addCallback(task_finished))task.react(main_task)

之所以可以这样做是因为

parse
(显然)返回
Deferred
,将其作为回调添加到所返回的
getPage
结果中,导致直到完成交易才
Deferred
调用由所添加的回调。
coiterate``parse

自从您问惯用的Twisted代码以来,我还自由地进行了一些现代化(使用

task.react
而不是手动运行Reactor,内联表达式使内容更简短等)。

如果您确实确实想拥有比并行获取更多的并行解析,那么类似的方法可能会更好:

from twisted.internet import defer, taskfrom twisted.web.client import getPagePARALLEL_FETCHES = 5PARALLEL_PARSES = 10def main_task(reactor):    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)    def parseWhenReady(r):        def parallelParse(_): parse(r).addBoth(     lambda result: parseSemaphore.release().addCallback(         lambda _: result     ) )        return parseSemaphore.acquire().addCallback(parallelParse)    def fetch_urls():        for url in get_urls(): yield getPage(url).addCallback(parseWhenReady)    coop = task.Cooperator()    urls = fetch_urls()    return (defer.DeferredList([coop.coiterate(urls)         for _ in xrange(PARALLEL_FETCHES)]) .addCallback(lambda done:   defer.DeferredList(      [parseSemaphore.acquire()       for _ in xrange(PARALLEL_PARSES)]   )) .addCallback(task_finished))task.react(main_task)

您可以看到

parseWhenReady
return的
Deferred
返回值
acquire
,因此只要并行解析 开始
就可以继续进行并行提取,因此即使解析器过载,您也不会继续进行任意提取。但是,请
parallelParse
谨慎地避免
Deferred
返回由
parse
或返回的值
release
,因为随着这些 *** 作的进行,提取应该能够继续进行。

(请注意,由于您最初的示例无法运行,因此我根本没有测试过任何一个。希望即使有错误,意图也很清楚。)



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5650686.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存