目录
1.mr的核心思想
1.1 什么是Mpareduce
1.2 MR的优缺点
1.3 MR的分片机制
1.3.1输入分片的概念
1.3.2 分片大小的选择
1.3.3 分片与块的区别
1.4 运行流程
1.4.1 MapTask的整体概述
1.4.2 ReduceTask的整体概述
1.5 Shulle流程
1.5.1 map端shuffle
1.5.2 reduce端的shuffle
1.6 combiner函数
1.6.1 为什么要使用combiner函数
1.6.2combiner函数的特点
2、mr的实例进程
3、MapReduce程序的组成
4、WordCount的经典案例
4.1 图解
4.2 开发
1.mr的核心思想
1.1 什么是Mpareducemaptask并发实例,完全并行运行,互不相干
reducetask并发实例互不相干,依赖与上一个阶段所有的maptask并发实例
1.2 MR的优缺点1. mapreduce是hadoop的三大重要模块之一
2. mapreduce是一个并发的计算和分析框架,用于计算和分析分布式文件系统上的大数据集。
3. 将计算划分为两个阶段:一个map(映射)阶段,一个reduce(归约)阶段
4. 该框架的开发灵感来源于google的《mapreduce》论文
5. 方便开发人员在不会分布式计算的情况下,开发和运行相关计算程序。
1.3 MR的分片机制 1.3.1输入分片的概念1. 优点
- 适合离线数据处理
- mapreduce编程简单
- 扩展性良好
- 高容错性
2. 缺点
- 不适合实时计算(实时计算:毫秒级别/秒级别,离线计算:秒级别以上)
- 不适合流式计算(mapreduce处理的数据是静态的,不是流式的数据)
- 不适合DAG(有向图)计算
1.3.2 分片大小的选择MapReduce在进行作业提交时,会预先对将要分析的原始数据进行划分处理,形成一个个等长的逻辑数据对象,称之为输入分片(inputSplit),简称“分片”。MapReduce为每一个分片构建一个单独的MapTask,并由该任务来运行用户自定义的map方法,从而处理分片中的每一条记录。
a文件 1025MB --- 9块,最后一块是1MB
分析:分片就是记录MapTask要处理的是哪一个块,通过属性记录,因此称之为逻辑数据。
1.3.3 分片与块的区别#1. 分片数量越多,优势如下:
-- 处理分片对应的原始数据所花的时间更少,也就是小于一个分片处理整个文件的时间。
-- 每一个分片对应一个MapTask,MapTask是并行运行的,效率高
-- 分片越多,负载均衡就越好。
-- 计算机硬件越好,处理速度越快,就可以腾出时间,计算其他任务。
#2. 分片太小的缺点:
如果分片太小,管理分片的总时间和构建map任务的总时间将决定作业的整个执行时间
#3. 分片太大的缺点:
如果分片跨越两个数据块,那么分片的部分数据需要通过网络传输到map任务运行的节点,占用网络带宽,效率更低
#4 得出结论:
因此最佳分片大小应该和HDFS上的块大小一致。hadoop2.x默认128M.
1.4 运行流程 1.4.1 MapTask的整体概述1. 分片是逻辑数据,记录的是要处理的物理块的信息而已
path,start,length,hosts
2. 块是物理的,是真实存储在文件系统上的原始数据文件。
本质: 逻辑数据
物理数据
出现的时机:
mapreduce程序进行计算时,才会遇到分片概念
hdfs的存储才会涉及到分块的概念
1.4.2 ReduceTask的整体概述1. maptask调用FileInputFormat的createRecordReader通过分片数据来读取原始数据
2. 会调用nextKeyValue方法获取每行数据,然后返回一个(K,V)对,K是offset,V是一行数据
3. 将k-v对交给Map函数进行处理
4. 每对k-v调用一次map(K,V,context)方法,经过处理,使用context.write(k,v)
5. 写出的数据交给收集器OutputCollector.collect()处理
6. 将数据写入环形缓冲区,并记录写入的起始偏移量,终止偏移量,环形缓冲区默认大小100M
7. 默认写到80%的时候要溢写到磁盘,溢写磁盘的过程中数据继续写入剩余20%
8. 溢写磁盘之前要先进行分区然后分区内进行排序
9. 默认的分区规则是hashpatitioner,即key的hash%reduceNum
10. 默认的排序规则是key的字典顺序,使用的是快速排序
11. 溢写会形成多个文件,在maptask读取完一个分片数据后,会将环形缓冲区数据刷写到磁盘
12. 将数据多个溢写文件进行合并,分区内排序(外部排序===》归并排序)
1.5 Shulle流程1. reduceTask启动后,开始使用线程fetch属于自己分区的map端产生的临时数据。
2. fetch过来的K2,v2,在内存中会进行归并排序。
3. reduceTask会调用分组器将内存中的k2,v2按照不同的k2来分组。
4. 每一组的>调用一次reduce方法,经过处理,使用context.write(k,v)写出去。
5. 写出去的位置由FileOutputFormat的参数决定。
1.5.1 map端shuffle从map阶段的map函数输出开始,到reduce阶段的reduce函数接受数据为止,这个过程称之为shuffle阶段。
整个shuffle阶段可以划分为map端的shuffle和reduce端的shuffle.
1.5.2 reduce端的shuffle1. map函数的输出数据K2,v2进入环形缓冲区,环形缓冲区默认大小100MB,阈值80MB。
2. 当数据存储达到阈值时,会启动一个线程将该数据溢写出去,剩下的20M会继续写入,当20M写满,80M还未溢写完,则maptask出现阻塞。
3. 在溢写前,会先调用分区器按key分区,然后同一分区的数据进行按key排序,排序算法为QuickSort,规则为字典排序。(注意:分区和排序都是对KV对的元数据进行的)
4. 溢写时会产生临时文件,按照分区号从小到大将每一个分区的数据溢写到文件中(注意:写出时是通过排好序的元数据找到原始数据,写出去)。
5. 在排序后,溢写前,可以调用combiner函数来减少数据的溢写,目的减少磁盘IO.
6. 如果溢写文件数量有多个的话,会进行再次合并,合并成一个最终的临时文件,合并的时候使用的是归并算法。
7. 如果溢写文件数据至少3个,则在合并排序后,会再次调用combiner函数,来减少最终临时文件的大小。如果数量低于3个,则没有必要调用combiner。
8. 为了减少临时文件到reduce端的网络IO,建议将临时文件压缩,再进行传输。
1.6 combiner函数 1.6.1 为什么要使用combiner函数1. 当某一个mapTask结束后,reduceTask会利用线程开始fetch属于自己要处理的分区的数据,线程默认是5个,线程数量是针对于每一个节点来说的,线程通过HTTP协议抓取数据。
2. 如果抓过来的数据量过小,直接在对应的jvm的内存中进行归并排序,输入给reduce函数
3. 如果数据量过大,则会直接拷贝到所在的本地磁盘上,然后将多个文件合并成较大的文件,合并因子是10,合并时采用的算法是归并算法。(细节:最后一次合并一定要满足10这个因子,而且不会溢写成文件,直接输入给reduce).
4. 在归并算法合并时,如果map端的数据是压缩的,那么要在reduceTask的内存中解压缩再合并。
5. reduce处理后将数据存储到HDFS上。
1.6.2combiner函数的特点因为mapreduce在运行作业时,会涉及到磁盘IO和网络IO。而集群中资源都有限(磁盘空间,带宽)。因此在作业期间能减少磁盘IO和网络IO,是最优的。而Combiner函数就可以帮助用户来做到这点。
2、mr的实例进程1. 本质就是运行在各个阶段的排序后的reduce函数。可以在map阶段,也可以在reduce阶段。
2. 是mapreduce作业的一个组件。
3. 父类是Reducer类型。
4. 减少IO
5. 应该在不影响结果的前提下来使用。
6. 使用job.setCombinerClass(.....)来设置
注意:编写程序时要考虑combiner类的reduce函数的泛型问题
因为该reduce函数的输入数据:是map函数的输出数据
输出数据: 是要被reduceTask的reduce函数的,也就不是K3,V3还是K2,V2
3、MapReduce程序的组成MrAppMaster:负责整个程序的过程调度以及状态协调
MapTask:负责map阶段的整个数据处理流程
ReduceTask:负责reduce阶段整个数据处理流程
用户编写三个部分:Mapper、reducer、dirver
Mapper端
1)用户自定义的mapper要继承自己的父类
2)map的输入数据是KV对的形式(KV类型可自定义)
3)Mppper中的业务逻辑写在Map()方法中
4)Mapper的输出数据是KV对的形式(KV的类型可以自定义)
5)map()方法(maptask进程)对每一个调用一次)
Mapper端
1)用户自定义的mapper要继承自己的父类
2)map的输入数据是KV对的形式(KV类型可自定义)
3)Mppper中的业务逻辑写在Map()方法中
4)Mapper的输出数据是KV对的形式(KV的类型可以自定义)
5)map()方法(maptask进程)对每一个调用一次
reducer端
1)用户自定义的reducer要继承自己的父类
2)reducer的输入数据类型要对应Mapper的输出数据类型也是KV
3)Reducer的业务逻辑写在reduce()方法中
4)reducertask进程对每一组相同的组调用一次reduce()方法
4、WordCount的经典案例 4.1 图解 4.2 开发Driver
就是描述job对象1. 获取job对象
2. 指定驱动类型
3. 指定Mapper类型和Reducer类型
4. 指定map阶段的K2,V2类型
5. 指定reduce阶段的K3,V3类型
6. 指定分区的数量.....
7. 指定要统计的文件的输入路径
8. 指定要输出的位置路径
9. 提交程序
pom.xml
org.apache.hadoop
hadoop-common
2.7.6
org.apache.hadoop
hadoop-client
2.7.6
org.apache.hadoop
hadoop-hdfs
2.7.6
package com.qf.bigdata.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
//key 偏移量 value 就是每行文本
//按照空格对每行文本进行分词 *** 作
String line = value.toString(); //就是每行的偏移量
String[] words = line.split(""); //就是对每行的内容进行切分成字符串数组
for (int i = 0 ;i < words.length;i++){
//(word,1)pp
Text word = new Text(words[i]);
IntWritable countNum = new IntWritable(1);
context.write(word,countNum);
}
}
}
package com.qf.bigdata.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer {
@Override
protected void reduce(Text k3,Iterable v3,Context context) throws IOException,InterruptedException{
//进行求和 *** 作,需要计算V3的长度
//
int count = 0;
for(IntWritable v : v3){
int value = v.get();
count += value;
}
context.write(k3,new IntWritable(count));
}
}
package com.qf.bigdata.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountJob {
public static void main(String[] args) throws IOException, InterruptedException {
try{
Configuration configuration = new Configuration();
//实例化job
Job job = Job.getInstance();
job.setJarByClass(WordCountJob.class);
//设置Mapper
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reducer
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定文件的输入路径
FileInputFormat.setInputPaths(job,new Path("d:/wordcount.txt"));
//指定文件的输出路径
Path outPath = new Path("D:/output");
FileSystem fs = FileSystem.get(configuration);
if(fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job,outPath);
//提交等待完成
System.exit(job.waitForCompletion(true)?0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)