- MapReduce概念
- 概述
- 优点和缺点
- 可编程组件
- Wordcount案例
- 需求分析
- 代码实现
- 自定义序列化
- 概述
- 自定义序列化步骤
- 手机号及总流量案例
- 需求分析
MapReduce是一个可用于大规模数据处理的分布式计算框架,具体包含以下三层含义:
1)MapReduce是一个并行程序的计算模型与方法
Mapreduce是一个编程模型,主要用来解决海量数据的并行计算。它借助函数式编程和“分而治之”的思想,使用户参照map和reduce两个思想描述清楚想要处理的问题。
2)MapReduce是一个并行程序运行的软件框架
用户只需要编写业务逻辑代码,MapReduce将其和自带默认组件整合成一个完整的分布式运算程序通过自带默认组件,自动完成计算任务的并行化处理,包括自动划分计算数据和计算任务,在集群上自动分配和执行任务以及手机计算结果等复杂细节问题。
3)MapReduce是一个基于集群的高性能并行计算平台
基于MapReduce框架编写出来的应用程序能够运行在上千个节点组成的集群上,以可靠的方式并行处理TB或者PB级别的数据量。
优点
1)Mapreduce易于编程。
2)具有良好的扩展性
3)高容错性
4)适合PB级以上海量数据的离线处理
缺点
1)不适合实时计算
2)不适合流式计算
3)不适合DAG计算
InputFormat
从InputFormat类图看,InputFormat抽象类仅有两个抽象方法:
List getSplits(), 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。
RecordReader
总体来说,通过InputFormat,Mapreduce框架可以做到:
验证作业输入的正确性
将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask 提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用
split
FileSplit有四个属性:文件路径,分片起始位置,分片长度和存储分片的hosts。用这四项数据,就可以计算出提供给每个Mapper的分片数据。在InputFormat的getSplit()方法中构造分片,分片的四个属性会通过调用FileSplit的Constructor设置。
FileSplit的问题:
FileSplit对应的是一个输入文件,就算一个输入文件特别小,它也会作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理,这样的创建与销毁的开销非常巨大。
解决方法:
CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。
RecordReader
系统默认的RecordReader是LineRecordReader,TextInputFormat
LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value
SequenceFileInputFormat的RecordReader是SequenceFileRecordReader
应用场景:自定义读取每一条记录的方式;自定义读入key的类型,如希望读取的key是文件的路径或名字而不是该行在文件中的偏移量。
Map
常用子类
HashMap
linkedHashMap
常用方法
public V put(K key, V value): 把指定的键与指定的值添加到Map集合中。
public V remove(Object key): 把指定的键 所对应的键值对元素 在Map集合中删除,返回被删除元素的值。
public V get(Object key) 根据指定的键,在Map集合中获取对应的值。
boolean containsKey(Object key) 判断集合中是否包含指定的键。
public Set keySet(): 获取Map集合中所有的键,存储到Set集合中。
public Set
Partition
该该主要在Shuffle过程中按照Key值将中间结果分成R份,其中每份都有一个Reduce去负责,可以通过job.setPartitionerClass()方法进行设置,默认的使用hashPartitioner类。实现getPartition函数
Sort
MapTask和ReduceTask均会对数据按照key进行排序,这是hadoop的默认行为。任何应用程序中的数据均会被排序,不管他们是否需要。
如果key是封装为IntWritable类型,那么MapReduce按照数字大小对key进行排序。
如果key是封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
Combiner
当所有数据处理完后,MapTask对所有临时文件进行一次合并,以确保最终会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。进行文件合并过程中,MapTask以分区为单位进行合并。
对于某个分区,它将采用多轮递归合并的方式,每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入到待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask最终只生 成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销
Group
当整个数据处理结束之后,会对磁盘中所有的临时文件进行归并(把相同的key合并到一起),生成一个大文件
此时的归并是将所有临时文件中的相同分区的合并到一起,并对各个分区中的数据再进行一次排序
如何比较两个key是否相同:
第一种是key的compareTo方法,返回0时认为两个key相同
第二种是设置一个自定义的GroupingComparatorClass,然后重写类里compare方法,这个方法会覆盖掉key中的compareTo方法,从而作为比较key是否相同的方法。
Reduce
在分组完成后,只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
OutputFormat
OutputFormat接口主要用于描述输出数据的格式,它能够将用户提供的key/value对写入特定格式的文件中。其功能与前面描述的InputFormat相似,Hadoop提供的OutputFormat实例会把文件写在本地磁盘或HDFS上。每一个reducer会把结果输出写在公共文件夹中一个单独的文件内。这些文件一般命名为:part-xxxxx,而xxxxx是关联到某个reduce任务的partition的id。
常见的outputFormat实现类:
1.文本输出TextOutputFormat
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用tostring()方法把它们转换为字符串。
2. SequenceFileoutputFormat
将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。
3.自定义OutputFormat
根据用户需求,自定义实现输出。
我们希望能够获取到各学历阶段的人数情况(第四列)
mapper
1.将maptask传输的文本内容转换成string
30,Private,196456,Masters,14,Never-married,Prof-specialty,Not-in-family,White,Female,0,0,31,United-States,<=50K
2.根据,将这一行拆分成单词/词组,并且将每行第四个单词/词组设置为key
3.将key输出为
reducer
1.汇总key的个数
2.输出key的总次数
driver
1.获取配置信息,获取job对象实例
2.指定本程序的jar包所在本地路径
3.关联mapper与reducer业务类
4.指定mapper输出数据的kv类型
4.指定最终输出数据的kv类型
6.指定job的输入原始文件所在目录以及输出路径目录
结果展示
mapper类
在不确定有没有空字段的情况下,通过过滤避免统计结果出现偏差
package com.heria.mapreduce.startstation.huizong; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; //map阶段 //KEYIN输入数据的value //KEYOUT输出数据的key类型 //VALUEOUT 输出的value类型 public class WordCountMapper extends Mapper{ Text k = new Text(); IntWritable v = new IntWritable(1); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(","); // 3 输出 if(words.length>=3){ if (words[3]!=null){ String x=words[3]; k.set(x); context.write(k, v); } } } }
reducer类
package bigdata.startstation.huizong; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer{ int sum; IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { sum=0; //1 累加求和 for(IntWritable value : values) { sum+= value.get(); } //2 写出 v.set(sum); context.write(key,v); } }
driver类
package com.heria.mapreduce.startstation.huizong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[] {"H:/BIGDATA/0note/hadoop/3-mapreduce/input/person.csv", "h:/BIGDATA/0note/hadoop/3-mapreduce/output/person/education"}; // 1 获取配置信息以及封装任务 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置jar加载路径 job.setJarByClass(WordCountDriver.class); // 3 设置map和reduce类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 4 设置map输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }自定义序列化 概述
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。
当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
为什么不用java的序列化?
Java的序列化是一个重量级序列化框架( Serializable ) ,一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable )。
Hadoop序列化特点:
(1)紧凑:高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)可扩展:随着通信协议的升级而可升级
(4)互 *** 作:支持多语言的交互
Hadoop Writable框架
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public FlowBean() { super(); }
(3)重写序列化方法
@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); }
(4)重写反序列化方法
@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); }
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用”t”分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。
手机号及总流量案例统计每一个手机号耗费的总上行流量、下行流量、总流量
输入数据(phone_data.txt)
输入数据格式:
(3)期望输出数据格式
13560436666 1116 954 2070 手机号码 上行流量 下行流量 总流量需求分析
mapper
1.将maptask传输的文本内容转换成string
9 13729199489 192.168.100.6 240 0 200
2.根据n将这一行拆分,抽取手机号、上行流量、下行流量
3.以手机号为key,bean对象为value输出,即为context.write(手机号,bean)
reducer
1.累加上行流量和下行流量得到总流量。
2.实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输
3.MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前会排序),排序的依据是map输出的key
所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable。
4.重写key的compareTo方法。
driver
1.获取配置信息,获取job对象实例
2.指定本程序的jar包所在本地路径
3.关联mapper与reducer业务类
4.指定mapper输出数据的kv类型
4.指定最终输出数据的kv类型
6.指定job的输入原始文件所在目录以及输出路径目录
7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
结果实现
mapper类
package com.heria.mapreduce.shangguigu.flowsum; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FlowCountMapper extends Mapper{ FlowBean v = new FlowBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割字段 String[] fields = line.split("t"); // 3 封装对象 // 取出手机号码 String phoneNum = fields[1]; // 取出上行流量和下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); k.set(phoneNum); v.set(downFlow, upFlow); // 4 写出 context.write(k, v); } }
reducer类
package com.heria.mapreduce.shangguigu.flowsum; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowCountReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException { long sum_upFlow = 0; long sum_downFlow = 0; // 1 遍历所用bean,将其中的上行流量,下行流量分别累加 for (FlowBean flowBean : values) { sum_upFlow += flowBean.getUpFlow(); sum_downFlow += flowBean.getDownFlow(); } // 2 封装对象 FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow); // 3 写出 context.write(key, resultBean); } }
自定义序列化
package com.heria.mapreduce.shangguigu.flowsum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; // 1 实现writable接口 public class FlowBean implements Writable{ private long upFlow; private long downFlow; private long sumFlow; //2 反序列化时,需要反射调用空参构造函数,所以必须有 public FlowBean() { super(); } public FlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } //3 写序列化方法 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } //4 反序列化方法 //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致 @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } // 6 编写toString方法,方便后续打印到文本 @Override public String toString() { return upFlow + "t" + downFlow + "t" + sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public void set(long downFlow2, long upFlow2) { upFlow = upFlow2; downFlow=downFlow2; sumFlow=upFlow2+downFlow2; } }
driver类
package com.heria.mapreduce.shangguigu.flowsum; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowSumDriver { public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "h:/BIGDATA/0note/hadoop/3-mapreduce/input/phone_data.txt", "h:/BIGDATA/0note/hadoop/3-mapreduce/output/output2" }; // 1 获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 6 指定本程序的jar包所在的本地路径 job.setJarByClass(FlowSumDriver.class); // 2 指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); // 3 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 4 指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 5 指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
文献参考:
《实战大数据:Hadoop+Spark+Flink:从平台构建到交互式数据分析:离线/实时》 杨俊编著.——北京:机械工业出版社,2021.5
参考博文:
InputFormat组件
https://www.cnblogs.com/shitouer/archive/2013/02/28/hadoop-source-code-analyse-mapreduce-inputformat.html
RecordReader组件
https://blog.csdn.net/scgaliguodong123_/article/details/46492039
map组件
combinerhttps://blog.csdn.net/nanZhaiXiaoLang/article/details/114080167
Combiner组件
https://blog.csdn.net/qq_43634001/article/details/109268027
group组件https://blog.csdn.net/qq_42636010/article/details/90071896
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)