MapReduce学习4-2:Combiner解析与实例

MapReduce学习4-2:Combiner解析与实例,第1张

MapReduce学习4-2:Combiner解析与实例

    • 1 基本概念
    • 2 Combiner发生时机
    • 3 Combiner作用
    • 4 自定义Combiner实现wordcount案例
    • 5 结果

1 基本概念
  1. Combiner是MapReuce程序中Mapper和Reducer之外的一种组件
  2. Combiner组件的父类就是Reducer,因而自定义Combiner需要继承Reducer
  3. Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量
  4. Combine能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。
2 Combiner发生时机

上述概念中知道Combiner的父类是Reducer,但是一般配置的reducer类的父类也是Reducer,二者的区别在于运行的位置:

  1. Combiner是在每一个MapTask所在的节点运行
    2.Reducer是按收全局所有Mapper的输出结果

Combiner作用在Shuffle阶段,他的发生时机有两个

  1. 当为作业设置Combiner类后,缓存溢出线程将缓存存放到磁盘时,就会调用
  2. 在数据归并过程中, 临时溢出文件的数量超过mapreduce.map.combine.minspills(默认3)时会调用,其中mapreduce.map.combine.minspills可在mapred-site.xml进行配置

关于Shuffle相关的原理可以参考这里:MapReduce学习4:框架原理详解

3 Combiner作用

Combiner的作用是在上述发生时机触发合并

例如自定义官方wordcount实例中,对于wordcount案例,如果在Map阶段·进行切割单词,那么对于同一个单词存在两次,例如ab这个单词,那么Map阶段就会输出两次,Map阶段输出到Shuffle阶段的数据首先是进入到数据缓冲区中的,在达到一定阈值后才会输出到磁盘,如果还是上述对于单词ab不做处理,那么就直接输出数据到磁盘中就是两次,无疑数据量增大了。数据量大就会造成后续reduce阶段的数据请求量更大,因为reduce阶段一般是通过Http GET方式请求Map阶段输出的数据,数据量增大那么就会造成更大的网络开销

自定义Combiner就是自定义处理逻辑将数据进行一些合并,具体怎么合并得看你的使用场景。对于上述提到的wordcount案例,就可以编写逻辑将两次的输出,最终也是会达到统计的目的(自定义案例是统计单词对应的数值并输出的

4 自定义Combiner实现wordcount案例

CombinerDriver.class:主要是配置Combiner类

package com.combiner.maven;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CombinerDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(CombinerDriver.class);
        job.setMapperClass(CombinerMapper.class);
        job.setReducerClass(CombinerReducer.class);

        job.setCombinerClass(CombinerCb.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);



        FileInputFormat.setInputPaths(job, new Path("E:\bigdata\study\test_files\combinerinput"));
        FileOutputFormat.setOutputPath(job, new Path("E:\bigdata\study\test_files\combineroutput"));

        job.waitForCompletion(true);
    }
}

CombinerMapper.class

package com.combiner.maven;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;



public class CombinerMapper extends Mapper {

    private Text outK = new Text();

    // 设置IntWritable类型,并且设置的值默认为1
    private IntWritable outV = new IntWritable(1);

    

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        super.map(key, value, context);  因为是自定义,所以不需要调用父类构造函数

        String line = value.toString();
        String[] words = line.split("\s+");

        for (String word: words){
            // 将Java的String类型转换成Text类型
            outK.set(word);

            // 输出,这里会处理成KV的形式,例如:hello,1
            context.write(outK, outV);

        }
    }


}

CombinerReducer.class

package com.combiner.maven;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;



public class CombinerReducer extends Reducer {
    private IntWritable outV = new IntWritable();
    
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
//        super.reduce(key, values, context);
        int sum = 0;
        for (IntWritable value: values){
            // 值的获取,也就是1
            sum += value.get();

        }
        outV.set(sum);
        context.write(key, outV);
    }
}

CombinerCb.class:自定义的Combiner类。下述逻辑是跟上述CombinerReducer是一模一样的,这里主要为了分清楚,事实在CombinerDriver.class中设置的Combiner可以直接设置为CombinerReducer.class

package com.combiner.maven;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;



public class CombinerReducer extends Reducer {
    private IntWritable outV = new IntWritable();
    
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
//        super.reduce(key, values, context);
        int sum = 0;
        for (IntWritable value: values){
            // 值的获取,也就是1
            sum += value.get();

        }
        outV.set(sum);
        context.write(key, outV);
    }
}

5 结果

可以通过日志输出看到Shuffle的数据输出量查看结果,因为Combiner作用是优化,不能改变最终的输出结果

首先是看下没有设置Combiner之前
可以看到Shuffle阶段输出的是118字节,也就是输入到reduce的数据量,并且输入分组数是10


而合并后可以看到Shuffle阶段输出的是98字节,也就是输入到reduce的数据量,并且输入分组数是8。并且最后输出的记录数都是8,结果并没有变化

相关日志信息输出的POM.xml配置可以看这里:Hadoop学习9:Maven项目跟中进行HDFS客户端测试(hadoop3.1.2)

本次实例是比较简单的实例,数据量小并且MapTask只有一个,或许会给你产生reducer可有可无的错觉,所以一般一个MapTask的时候不需要Combiner,而多个MapTask的时候才使用Combiner,因为就上述上的例子而言,Combiner其实已经完成了reducer的工作,如果非得经过reducer,那么怎么输入就怎么输出就行了,但是如果多个MapTask输出数据并被reducer接受,那么reducer的的作用才会凸显出来,不过具体还得看你的逻辑

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5692499.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存