You can join the two datasets by using the ID as the key in both mappers. The map task can be written like this:
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // Get the line and split it to separate the ID from the rest.
    // For the line "A 30":  word1 = A, word2 = 30
    // Likewise for "A ABC": word1 = A, word2 = ABC
    String[] parts = value.toString().split(" ");
    Text word1 = new Text(parts[0]);
    Text word2 = new Text(parts[1]);
    context.write(word1, word2);
}
I think you can reuse the same map task for both inputs. Then write an ordinary reducer, where the Hadoop framework groups the values by key, so the ID arrives as the key. You can cache one of the values and then perform the join:
Text valEmit = new Text();
String merge = "";

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    int i = 0;
    for (Text value : values) {
        if (i == 0) {
            merge = value.toString() + ",";
        } else {
            merge += value.toString();
        }
        i++;
    }
    valEmit.set(merge);
    context.write(key, valEmit);
}
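One caveat with this snippet: MapReduce does not guarantee the order in which the grouped values arrive, so simply concatenating them can swap the two sides of the join. The updated reducer further below handles this by inspecting the value itself; another standard approach is a tagged reduce-side join, where each mapper prefixes its values with a source marker. Here is a minimal sketch of that idea, assuming an "F1:"/"F2:" prefix convention of my own (not from the original answer):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical tagging mapper (a sketch, not part of the original answer):
// prefix each value with its source so the reducer can tell the inputs
// apart without guessing from the value's content.
public class TaggedMapper1 extends Mapper<LongWritable, Text, Text, Text> {
    private final Text keyEmit = new Text();
    private final Text valEmit = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split(" ");
        keyEmit.set(parts[0]);
        valEmit.set("F1:" + parts[1]); // a TaggedMapper2 would write "F2:"
        context.write(keyEmit, valEmit);
    }
}

A second mapper class registered via MultipleInputs would write the "F2:" prefix, and the reducer would strip the tags before merging.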
Finally, you can write the Driver class:
public int run(String[] args) throws Exception {
    Configuration c = new Configuration();
    String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
    Path p1 = new Path(files[0]);
    Path p2 = new Path(files[1]);
    Path p3 = new Path(files[2]);
    FileSystem fs = FileSystem.get(c);
    if (fs.exists(p3)) {
        fs.delete(p3, true);
    }
    Job job = new Job(c, "Multiple Job");
    job.setJarByClass(MultipleFiles.class);
    MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
    MultipleInputs.addInputPath(job, p2, TextInputFormat.class, MultipleMap2.class);
    job.setReducerClass(MultipleReducer.class);
    // ...
}
You can find an example here.
Hope this helps.
UPDATE
Input 1
A 30
D 20
Input 2
A ABC
D EFGH
Output
A ABC 30
D EFGH 20
Mapper1.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

    Text keyEmit = new Text();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each line on the space: the ID becomes the key,
        // the remaining field becomes the value.
        String line = value.toString();
        String parts[] = line.split(" ");
        keyEmit.set(parts[0]);
        valEmit.set(parts[1]);
        context.write(keyEmit, valEmit);
    }
}
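With the sample data above, this single mapper class serves both files: Input 1 produces the pairs (A, 30) and (D, 20), Input 2 produces (A, ABC) and (D, EFGH), so the reducer receives key A with values {30, ABC} and key D with values {20, EFGH}.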
ReducerJoin.java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReducerJoin extends Reducer<Text, Text, Text, Text> {

    Text valEmit = new Text();
    String merge = "";

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String character = "";
        String number = "";
        for (Text value : values) {
            // Order the output: the value starting with a digit is the
            // number and always goes last, whichever input arrived first.
            String val = value.toString();
            char myChar = val.charAt(0);
            if (Character.isDigit(myChar)) {
                number = val;
            } else {
                character = val;
            }
        }
        merge = character + " " + number;
        valEmit.set(merge);
        context.write(key, valEmit);
    }
}
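Note the design choice here: because the arrival order of the two values is not guaranteed, the reducer looks at the first character of each value and treats the digit-leading one as the number. For key A the values {30, ABC} therefore always produce "A ABC 30", regardless of which input was read first. This relies on the non-numeric field never starting with a digit.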
Driver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Check the argument count.
        if (args.length != 3) {
            System.err.println("Usage : <inputlocation> <inputlocation> <outputlocation> ");
            System.exit(0);
        }
        int res = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String source1 = args[0];
        String source2 = args[1];
        String dest = args[2];
        Configuration conf = new Configuration();
        // Change the default output delimiter (a tab) to a space.
        conf.set("mapred.textoutputformat.separator", " ");
        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Multiple Jobs");
        job.setJarByClass(Driver.class);
        Path p1 = new Path(source1);
        Path p2 = new Path(source2);
        Path out = new Path(dest);
        // Both inputs go through the same mapper; the reducer joins on the key.
        MultipleInputs.addInputPath(job, p1, TextInputFormat.class, Mapper1.class);
        MultipleInputs.addInputPath(job, p2, TextInputFormat.class, Mapper1.class);
        job.setReducerClass(ReducerJoin.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete the output directory if it already exists.
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        TextOutputFormat.setOutputPath(job, out);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
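To run the job, you would package the three classes into a jar and submit it with the hadoop launcher, for example `hadoop jar join.jar Driver /input1 /input2 /output` (the jar name here is hypothetical; only the three path arguments, two inputs and one output, come from the driver's usage check).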