hadoop使用combiner合并 *** 作

hadoop使用combiner合并 *** 作,第1张

hadoop使用combiner合并 *** 作 前言

任务从MapperTask出来的时候,数据要暂存在一段缓存空间,然后ReducerTask再拉取这些数据进行处理,map到reduce中间的这一段 *** 作,官方称作为 shuffle

通过前面的章节我们了解到,往往来说,MapperTask的任务数量是多于ReduceTask的,这是因为原始的待处理的文件可能很大,在某些场景下,比如日志文件可能达到TB级别的,于是为了提升Map阶段的任务并行处理能力,需要开启更多的MapTask

为什么需要combiner

combiner顾名思义,为合并的含义,为什么需要合并呢?还记得在wordcount案例中,原始的数据内容格式吗?

关羽 关羽
赵云 
刘备 刘备
黄盖
张飞
马超
魏延

在Map阶段,通过代码调试,我们发现,从Map出去,然后进入到Reduce方法中时,相同的key的内容会循环输出

如果以上面的文本内容为例说明的话,经过Map之后,第一行中的“关羽”这个词,将会拆分成这样 (关羽 1),(关羽 1) ,就是说,key是重复的,假如原始的文件非常大,并且里面重复的内容也特别多,这种重复的数据带来的从map到reduce中间因为数据传输带来的影响就非常大了

假如有一种方法,可以将相同的key进行合并,比如“关羽”这个词经过合并后的效果大概就是 (关羽 2),这样只需要传输一次即可

hadoop提供了通过combiner的功能来达到解决上面的业务痛点的办法

combiner特点
  • combiner是MapReduce之外的一种组件
  • combiner组件父类就是Reducer,他们的区别在于运行位置,combiner是在每一个MapTask所在节点运行,而Reducer接收全局的Mapper阶段输出结果
  • combiner的意义在于对每一个MapTask的输出进行局部汇总,以减少网络传输量
  • 运用combiner前提是不能影响最终的业务输出结果,并且combiner的输出KV要跟Reducer的输入KV对应起来
combiner自定义实现

1、增加一个类,继承reducer

从上面的描述可知,自定义的combiner需要继承reducer,输入数据为Map阶段的输出结果,输出结果类型和reduce阶段相同

public class WordCountCombiner extends Reducer {

    private IntWritable outVal = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable value : values){
            sum +=value.get();
        }
        outVal.set(sum);
        context.write(key,outVal);
    }
}

运行之前的wordcount的job类,重点观察输出日志,重点关注圈起来的部分

public class DemoJobDriver {

    public static void main(String[] args) throws Exception {

        //1、获取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2、设置jar路径
        job.setJarByClass(DemoJobDriver.class);

        //3、关联mapper 和 Reducer
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);

        //4、设置 map输出的 key/val 的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、设置最终输出的key / val 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置最终的输出路径
        String inputPath = "F:\网盘\csv\laoban.txt";
        String outPath = "F:\网盘\csv\wordcount";
        
        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outPath));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

2、改造job类,代码中添加下面一行即可

//使用自己的combiner
job.setCombinerClass(WordCountCombiner.class);

再次运行上面的代码,

最后来对比下两种不同方式下的map阶段最终输出数据

通过上面的案例演示,了解到使用combiner这个组件,可以在某些业务场景下,一定程度上可缓解map到reduce端数据传输的大小,属于一种优化策略

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5699952.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存