MRJob的多个输入

MRJob的多个输入,第1张

MRJob的多个输入

如果您需要针对另一个(或相同的row_i,row_j)数据集处理原始数据,则可以:

1)创建一个S3存储桶以存储数据副本。将此副本的位置传递到您的任务类,例如,以下代码中的self.options.bucket和self.options.my_datafile_copy_location。警告:不幸的是,似乎整个文件在处理之前必须先“下载”到任务计算机。如果连接变弱或加载时间太长,此作业可能会失败。这是一些Python
/ MRJob代码来执行此 *** 作。

将其放在您的映射器函数中:

d1 = line1.split('t', 1)v1, col1 = d1[0], d1[1]conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)bucket = conn.get_bucket(self.options.bucket)  # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()### CAVEAT: Needs to get the whole file before processing the rest.for line2 in data_copy.split('n'):    d2 = line2.split('t', 1)    v2, col2 = d2[0], d2[1]    ## Now, insert pre to do any operations between v1 and v2 (or c1 and c2) here:    yield <your output key, value pairs>conn.close()

2)创建一个SimpleDB域,并将所有数据存储在那里。在boto和SimpleDB上阅读此处:http
:
//pre.google.com/p/boto/wiki/SimpleDbIntro

您的映射器代码如下所示:

dline = dline.strip()d0 = dline.split('t', 1)v1, c1 = d0[0], d0[1]sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)for item in domain:    v2, c2 = item.name, item['column']    ## Now, insert pre to do any operations between v1 and v2 (or c1 and c2) here:    yield <your output key, value pairs>sdb.close()

如果您有大量数据,则第二个选项可能会更好,因为它可以针对每一行数据而不是一次针对全部数据进行请求。请记住,SimpleDB值的最大长度不能超过1024个字符,因此,如果您的数据值比该值长,则可能需要通过某种方法进行压缩/解压缩。



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5632044.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存