您的输入文件可能超出了软内存的大小限制。对于大文件,请使用
BlobstoreLineInputReader或
BlobstoreZipLineInputReader。
这些输入阅读器传递与
map函数不同的东西,它们传递
start_position文件和文本行中的。
您的
map函数可能类似于:
def time_count_map(data): """Time count map function.""" text = data[1] try: reader = csv.reader([text.replace('', '')], skipinitialspace=True) for s in reader: """Calculate time elapsed""" sdw = s[1] start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") edw = s[2] end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") time_difference = time.mktime(end_date) - time.mktime(start_date) yield (s[0], time_difference) except IndexError, e: logging.debug(e)
使用
BlobstoreLineInputReader可以使作业更快地运行,因为它可以使用多个分片(最多256个),但这意味着您需要上传未压缩的文件,这可能会很麻烦。我通过将压缩文件上传到EC2
Windows服务器来处理它,然后从那里解压缩并上传,因为上游带宽很大。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)