单词计数是MapReduce的入门程序,跟编程语言当中的“Hello world”一样。
案例讲解若干个文件当中,文件内容为若干个单词,要求计算出文件中每个单词的出现次数,且按照单词的字母顺序进行排序,每个单词和其出现次数占一行。
例如
hello world
hello hadoop
hello hdfs
hi hadoop
hi mapreduce
结果为
hadoop 2
hdfs 1
hello 3
hi 2
mapreduce 1
world 1
设计思路
MapReduce计算模型由三个阶段组成:Map阶段、Shuffle阶段、Reduce阶段。
通过Map任务读取HDFS中的数据块,这些数据将Map任务以完全并行化的方式处理,然后将Map任务的输出进行Shuffle后输入到Reduce任务中,最终Reduce任务将计算的结果输出到HDFS文件系统中。
单词计数当中:
在Map阶段,将接受到的数据进行内容分割,逐个划分出单词,将单词作为key,value为1输出。
在Shuffle阶段,由于单词作为Key,将单词进行分组,把相同的单词都归为一组,发送给Reduce。
在Reduce阶段,将接收到的Key-ValueList进行统计,负责计算单词出现次数并输出最终结果。
代码样例首先需要导入相关的依赖
org.apache.hadoop
hadoop-client
3.1.3 # hadoop版本
Mapper类
package com.mapreduce;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordcountMap extends Mapper { //自定义Mapper类
private static final IntWritable one = new IntWritable(1);
private static final Text word = new Text();
public void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
//默认根据空格,制表符\t,换行符\n,回车符\r分割字符串
StringTokenizer str = new StringTokenizer(value.toString());
//循环输出每个单词与数量
while (str.hasMoreTokens()) {
this.word.set(str.nextToken());
//输出每个单词与数量
context.write(this.word, one);
}
}
}
Reducer类
package com.mapreduce;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import java.io.IOException;
public class WordcountReducer extends Reducer{ //自定义Reducer类
public void reduce(Text key,Iterable values,
Reducer.Context context)
throws IOException,InterruptedException{
//统计单词总数
int sum = 0;
for(IntWritable v:values) {
sum=sum+v.get();
}
//输出统计结果
context.write(key,new IntWritable(sum));
}
}
驱动类
package com.mapreduce;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.Configuration;
public class Main {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();//初始化Configuration类
String otherArgs[] = new GenericOptionsParser(conf, args).getRemainingArgs();//通过实例化对象GenericOptionsParser可以获得程序执行所传入的参数
if (otherArgs.length < 2) {
System.err.println("Usage:wordcount[...]");
System.exit(2);
}
//构建任务对象
Job job = Job.getInstance(conf, WordcountMap.class.getName());
job.setJarByClass(Main.class);
//设置Mapper类,Reducer类
job.setMapperClass(WordcountMap.class);
job.setReducerClass(WordcountReducer.class);
//设置Map任务输出类型,与map()方法一致
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置Reduce任务输出类型,与reduce()方法一致
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入、输出文件格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//设置需要统计的文件的输入路径
for (int i = 0; i < otherArgs.length - 1; i++) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
//提交给hadoop集群
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
程序解读
Hadoop本身提供了一整套可序列化传输的基本数据类型,而不是直接使用JAVA的数据类型,Hadoop的IntWritable类型相当于JAVA的Integer类型,Text相当于JAVA的String类型。
WordcountMap是自定义的类,继承了MapReduce提供的Mapper类,并重写了其中的map方法。Mapper类是一个泛型类,有四个形参类型,分别指定map方法的输入key、输入value、输出key、输出value的类型。在单词计数当中,输入key是长整数的偏移量,输入value是文件当中的一行字符串,也就是若干个单词,经过划分后,输出的key就是单词,输出的value就是单词数量,划分出的是一个个单词,所以输出的value也就是单词数量为1。
WordcountReducer是自定义的类,继承了MapReduce提供的Reducer类,并重写了其中的reduce方法。Reducer类跟Mapper类一样,是一个泛型类,有四个形参类型,分别指定reduce方法的输入key、输入value、输出key、输出value的类型。Reducer类的输入参数类型必须匹配Mapper类的输出类型。在单词计数中,输入的参数是单个单词和单词数量的集合。输出的key就是单词,value就是集合中元素相加的总量,也就是单词的数量。
驱动类中,Main方法中的Configuration类用于读取Hadoop的配置文件,也可以使用set方法重新设置配置属性。Job对象就是配置需要的Mapper类、Reducer类、Map任务输出类型、Reduce任务输出类型、输入文件格式、输出文件格式、输入文件路径、输出文件路径等信息。设置好后直接提交到Hadoop集群。
程序运行使用命令
hadoop jar Main.jar com.mapreduce.Main /input/word.txt /output
运行程序
Main.jar为打包后的jar包名,com.mapreduce.Main为驱动类的包名和类名。input/word.txt为单词文件的路径名,output为输出结果的路径(路径存在会报错)。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)