就目前而言,在我看来,这段代码的并行下载数量有限,但是并行解析作业的数量却不受限制。那是故意的吗?我将假设为“
no”,因为如果URL的数量接近无穷大,而您的网络恰好快而解析器却很慢,那么您的内存使用量也会:)。
因此,这将具有有限的并行性,但将通过下载顺序执行解析,而不是:
from twisted.internet import defer, taskfrom twisted.web.client import getPageBATCH_SIZE = 5def main_task(reactor): def fetch_urls(): for url in get_urls(): yield getPage(url).addCallback(parse) coop = task.Cooperator() urls = fetch_urls() return (defer.DeferredList([coop.coiterate(urls) for _ in xrange(BATCH_SIZE)]) .addCallback(task_finished))task.react(main_task)
之所以可以这样做是因为
parse(显然)返回了
Deferred,将其作为回调添加到所返回的
getPage结果中,导致直到完成交易才
Deferred调用由所添加的回调。
coiterate``parse
自从您问惯用的Twisted代码以来,我还自由地进行了一些现代化(使用
task.react而不是手动运行Reactor,内联表达式使内容更简短等)。
如果您确实确实想拥有比并行获取更多的并行解析,那么类似的方法可能会更好:
from twisted.internet import defer, taskfrom twisted.web.client import getPagePARALLEL_FETCHES = 5PARALLEL_PARSES = 10def main_task(reactor): parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES) def parseWhenReady(r): def parallelParse(_): parse(r).addBoth( lambda result: parseSemaphore.release().addCallback( lambda _: result ) ) return parseSemaphore.acquire().addCallback(parallelParse) def fetch_urls(): for url in get_urls(): yield getPage(url).addCallback(parseWhenReady) coop = task.Cooperator() urls = fetch_urls() return (defer.DeferredList([coop.coiterate(urls) for _ in xrange(PARALLEL_FETCHES)]) .addCallback(lambda done: defer.DeferredList( [parseSemaphore.acquire() for _ in xrange(PARALLEL_PARSES)] )) .addCallback(task_finished))task.react(main_task)
您可以看到
parseWhenReadyreturn的
Deferred返回值
acquire,因此只要并行解析 开始
就可以继续进行并行提取,因此即使解析器过载,您也不会继续进行任意提取。但是,请
parallelParse谨慎地避免
Deferred返回由
parse或返回的值
release,因为随着这些 *** 作的进行,提取应该能够继续进行。
(请注意,由于您最初的示例无法运行,因此我根本没有测试过任何一个。希望即使有错误,意图也很清楚。)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)