MapReduce程序一般都需要包含三个部分:Mapper、Reducer和Driver。
注意:Mapper类中的map方法是每执行一行数据执行一次。
Reducer类中的reduce方法是相同的key值的数据执行一次。
具体泛型类型根据具体需求而定。
导入包时要认真,以免导错包,会影响输出结果。
自定义Mapper、Reduce和Driver具体实现如下:
自定义Mapper类:
Mapper类:(程序的Map阶段) import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; //1、继承Mapper父类,并写入泛型根据具体情况确定泛型中填写的类型! public class MyMapper extends Mapper { //2、提升输出变量的作用范围,使它能够在类中任意使用,类型由输出类型确定,并记得确定权限,一般可写private,该示范代码是缺省。 Text k = new Text(); IntWritable v = new IntWritable(1);//此时设定输出的v值恒为1,该值根据具体情况而设定,也可不设置。 //3、实现父类Mapper中的map方法,在其中写自己具体的 *** 作,完成需求的功能。((map方法是 *** 作一行数据执行一次!!!!)) @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 3.1 获取一行数据,因为java中针对String类型提供了大量的 *** 作方法,因此将读取的一行数据转化为String类型可以更加便捷的 *** 作数据。 String line = value.toString(); // 3.2 切割读取到的那一行数据,利用.split("")方法,在括号内输入切割符,如:split(" ")-->按空格切割 split("t")-->按制表符切割 等等 String[] words = line.split(" "); // 3.3 输出,对切割后的一行数据的每一个单词数据以一对key,value的形式写出,如:k=a v=1... for (String word : words) {//增强for循环 k.set(word); context.write(k, v); //将k、v传到context对象中。 } } }
自定义Reducer类:
Reducer类:(程序的Rdeuce阶段) import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; //1、继承Reducer父类,并写入泛型<*, *, *, *> 泛型前两位是Map阶段泛型的后两位,Map阶段输出到Reduce阶段,即类型要一致!Reduce的泛型的输出类型根据具体情况确定泛型中的类型! public class MyReducer extends Reducer{ //2、提升输出变量的作用范围,使它能够在类中任意使用,类型由输出类型确定,并记得确定权限,一般可写private,该示范代码是缺省。 int sum;//统计每一个k对应的v的总合 IntWritable v = new IntWritable(); //3、实现父类Reducer中的reduce方法,在其中写自己具体的 *** 作,完成需求的功能。((reduce方法是将相同的key值的数据执行一次!!!!)) @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { // 3.1 累加求和 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 3.2 输出 v.set(sum); context.write(key,v); //将k、v传给context对象。 } }
自定义Driver类:
Driver类:驱动类 import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text;//Text类型为Hadoop定义下的,切勿导错,会使得程序无结果。 import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MyDriver {//1、定义main函数,作为程序执行的入口 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息以及获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 关联本Driver程序的jar <本程序关联的WordcountDriver就是当前的WordcountDriver类> job.setJarByClass(MyDriver.class); // 3 关联Mapper和Reducer的jar <即指出Mapper类和Reducer类> job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); // 4 设置Mapper输出的kv类型 <即指出Mapper类里泛型的后两个泛型类型> job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置最终输出kv类型 <如果有Reduce阶段,就是指出Reduce类的输出类型,即Reduce类里泛型的后两个类型> <如果没有Reduce阶段,就是指出Mapper阶段输出的类型,即Mapper类里泛型的后两个> 一个MR程序可以没有Reduce阶段,这是需要看具体要求而定的! job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); 或写成 FileInputFormat.setInputPaths(job, new Path("具体输入路径")); //可直接写到某个文件夹,就会执行该文件夹里所有文件 FileOutputFormat.setOutputPath(job, new Path("具体输出路径")); //执行输出的文件夹不能是已存在的,在这里写的文件夹名会在程序执行时创建 // 7 提交job Job运行是通过job.waitForCompletion(true),true表示将运行进度等信息及时输出给用户,false的话只是等待作业结束 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)