如果您需要针对另一个(或相同的row_i,row_j)数据集处理原始数据,则可以:
1)创建一个S3存储桶以存储数据副本。将此副本的位置传递到您的任务类,例如,以下代码中的self.options.bucket和self.options.my_datafile_copy_location。警告:不幸的是,似乎整个文件在处理之前必须先“下载”到任务计算机。如果连接变弱或加载时间太长,此作业可能会失败。这是一些Python
/ MRJob代码来执行此 *** 作。
将其放在您的映射器函数中:
d1 = line1.split('t', 1)v1, col1 = d1[0], d1[1]conn = boto.connect_s3(aws_access_key_id=<AWS_ACCESS_KEY_ID>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)bucket = conn.get_bucket(self.options.bucket) # bucket = conn.get_bucket(MY_UNIQUE_BUCKET_NAME_AS_STRING)data_copy = bucket.get_key(self.options.my_datafile_copy_location).get_contents_as_string().rstrip()### CAVEAT: Needs to get the whole file before processing the rest.for line2 in data_copy.split('n'): d2 = line2.split('t', 1) v2, col2 = d2[0], d2[1] ## Now, insert pre to do any operations between v1 and v2 (or c1 and c2) here: yield <your output key, value pairs>conn.close()
2)创建一个SimpleDB域,并将所有数据存储在那里。在boto和SimpleDB上阅读此处:http
:
//pre.google.com/p/boto/wiki/SimpleDbIntro
您的映射器代码如下所示:
dline = dline.strip()d0 = dline.split('t', 1)v1, c1 = d0[0], d0[1]sdb = boto.connect_sdb(aws_access_key_id=<AWS_ACCESS_KEY>, aws_secret_access_key=<AWS_SECRET_ACCESS_KEY>)domain = sdb.get_domain(MY_DOMAIN_STRING_NAME)for item in domain: v2, c2 = item.name, item['column'] ## Now, insert pre to do any operations between v1 and v2 (or c1 and c2) here: yield <your output key, value pairs>sdb.close()
如果您有大量数据,则第二个选项可能会更好,因为它可以针对每一行数据而不是一次针对全部数据进行请求。请记住,SimpleDB值的最大长度不能超过1024个字符,因此,如果您的数据值比该值长,则可能需要通过某种方法进行压缩/解压缩。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)