MapReduce专题

MapReduce专题,第1张

1. 概念 1.1 设计构思

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发布在Hadoop集群上;核心思想是分而治之,即把复杂的任务分解为若干个“简单的任务”来并行处理,最后对这些小任务的结果进行全局汇总,适合并行计算相互间不具有计算依赖关系的大数据,但不可拆分的计算任务或相互间有依赖关系的数据无法进行并行计算

1.2 抽象模型:Map和Reduce

MapReduce定义了如下的Map和Reduce两个抽象的编程接口

  • Map:(k1;v1)→[(k2;v2)],负责“分”即对一组数据元素进行某种重复式的处理;
  • Reduce:(k2;[v2]→[(k3;v3)]),负责“合”即对Map的中间结果进行某种进一步的结果整理
1.2.1 Mapper抽象类

若要实现特定的功能,可以覆写一个继承自Mapper的Java类,然后重写里面的一些方法,其中比较重要的有以下四个方法

  • setup方法

    属于Mapper类的初始化方法,用于实现对一些对象的初始化工作

  • map方法

    当读取每一行数据时都会调用一次map方法,用来实现处理每一条数据

  • cleanup方法

    在整个MapTask执行完成之后会调用该方法,主要用于做一些清理工作,例如断开连接,关闭资源

  • run方法

    覆写该方法能够实现对所有的MapTask更精准的 *** 作控制

1.2.2 Reducer抽象类
  • setup方法

    属于Reducer类的初始化方法,用于实现对一些对象的初始化工作

  • reduce方法

    所有从MapTask发送来的数据,都会调用reduce方法,实现对数据的处理

  • cleanup方法

    在整个ReduceTask执行完成之后会调用该方法,主要用于做一些清理工作,例如断开连接,关闭资源

  • run方法

    覆写该方法能够实现对所有的ReduceTask更精准的 *** 作控制

