- 1. MapReduce概述
- 1.1 MapReduce介绍
- 1.2 MapReduce特点
- 1.3 MapReduce架构
- 1.4 MapReduce编程
- 1.5 MapReduce工作流程
- 1.6 Hadoop序列化机制
- 1.6.1 什么是序列化
- 1.6.2 Hadoop的序列化机制
- 1.7 Hadoop中数据类型
- 2. MapReduce入门案例
- 2.1 业务需求(WordCount)
- 2.2 编程思路
- 2.3 开发环境搭建
- 2.4 编程实现
- 2.4.1 Mapper
- 2.4.2 Reducer
- 2.4.3 Driver
- 2.4.4 log4j
- 2.4.5 整体
- 3. MapReduce程序运行模式
- 3.1 运行模式
- 3.2 本地模式运行
- 3.3 集群模式运行
- 4. MapReduce流程梳理
- 4.1 MapReduce输入输出梳理
- 4.1.2 输入
- 4.1.2 输出
- 4.2 WordCount案例执行流程图
- 4.3 Map阶段执行过程
- 4.4 Reduce阶段执行过程
MapReduce的思想核心是 “先分再合,分而治之” 。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
一个完整的mapreduce程序在分布式运行时有三类实例进程:
- MRAppMaster:负责整个程序的过程调度及状态协调
- MapTask:负责map阶段的整个数据处理流程
- ReduceTask:负责reduce阶段的整个数据处理流程
MapReduce分布式的运算程序分成2个阶段,分别是Map阶段和Reduce阶段。Map阶段对应的是MapTask并发实例,完全并行运行。Reduce阶段对应的是ReduceTask并发实例,数据依赖于上一个阶段所有MapTask并发实例的数据输出结果。
MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端驱动)。
用户自定义的Mapper和Reducer都要继承各自的父类。Mapper中的业务逻辑写在map()方法中,Reducer的业务逻辑写在reduce()方法中。整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象。
注意:整个MapReduce程序中,数据都是以kv键值对的形式流转的。因此在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出的kv分别是什么。并且在MapReduce中数据会因为某些默认的机制进行排序进行分组。所以说kv的类型数据确定极为重要。
工作流程分为3个阶段:map、shuffle、reduce
- map阶段:
负责从数据源读取数据进行处理,默认情况下读取数据返回的是kv键值对类型,经过自定义map方法处理之后,输出的也应该是kv键值对类型。 - shuffle阶段:
map输出的数据经过分区、排序、分组等动作后重组,相当于洗牌的逆过程,这是MapReduce的核心!
默认分区规则:key相同的分在同一个分区,同一个分区被同一个reduce处理。
默认排序规则:根据key字典序排序
默认分组规则:key相同的分为一组,一组调用reduce处理一次。 - reduce阶段:
负责对shuffle后的数据进行聚合处理。输出的结果也是kv键值对。
序列化 (Serialization):是将结构化的对象转换成字节流以便于进行网络传输或写入持久存储的过程。
反序列化(Deserialization):是将字节流转换为一系列结构化的对象的过程,重新创建该对象。
简单的说:把对象转换为字节序列的过程称为对象的序列化。把字节序列恢复为对象的过程称为对象的反序列化。
序列化的用途:
- 作为一种持久化格式。
- 作为一种通信的数据格式。
- 作为一种数据拷贝、克隆机制。
Hadoop的序列化不采用java的序列化机制,而是实现了自己的序列化机制。 Hadoop采用的运算模型是将任务分散在不同节点上进行初步处理;然后将分节点上已经计算完毕的任务汇总进行合并 *** 作。在集群节点间通信时需要频繁的序列化和反序列化,因此要求序列化速度要快、体积要小、占用网络带宽要小,使数据适合远程传输并保证传输后数据的质量
原因:
- java的序列化机制比较臃肿,重量级,是不断的创建对象的机制,并且会额外附带很多信息(校验、继承关系系统等)。
- Hadoop的序列化机制中,用户可以复用对象,这样就减少了java对象的分配和回收,提高了应用效率。
Hadoop通过Writable接口实现的序列化机制,Writable接口提供两个方法(write和readFields)。write叫做序列化方法,用于把对象指定的字段写出去;readFields叫做反序列化方法,用于从字节流中读取字段重构对象。
Hadoop没有提供对象比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。
Hadoop序列化特点:高效、紧凑、扩展性强。
1.7 Hadoop中数据类型Hadoop提供了如下数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。
举个例子:
package dut.mr.covid_19; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class covidcount implements WritableComparable2. MapReduce入门案例 2.1 业务需求(WordCount){ private long cases; private long deaths; @Override public void write(DataOutput out) throws IOException { out.writeLong(cases); out.writeLong(deaths); } @Override public void readFields(DataInput in) throws IOException { this.cases = in.readLong(); this.deaths = in.readLong(); } @Override public int compareTo(covidcount o) { return this.cases - o.getCases() > 0 ? -1 : (this.cases - o.cases < 0 ? 1 : 0); } } public covidcount() { } public long getCases() { return cases; } // 封装一个set方法,用于对象属性赋值 public void set(long cases, long deaths) { this.cases = cases; this.deaths = deaths; } public void setCases(long cases) { this.cases = cases; } public long getDeaths() { return deaths; } public void setDeaths(long deaths) { this.deaths = deaths; } @Override public String toString() { return cases + "t" + deaths; }
统计指定文件中,每个单词出现的总次数。
- map阶段的核心:
把输入的数据经过切割,全部标记1。因此输出的kv就是<单词,1>。 - shuffle阶段核心:
经过默认的排序分区分组,key相同的单词会作为一组数据构成新的kv对。 - reduce阶段核心:
处理shuffle完的一组数据,该组数据就是该单词所有的键值对。对所有的1进行累加,得到单词的总次数。
这部分我会做一个专门针对环境讲解。
2.4 编程实现 2.4.1 Mapperpackage dut.mr.wordcount; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper2.4.2 Reducer{ private final static LongWritable outvalue = new LongWritable(1); private Text outkey =new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { //拿取一行数据String String line=value.toString(); //一行 String[] words=line.split("\s+"); //空格划分单词 //遍历输出 for(String word : words) { //输出数据,把每个单词标记1 输出结果<单词,1> outkey.set(word); context.write(outkey, outvalue); } } }
package dut.mr.wordcount; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer2.4.3 Driver{ private LongWritable outValue =new LongWritable(); @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { // 统计变量 long count=0; // 遍历该组的values for(LongWritable value : values) { count +=value.get(); } outValue.set(count); //输出结果 context.write(key,outValue); } }
这个是比较老的版本写法,我称它为V1版本。
package dut.mr.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.BasicConfigurator; public class WordCountDriver_v1 { public static void main(String[] args) throws Exception { // 自动快速地使用缺省Log4j环境 BasicConfigurator.configure(); // 创建配置对象 Configuration conf = new Configuration(); // 构建job作业实例,参数(配置对象,job名字) Job job = Job.getInstance(conf, WordCountDriver_v1.class.getSimpleName()); // 设置mr程序运行的主类 job.setJarByClass(WordCountDriver_v1.class); // 设置本次mr程序的mapper类型,reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 指定mapper 阶段输出的key value数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 指定reduce阶段输出的key,value类型,也是mr程序最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);、 // 配置本次作业的输入输出数据路径 Path input = new Path(args[0]); Path output = new Path(args[1]); FileInputFormat.setInputPaths(job, input); FileOutputFormat.setOutputPath(job, output); // 判断输出路径是否存在,若存在先删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } // 提交本次job作业 // job.submit(); //不能实时监视 boolean resultflag = job.waitForCompletion(true); // 是否实时监视 // 退出程序,对job进行绑定 System.exit(resultflag ? 0 : 1); // 0正常退出,1异常退出 } }
这个是新版本写法,我称它为V2版本。后续我还是沿用V1版本来讲解
package dut.mr.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.BasicConfigurator; public class WordCountDriver_v2 extends Configured implements Tool { public static void main(String[] args) throws Exception { // 自动快速地使用缺省Log4j环境 BasicConfigurator.configure(); // 创建配置对象 Configuration conf = new Configuration(); // 本地模式还是yarn模式,默认本地local conf.set("mapreduce.framework.name", "local"); // 使用工具类ToolRunner提交程序 int status = ToolRunner.run(conf, new WordCountDriver_v2(), args); System.exit(status); } @Override public int run(String[] args) throws Exception { // 构建job作业实例,参数(配置对象,job名字) Job job = Job.getInstance(getConf(), WordCountDriver_v2.class.getSimpleName()); // 设置mr程序运行的主类 job.setJarByClass(WordCountDriver_v2.class); // 设置本次mr程序的mapper类型,reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 指定mapper 阶段输出的key value数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 指定reduce阶段输出的key,value类型,也是mr程序最终输出数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 配置本次作业的输入输出数据路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } }2.4.4 log4j
log4j是用来在控制台输出显示有关MapReduce运行过程中的信息
hadoop.root.logger=INFO, console # out level |name is console log4j.rootLogger = INFO, console #data to console//ERROR WARN INFO DEBUG log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n2.4.5 整体 3. MapReduce程序运行模式 3.1 运行模式
运行模式讲的是:mr程序是单机运行还是分布式运行?mr程序需要的运算资源是yarn分配还是单机系统分配?
运行在何种模式 取决于下述这个参数:(默认为local)
mapreduce.framework.name=yarn //集群模式 mapreduce.framework.name=local //本地模式
在HDFS配置mapred-default.xml中定义。如果代码中有设置则会覆盖环境配置、
3.2 本地模式运行- mr程序是被提交给LocalJobRunner在本地以单进程的形式运行。
- 处理的数据及输出结果可以在本地文件系统,也可以在hdfs上。
- 将mapreduce程序提交给yarn集群,分发到很多的节点上并发执行。
- 处理的数据和输出结果应该位于hdfs文件系统
- 提交集群的实现步骤:
将程序打成jar包,然后在集群的任意一个节点上用命令启动
hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver args
yarn jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver args
// 判断输出路径是否存在,若存在先删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); }4.2 WordCount案例执行流程图 4.3 Map阶段执行过程
- 第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)
- 第二阶段是对切片中的数据按照一定的规则解析成
对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat) - 第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个
,调用一次map方法。 - 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。
- 第五阶段是对每个分区中的键值对进行排序。首先,按照k进行排序,对于k相同的键值对,按照v进行排序。比如三个键值对<2,2>、<1,3>、<2,1>。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接写入到磁盘中。
- 第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。
- 第一阶段是Reducer任务会主动从Mapper任务读取其输出的键值对。Mapper任务可能会有很多,因此Reducer会读取多个Mapper的输出。
- 第二阶段是把读取到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
- 第三阶段是对排序后的键值对调用reduce方法。k相等的键值对调用一次reduce方法,最后把这些输出的键值对写入到HDFS文件中。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)