任务从MapperTask出来的时候,数据要暂存在一段缓存空间,然后ReducerTask再拉取这些数据进行处理,map到reduce中间的这一段 *** 作,官方称作为 shuffle
通过前面的章节我们了解到,往往来说,MapperTask的任务数量是多于ReduceTask的,这是因为原始的待处理的文件可能很大,在某些场景下,比如日志文件可能达到TB级别的,于是为了提升Map阶段的任务并行处理能力,需要开启更多的MapTask
为什么需要combinercombiner顾名思义,为合并的含义,为什么需要合并呢?还记得在wordcount案例中,原始的数据内容格式吗?
关羽 关羽 赵云 刘备 刘备 黄盖 张飞 马超 魏延
在Map阶段,通过代码调试,我们发现,从Map出去,然后进入到Reduce方法中时,相同的key的内容会循环输出
如果以上面的文本内容为例说明的话,经过Map之后,第一行中的“关羽”这个词,将会拆分成这样 (关羽 1),(关羽 1) ,就是说,key是重复的,假如原始的文件非常大,并且里面重复的内容也特别多,这种重复的数据带来的从map到reduce中间因为数据传输带来的影响就非常大了
假如有一种方法,可以将相同的key进行合并,比如“关羽”这个词经过合并后的效果大概就是 (关羽 2),这样只需要传输一次即可
hadoop提供了通过combiner的功能来达到解决上面的业务痛点的办法
combiner特点- combiner是MapReduce之外的一种组件
- combiner组件父类就是Reducer,他们的区别在于运行位置,combiner是在每一个MapTask所在节点运行,而Reducer接收全局的Mapper阶段输出结果
- combiner的意义在于对每一个MapTask的输出进行局部汇总,以减少网络传输量
- 运用combiner前提是不能影响最终的业务输出结果,并且combiner的输出KV要跟Reducer的输入KV对应起来
1、增加一个类,继承reducer
从上面的描述可知,自定义的combiner需要继承reducer,输入数据为Map阶段的输出结果,输出结果类型和reduce阶段相同
public class WordCountCombiner extends Reducer{ private IntWritable outVal = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable value : values){ sum +=value.get(); } outVal.set(sum); context.write(key,outVal); } }
运行之前的wordcount的job类,重点观察输出日志,重点关注圈起来的部分
public class DemoJobDriver { public static void main(String[] args) throws Exception { //1、获取job Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //2、设置jar路径 job.setJarByClass(DemoJobDriver.class); //3、关联mapper 和 Reducer job.setMapperClass(DemoMapper.class); job.setReducerClass(DemoReducer.class); //4、设置 map输出的 key/val 的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5、设置最终输出的key / val 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6、设置最终的输出路径 String inputPath = "F:\网盘\csv\laoban.txt"; String outPath = "F:\网盘\csv\wordcount"; FileInputFormat.setInputPaths(job,new Path(inputPath)); FileOutputFormat.setOutputPath(job,new Path(outPath)); // 7 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
2、改造job类,代码中添加下面一行即可
//使用自己的combiner job.setCombinerClass(WordCountCombiner.class);
再次运行上面的代码,
最后来对比下两种不同方式下的map阶段最终输出数据
通过上面的案例演示,了解到使用combiner这个组件,可以在某些业务场景下,一定程度上可缓解map到reduce端数据传输的大小,属于一种优化策略
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)