2. 架构 2.1 框架结构

  • MR AppMaster:负责整个程序的过程调度及状态协调
  • MapTask:负责map阶段的整个数据处理流程
  1. 读取数据组件InputFormat(默认TestInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到block,有多少个block就对应启动多少个MapTask

  2. 将输入文件切分为block之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回Key表示每行首字符偏移值,Value表示这一行文本内容)

  3. 读取block返回,进入用户自己继承的Mapper类中,执行用户重写的map函数,RecordReader读取一行调用这里一次

  4. Mapper逻辑结束后,将Mapper的每条结果通过context.write进行collect数据收集(在collect中,会先对其进行分区处理,默认使用HashPartitioner

    • MapReduce提供Partitioner接口,作用是根据KeyValueReducer的数量来决定当前的这对输出数据最终应该交给哪个ReduceTask处理
  5. KeyValue的值序列化成字节数组后写入内存(内存中这片区域叫环形缓冲区,作用是批量收集Mapper结果(Key/Value对和Partition的结果),减少磁盘IO的影响)

    • 环形缓冲区其实是一个存放Key/Value的序列化数据和Key/Value的元数据信息(包括Partition/Key的起始位置、Value的起始位置以及Value的长度)的数组
    • 当Mapper的输出结果很多时,可能会超出缓冲区的内存限制(默认100MB),此时会执行Spill(溢写),即在一定条件下(整个缓冲区存在溢写比例spill.percent,默认是0.8即缓冲区阈值默认为80MB)将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区
    • 溢写由单独线程来完成,不影响往缓冲区中写Mapper结果的线程,即溢写线程启动时不应该阻止Mapper的结果输出,即当缓冲区的数据大小超过默认的阈值80MB时,启动溢写线程锁定这80MB的内存后执行溢写过程,但Mapper的输出结果还可以往剩下的20MB内存继续写。
  6. 当溢写线程启动后,需要对锁定的内存即80MB空间内的Key做排序(排序是MapReduce模型默认的行为,也是对序列化字节做的排序)

    • 在这里可以使用Combiner对有相同KeyKey/Value对的Value合并起来,从而减少溢写到磁盘的数据量(可以在整个模型中多次使用),由于Combiner的输出是Reducer的输入,所以Combiner只能用于Reduce的输入Key/Value与输出Key/Value类型完全一致,且不影响最终结果的场景,例如累加,求最大值
  7. 由于每次溢写都会在磁盘上生成一个临时文件(写之前会判断是否有Combiner),且当整个数据处理结束之后只会有一个最终的文件,因此当Mapper输出结果很大需要发生多次溢写时就需要合并溢写文件

  • ReduceTask:负责reduce阶段的整个数据处理流程
  1. Copy:通过HTTP方式请求maptask获取属于自己的文件

  2. MergeCopy过来的数据会先放入内存缓冲区中(大小比map端的更为灵活),Merge有三种形式:

    1. 内存到内存(默认情况下不启用)
    2. 内存到磁盘:当内存的数据量到达阈值时,会启用溢写(如果设置Combiner也会启用),然后会在磁盘中产生众多的溢写文件,直到没有map端的数据时才会结束运行此Merge
    3. 磁盘到磁盘:当第二种Merge结束时,会启用此Merge方式合并生成最终的文件
  3. 合并排序:将分散的数据合并成一个大的数据后,还会再对合并后的数据排序

  4. 对排序后的键值对调用reduce方法:让键相等的键值对调用一次reduce方法,每次调用时会产生零个或多个键值对,最后把这些输出的键值对写入到HDFS文件中

2.2 编程规范 2.2.1 Map阶段
  1. 设置InputFormat类,将数据切分为Key-Value(K1和V1)键值对,输入到第二步
  2. 自定义Map逻辑,将第一步的结果转换成另外的Key-Value(K2和V2)键值对,输出结果
2.2.2 Shuffle阶段
  1. 对Map输出的Key-Value键值对进行分区
  2. 对不同分区的数据按照相同的Key排序
  3. (可选)对分组过的数据初步规约(降低数据的网络拷贝)
  4. 对数据进行分组,相同Key的Value放在一个集合中
2.2.3 Reduce阶段
  1. 对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的Key-Value键值对进行处理,转化为新的Key-Value(K3和V3)键值对输出
  2. 设置OutputFormat处理并保存Reduce输出的Key-Value数据
3. 简单使用 3.1 MapReduce程序运行模式 3.1.1 本地运行模式
  • 程序是提交给LocalJobRunner在本地以单进程的形式运行
  • 处理的数据及输出结果可以在本地文件系统,也可以在hdfs
  • 不带集群的配置文件本质上是程序的conf是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local参数
  • 本地模式便于进行业务逻辑的debug,在IDEA中仅需打断点即可
3.1.2 集群运行模式
  • 程序提交给yarn集群,分发到很多的节点上并发执行
  • 处理的数据和输出结果应该位于hdfs文件系统
  • 提交集群的实现步骤:
    1. 将程序打成jar包
    2. 在集群的任意一个节点上用hadoop命令启动
3.2 WordCount

需求是在一堆给定的文本文件中统计输出每一个单词出现的总次数

3.2.1 mapper类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**mapper 程序: 继承 mapper类并传入四个类型: 
*keyin : k1 Long ---- LongWritable
*valin : v1 String ------ Text
*keyout : k2 String ------- Text
*valout : v2 Long -------LongWritable
*/
public class MapTask extends Mapper<LongWritable,Text,Text,LongWritable> {
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
	// 1. 获取 v1 中数据
	String val = value.toString();
	// 2. 切割数据
	String[] words = val.split(" ");
	Text text = new Text();
	LongWritable longWritable = new LongWritable(1);
	// 3. 遍历循环, 发给 reduce
	for (String word : words) {
		text.set(word);
		context.write(text,longWritable);
		}
	}
}
3.2.2 reducer类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**reducer程序: 继承 reducer类并传入四个类型:
* KEYIN : k2 -----Text
* VALUEIN : v2 ------LongWritable
* KEYOUT : k3 ------ Text
* VALUEOUT : v3 ------ LongWritable
*/
public class ReducerTask extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
	protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
	// 1. 遍历 values 获取每一个值
	long v3 = 0;
	for (LongWritable longWritable : values) {
		v3 += longWritable.get(); //1
		}
	// 2. 输出
	context.write(key,new LongWritable(v3));
	}
}
3.2.3 主类

用来描述job并提交job

