Hadoop Multiple Inputs



You can join the two files by using the ID as the key in both mappers. You could write the map task like this:

public void map(LongWritable k, Text value, Context context)
        throws IOException, InterruptedException {
    // Get the line
    String line = value.toString();
    // Split the line to separate the ID from the value
    String[] parts = line.split(" ");
    Text word1 = new Text(parts[0]); // for "A 30": word1 = A
    Text word2 = new Text(parts[1]); // for "A 30": word2 = 30
    // Likewise for "A ABC": word1 = A, word2 = ABC
    context.write(word1, word2);
}

I think you can reuse the same map task for both inputs. Then write an ordinary Reducer; the Hadoop framework groups the records by key, so you get the ID as the key with the values from both files grouped under it. You can cache one of the values and then perform the join:

Text valEmit = new Text();
String merge = "";

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    int i = 0;
    for (Text value : values) {
        if (i == 0) {
            merge = value.toString() + ",";
        } else {
            merge += value.toString();
        }
        i++;
    }
    valEmit.set(merge);
    context.write(key, valEmit);
}
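To make the grouping concrete, here is a minimal plain-Java sketch of what the shuffle does before reduce runs (no Hadoop needed; the class and variable names are mine, for illustration only): every value emitted under the same ID lands in one group, which the reducer then merges.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalJoinSketch {
    public static void main(String[] args) {
        // (id, value) pairs as the two mappers would emit them
        String[][] pairs = { {"A", "30"}, {"D", "20"}, {"A", "ABC"}, {"D", "EFGH"} };

        // The shuffle phase effectively builds this grouping for us
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] p : pairs) {
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add(p[1]);
        }

        // The reduce phase then sees one (key, values) group at a time
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            System.out.println(e.getKey() + " " + String.join(",", e.getValue()));
        }
        // Prints: A 30,ABC
        //         D 20,EFGH
    }
}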

Finally, you can write the Driver class (a complete version appears in the Update below):

public int run(String[] args) throws Exception {
    Configuration c = new Configuration();
    String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
    Path p1 = new Path(files[0]);
    Path p2 = new Path(files[1]);
    Path p3 = new Path(files[2]);
    FileSystem fs = FileSystem.get(c);
    if (fs.exists(p3)) {
        fs.delete(p3, true);
    }
    Job job = new Job(c, "Multiple Job");
    job.setJarByClass(MultipleFiles.class);
    MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
    MultipleInputs.addInputPath(job, p2, TextInputFormat.class, MultipleMap2.class);
    job.setReducerClass(MultipleReducer.class);
    // ...
}

You can find an example here.

Hope this helps.


Update

Input 1

A 30
D 20

Input 2

A ABC
D EFGH

Output

A ABC 30
D EFGH 20

Mapper1.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

    Text keyEmit = new Text();
    Text valEmit = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String parts[] = line.split(" ");
        keyEmit.set(parts[0]);
        valEmit.set(parts[1]);
        context.write(keyEmit, valEmit);
    }
}
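If you want to sanity-check this mapper without a cluster, a small test along the following lines should work; it assumes the Apache MRUnit library (now retired, but its MapDriver API matches this sketch) and JUnit are on the classpath:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class Mapper1Test {
    @Test
    public void splitsLineIntoIdAndValue() throws Exception {
        // "A 30" should come out as key "A", value "30"
        MapDriver.newMapDriver(new Mapper1())
                .withInput(new LongWritable(0), new Text("A 30"))
                .withOutput(new Text("A"), new Text("30"))
                .runTest();
    }
}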

ReducerJoin.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReducerJoin extends Reducer<Text, Text, Text, Text> {

    Text valEmit = new Text();
    String merge = "";

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String character = "";
        String number = "";
        for (Text value : values) {
            // ordering output
            String val = value.toString();
            char myChar = val.charAt(0);
            if (Character.isDigit(myChar)) {
                number = val;
            } else {
                character = val;
            }
        }
        merge = character + " " + number;
        valEmit.set(merge);
        context.write(key, valEmit);
    }
}
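Because the reducer tells the two sides apart by checking whether a value starts with a digit, it does not care in which order the grouped values arrive. A companion MRUnit sketch (same assumed dependencies as above) can feed the values "backwards" to confirm that:

import java.util.Arrays;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class ReducerJoinTest {
    @Test
    public void joinsRegardlessOfValueOrder() throws Exception {
        // Values deliberately passed number-first; output is still "ABC 30"
        ReduceDriver.newReduceDriver(new ReducerJoin())
                .withInput(new Text("A"), Arrays.asList(new Text("30"), new Text("ABC")))
                .withOutput(new Text("A"), new Text("ABC 30"))
                .runTest();
    }
}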

Driver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // checking the arguments count
        if (args.length != 3) {
            System.err.println("Usage : <inputlocation>  <inputlocation>  <outputlocation> ");
            System.exit(0);
        }
        int res = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String source1 = args[0];
        String source2 = args[1];
        String dest = args[2];
        Configuration conf = new Configuration();
        // changing the default delimiter to the user input delimiter
        conf.set("mapred.textoutputformat.separator", " ");
        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Multiple Jobs");
        job.setJarByClass(Driver.class);
        Path p1 = new Path(source1);
        Path p2 = new Path(source2);
        Path out = new Path(dest);
        MultipleInputs.addInputPath(job, p1, TextInputFormat.class, Mapper1.class);
        MultipleInputs.addInputPath(job, p2, TextInputFormat.class, Mapper1.class);
        job.setReducerClass(ReducerJoin.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        TextOutputFormat.setOutputPath(job, out);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
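One caveat: mapred.textoutputformat.separator is the old (Hadoop 1.x) property name. On Hadoop 2 and later the same setting lives in the mapreduce.* namespace, so if the separator seems to be ignored, setting both keys is a safe bet:

conf.set("mapred.textoutputformat.separator", " ");           // old-style key, as above
conf.set("mapreduce.output.textoutputformat.separator", " "); // Hadoop 2.x+ equivalent

After packaging, the job is launched the usual way, e.g. hadoop jar multiplefiles.jar Driver /input1 /input2 /output (the jar name here is hypothetical).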

