MapReduce编程规范——Mapper、Reducer和Driver具体实现

MapReduce编程规范——Mapper、Reducer和Driver具体实现,第1张

MapReduce编程规范——Mapper、Reducer和Driver具体实现

MapReduce程序一般都需要包含三个部分:Mapper、Reducer和Driver。

注意:Mapper类中的map方法是每执行一行数据执行一次。

          Reducer类中的reduce方法是相同的key值的数据执行一次。

          具体泛型类型根据具体需求而定。

         导入包时要认真,以免导错包,会影响输出结果。

自定义Mapper、Reduce和Driver具体实现如下:

自定义Mapper类:

Mapper类:(程序的Map阶段)

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//1、继承Mapper父类,并写入泛型   根据具体情况确定泛型中填写的类型!
public class MyMapper extends Mapper{
    //2、提升输出变量的作用范围,使它能够在类中任意使用,类型由输出类型确定,并记得确定权限,一般可写private,该示范代码是缺省。
    Text k = new Text(); 
    IntWritable v = new IntWritable(1);//此时设定输出的v值恒为1,该值根据具体情况而设定,也可不设置。
    //3、实现父类Mapper中的map方法,在其中写自己具体的 *** 作,完成需求的功能。((map方法是 *** 作一行数据执行一次!!!!))
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         // 3.1 获取一行数据,因为java中针对String类型提供了大量的 *** 作方法,因此将读取的一行数据转化为String类型可以更加便捷的 *** 作数据。  
        String line = value.toString();   
         // 3.2 切割读取到的那一行数据,利用.split("")方法,在括号内输入切割符,如:split(" ")-->按空格切割  split("t")-->按制表符切割 等等  
        String[] words = line.split(" ");    
        // 3.3 输出,对切割后的一行数据的每一个单词数据以一对key,value的形式写出,如:k=a  v=1...  
         for (String word : words) {//增强for循环  
             k.set(word);   
            context.write(k, v); //将k、v传到context对象中。
         } 
    }
}

 自定义Reducer类:

Reducer类:(程序的Rdeuce阶段)

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//1、继承Reducer父类,并写入泛型<*, *, *, *>   泛型前两位是Map阶段泛型的后两位,Map阶段输出到Reduce阶段,即类型要一致!Reduce的泛型的输出类型根据具体情况确定泛型中的类型!
public class MyReducer extends Reducer{
    //2、提升输出变量的作用范围,使它能够在类中任意使用,类型由输出类型确定,并记得确定权限,一般可写private,该示范代码是缺省。
    int sum;//统计每一个k对应的v的总合
    IntWritable v = new IntWritable();
    //3、实现父类Reducer中的reduce方法,在其中写自己具体的 *** 作,完成需求的功能。((reduce方法是将相同的key值的数据执行一次!!!!)) 
    @Override 
    protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {    
        // 3.1 累加求和  
        sum = 0;  
        for (IntWritable count : values) {   
            sum += count.get();  
        }   
         // 3.2 输出        
         v.set(sum);  
        context.write(key,v); //将k、v传给context对象。
    }
}

自定义Driver类:

Driver类:驱动类

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;//Text类型为Hadoop定义下的,切勿导错,会使得程序无结果。
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {//1、定义main函数,作为程序执行的入口 
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  

    // 1 获取配置信息以及获取job对象  
    Configuration conf = new Configuration();  
    Job job = Job.getInstance(conf);  

    // 2 关联本Driver程序的jar   <本程序关联的WordcountDriver就是当前的WordcountDriver类>  
    job.setJarByClass(MyDriver.class);  

    // 3 关联Mapper和Reducer的jar   <即指出Mapper类和Reducer类>  
    job.setMapperClass(MyMapper.class);  
    job.setReducerClass(MyReducer.class);  

    // 4 设置Mapper输出的kv类型   <即指出Mapper类里泛型的后两个泛型类型>  
    job.setMapOutputKeyClass(Text.class);  
    job.setMapOutputValueClass(IntWritable.class);  

    // 5 设置最终输出kv类型
    <如果有Reduce阶段,就是指出Reduce类的输出类型,即Reduce类里泛型的后两个类型>
    <如果没有Reduce阶段,就是指出Mapper阶段输出的类型,即Mapper类里泛型的后两个>                                  
    一个MR程序可以没有Reduce阶段,这是需要看具体要求而定的!  
    job.setOutputKeyClass(Text.class);  
    job.setOutputValueClass(IntWritable.class);  
  
    // 6 设置输入和输出路径  
    FileInputFormat.setInputPaths(job, new Path(args[0]));  
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    或写成  
    FileInputFormat.setInputPaths(job, new Path("具体输入路径"));
    //可直接写到某个文件夹,就会执行该文件夹里所有文件  
    FileOutputFormat.setOutputPath(job, new Path("具体输出路径"));
    //执行输出的文件夹不能是已存在的,在这里写的文件夹名会在程序执行时创建  

    // 7 提交job    Job运行是通过job.waitForCompletion(true),true表示将运行进度等信息及时输出给用户,false的话只是等待作业结束  
    boolean result = job.waitForCompletion(true);  
    System.exit(result ? 0 : 1); 
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5665702.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存