import com.sun.org.apache.bcel.internal.generic.NEW;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// 任务的执行入口: 将八步组合在一起
public class JobMain extends Configured implements Tool {
	// 在 run 方法中编写组装八步
	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(super.getConf(), "JobMain");
		// 如果提交到集群 *** 作. 需要添加一步 : 指定入口类
		job.setJarByClass(JobMain.class);
		// 1. 封装第一步: 读取数据
		job.setInputFormatClass(TextInputFormat.class);
		TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount.txt
"));
		// 2. 封装第二步: 自定义 map 程序
		job.setMapperClass(MapTask.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		// 3. 第三步 第四步 第五步 第六步 省略
		// 4. 第七步: 自定义 reduce 程序
		job.setReducerClass(ReducerTask.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		// 5. 第八步 : 输出路径是一个目录, 而且这个目录必须不存在的
		job.setOutputFormatClass(TextOutputFormat.class);
		TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/output"));
		// 6. 提交任务:
		boolean flag = job.waitForCompletion(true); // 成功 true 不成功 false
		return flag ? 0 : 1;
	}
	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		JobMain jobMain = new JobMain();
		int i = ToolRunner.run(configuration, jobMain, args); //返回值 退出码
		System.exit(i); // 退出程序 0 表示正常 其他值表示有异常 1
}
}
4. 应用场景

计算URL的访问频率

搜索引擎的使用中,会遇到大量的URL的访问,可以使用MapReduce来进行统计,得出(URL,次数)结果,以便在后续的分析中使用

倒排索引

Map函数去分析文件格式是(词,文档号)的列表,Reduce函数就分析这个(词,文档号),排序所有的文档号,输出(词,list(文档号)),这个就可以形成一个简单的倒排索引,是一种简单的算法跟踪词在文档中的位置

Top K问题

在各种的文档分析,或者是不同的场景中,经常会遇到关于Top K的问题,例如输出某个文档中出现前5个最多的词汇。

5. 原理优化 5.1 小文件优化

对于MapReduce,一个map默认处理一个分片或者一个小文件,如果map的启动时间比数据处理的时间要长,就会降低性能,而且在map端溢写磁盘的时候每一个map最终会产生reduce数量个数的中间结果,所以如果map的数量较多时,就会生成很多临时文件,而且在reduce拉取数据的时候会增加磁盘的IO

5.1.1 提交阶段

上传的 jar、分片信息,等资源信息会提交到 hdfs 的临时目录中,默认会有10个副本,但是可以通过参数 mapreduce.client.submit.file.replication 控制副本的数量

5.1.2 MapTask阶段

Split 大小:一个 split 会启动一个 MapTask 执行,所以需要合理设置 split 大小;Math.max(minSize, Math.min(maxSize, blockSize))
MapTask 资源:mapreduce.map.memory.mb 默认1G, mapreduce.map.cpu.vcores 默认为 1
Map 任务向磁盘溢写之前会先写入环形缓冲区,可以较少溢写的次数,减少磁盘IO;当达到磁盘文件达到一定量时,会进行小文件的合并,可以增大合并文件的数量;

设置环形缓冲区大小:mapreduce.task.io.sort.mb # 排序map输出所需要使用内存缓冲的大小,以兆为单位, 默认为100

设置环形缓冲区溢写阈值:mapreduce.map.sort.spill.percent # map输出缓冲和用来磁盘溢写过程的记录边界索引,这两者使用的阈值,默认0.8

设置合并磁盘文件个数:mapreduce.task.io.sort.factor # 排序文件时,一次最多合并的文件数,默认10

Map 输出文件的数据压缩可以减少数据量的大小:mapreduce.map.output.compress # 在map溢写磁盘的过程是否使用压缩,默认false

压缩的方式:mapreduce.map.output.compress.codec

5.1.3 ReduceTask阶段

Reduce 个数设置:mapreduce.job.reduces
Reduce 资源设置:mapreduce.reduce.memory.mb , mapreduce.reduce.cpu.vcores
Reduce 进行数据拉取时,可以设置拉取时的并行度 : mapreduce.reduce.shuffle.parallelcopies # 把map输出复制到reduce的线程数,默认5
Reduce Task 下载任务时,可以设置最大的下载时间段 : mapreduce.reduce.shuffle.read.timeout
Reduce Task 拉取过来的数据会先放入内存缓冲区,当达到一定量之后才溢写到磁盘; 该内存基于 JVM 的堆大小;mapreduce.reduce.s huffle.input.buffer.percent(default 0.7) shuffle 在 reduce 内存中的最大值为: 0.7 × maxHeap of reduce task,内存到磁盘的阈值:mapreduce.reduce.shuffle.merge.percent(default0.66)
对于拉取的数据,进行归并排序时的合并因子:mapreduce.task.io.sort.factor # 排序文件时一次最多合并文件的个数 (对于 map 输出很多,可以选择提高该值);

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/737071.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-28
下一篇 2022-04-28

发表评论

登录后才能评论

评论列表(0条)

保存