模块中有
urlretrieve(url, filename=None, reporthook=None,data=None)功能
urllib。如果将
reporthook-function /
object实现为令牌存储桶或泄漏存储桶,则将具有全局速率限制。
编辑:
仔细检查后,我发现进行全球限速并不
reporthook像我想的那么容易。
reporthook仅给出下载量和总大小,仅靠它们本身不足以提供与令牌桶配合使用的信息。解决该问题的一种方法是,在每个速率限制器中存储最后下载的数量,但使用全局令牌桶。
编辑2:将 两个代码合并为一个示例。
"""Rate limiters with shared token bucket."""import osimport sysimport threadingimport timeimport urllibimport urlparseclass TokenBucket(object): """An implementation of the token bucket algorithm. source: http://pre.activestate.com/recipes/511490/ >>> bucket = TokenBucket(80, 0.5) >>> print bucket.consume(10) True >>> print bucket.consume(90) False """ def __init__(self, tokens, fill_rate): """tokens is the total tokens in the bucket. fill_rate is the rate in tokens/second that the bucket will be refilled.""" self.capacity = float(tokens) self._tokens = float(tokens) self.fill_rate = float(fill_rate) self.timestamp = time.time() self.lock = threading.RLock() def consume(self, tokens): """Consume tokens from the bucket. Returns 0 if there were sufficient tokens, otherwise the expected time until enough tokens become available.""" self.lock.acquire() tokens = max(tokens,self.tokens) expected_time = (tokens - self.tokens) / self.fill_rate if expected_time <= 0: self._tokens -= tokens self.lock.release() return max(0,expected_time) @property def tokens(self): self.lock.acquire() if self._tokens < self.capacity: now = time.time() delta = self.fill_rate * (now - self.timestamp) self._tokens = min(self.capacity, self._tokens + delta) self.timestamp = now value = self._tokens self.lock.release() return valueclass RateLimit(object): """Rate limit a url fetch. source: http://mail.python.org/pipermail/python-list/2008-January/472859.html (but mostly rewritten) """ def __init__(self, bucket, filename): self.bucket = bucket self.last_update = 0 self.last_downloaded_kb = 0 self.filename = filename self.avg_rate = None def __call__(self, block_count, block_size, total_size): total_kb = total_size / 1024. downloaded_kb = (block_count * block_size) / 1024. just_downloaded = downloaded_kb - self.last_downloaded_kb self.last_downloaded_kb = downloaded_kb predicted_size = block_size/1024. wait_time = self.bucket.consume(predicted_size) while wait_time > 0: time.sleep(wait_time) wait_time = self.bucket.consume(predicted_size) now = time.time() delta = now - self.last_update if self.last_update != 0: if delta > 0: rate = just_downloaded / delta if self.avg_rate is not None: rate = 0.9 * self.avg_rate + 0.1 * rate self.avg_rate = rate else: rate = self.avg_rate or 0. print "%20s: %4.1f%%, %5.1f KiB/s, %.1f/%.1f KiB" % ( self.filename, 100. * downloaded_kb / total_kb, rate, downloaded_kb, total_kb, ) self.last_update = nowdef main(): """Fetch the contents of urls""" if len(sys.argv) < 4: print 'Syntax: %s rate url1 url2 ...' % sys.argv[0] raise SystemExit(1) rate_limit = float(sys.argv[1]) urls = sys.argv[2:] bucket = TokenBucket(10*rate_limit, rate_limit) print "rate limit = %.1f" % (rate_limit,) threads = [] for url in urls: path = urlparse.urlparse(url,'http')[2] filename = os.path.basename(path) print 'Downloading "%s" to "%s"...' % (url,filename) rate_limiter = RateLimit(bucket, filename) t = threading.Thread( target=urllib.urlretrieve, args=(url, filename, rate_limiter)) t.start() threads.append(t) for t in threads: t.join() print 'All downloads finished'if __name__ == "__main__": main()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)