- 1 基本概念
- 2 Combiner发生时机
- 3 Combiner作用
- 4 自定义Combiner实现wordcount案例
- 5 结果
- Combiner是MapReuce程序中Mapper和Reducer之外的一种组件
- Combiner组件的父类就是Reducer,因而自定义Combiner需要继承Reducer
- Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量
- Combine能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。
上述概念中知道Combiner的父类是Reducer,但是一般配置的reducer类的父类也是Reducer,二者的区别在于运行的位置:
- Combiner是在每一个MapTask所在的节点运行
2.Reducer是按收全局所有Mapper的输出结果
Combiner作用在Shuffle阶段,他的发生时机有两个
- 当为作业设置Combiner类后,缓存溢出线程将缓存存放到磁盘时,就会调用
- 在数据归并过程中, 临时溢出文件的数量超过mapreduce.map.combine.minspills(默认3)时会调用,其中mapreduce.map.combine.minspills可在mapred-site.xml进行配置
3 Combiner作用关于Shuffle相关的原理可以参考这里:MapReduce学习4:框架原理详解
Combiner的作用是在上述发生时机触发合并
例如自定义官方wordcount实例中,对于wordcount案例,如果在Map阶段·进行切割单词,那么对于同一个单词存在两次,例如ab这个单词,那么Map阶段就会输出两次,Map阶段输出到Shuffle阶段的数据首先是进入到数据缓冲区中的,在达到一定阈值后才会输出到磁盘,如果还是上述对于单词ab不做处理,那么就直接输出数据到磁盘中就是两次,无疑数据量增大了。数据量大就会造成后续reduce阶段的数据请求量更大,因为reduce阶段一般是通过Http GET方式请求Map阶段输出的数据,数据量增大那么就会造成更大的网络开销
自定义Combiner就是自定义处理逻辑将数据进行一些合并,具体怎么合并得看你的使用场景。对于上述提到的wordcount案例,就可以编写逻辑将两次的输出,最终也是会达到统计的目的(自定义案例是统计单词对应的数值并输出的
CombinerDriver.class:主要是配置Combiner类
package com.combiner.maven; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class CombinerDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(CombinerDriver.class); job.setMapperClass(CombinerMapper.class); job.setReducerClass(CombinerReducer.class); job.setCombinerClass(CombinerCb.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("E:\bigdata\study\test_files\combinerinput")); FileOutputFormat.setOutputPath(job, new Path("E:\bigdata\study\test_files\combineroutput")); job.waitForCompletion(true); } }
CombinerMapper.class
package com.combiner.maven; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class CombinerMapper extends Mapper{ private Text outK = new Text(); // 设置IntWritable类型,并且设置的值默认为1 private IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // super.map(key, value, context); 因为是自定义,所以不需要调用父类构造函数 String line = value.toString(); String[] words = line.split("\s+"); for (String word: words){ // 将Java的String类型转换成Text类型 outK.set(word); // 输出,这里会处理成KV的形式,例如:hello,1 context.write(outK, outV); } } }
CombinerReducer.class
package com.combiner.maven; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CombinerReducer extends Reducer{ private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); int sum = 0; for (IntWritable value: values){ // 值的获取,也就是1 sum += value.get(); } outV.set(sum); context.write(key, outV); } }
CombinerCb.class:自定义的Combiner类。下述逻辑是跟上述CombinerReducer是一模一样的,这里主要为了分清楚,事实在CombinerDriver.class中设置的Combiner可以直接设置为CombinerReducer.class
package com.combiner.maven; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CombinerReducer extends Reducer5 结果{ private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); int sum = 0; for (IntWritable value: values){ // 值的获取,也就是1 sum += value.get(); } outV.set(sum); context.write(key, outV); } }
可以通过日志输出看到Shuffle的数据输出量查看结果,因为Combiner作用是优化,不能改变最终的输出结果
首先是看下没有设置Combiner之前
可以看到Shuffle阶段输出的是118字节,也就是输入到reduce的数据量,并且输入分组数是10
而合并后可以看到Shuffle阶段输出的是98字节,也就是输入到reduce的数据量,并且输入分组数是8。并且最后输出的记录数都是8,结果并没有变化
相关日志信息输出的POM.xml配置可以看这里:Hadoop学习9:Maven项目跟中进行HDFS客户端测试(hadoop3.1.2)
本次实例是比较简单的实例,数据量小并且MapTask只有一个,或许会给你产生reducer可有可无的错觉,所以一般一个MapTask的时候不需要Combiner,而多个MapTask的时候才使用Combiner,因为就上述上的例子而言,Combiner其实已经完成了reducer的工作,如果非得经过reducer,那么怎么输入就怎么输出就行了,但是如果多个MapTask输出数据并被reducer接受,那么reducer的的作用才会凸显出来,不过具体还得看你的逻辑
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)