MapReduce学习4-3:分组

MapReduce学习4-3:分组,第1张

MapReduce学习4-3:分组

    • 1 默认分组比较器
    • 2 分组案例

在 Shufflle阶段输入到 reduce阶段之前,会进行分组

默认分组规则就是同一个 key就会进入同一个 reduce方法中,并且这些 同一个key的所有的值将会存储在一个 迭代器values之中,也就是 reduce方法的 第二个参数

既然同一个 key会进入到同一个比较器之中,那么判断同一个 key就会涉及到 比较,也就是 分组比较。也就是通过比较判断这个 key是否同一个然后将所对应值整合到一个迭代器 values中,然后被同一个 reduce方法处理

 protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

1 默认分组比较器

我们一般运行实例(没有配置任何分组相关的配置)都会看到数据已经进行分组,hadoop的分组比较事实上是通过分组比较器实现的,存在默认的分组比较器。从ReduceTask.class可以看到该比较器

通过getOutputValueGroupingComparator方法可以拿到该默认比较器

在getOutputValueGroupingComparator方法中可以看到,首先是获取配置类中设置的类


这里获取的是mapreduce.job.output.group.comparator.class对应的值,可以在mapred-site.xml文件中进行配置该分组比较器类,而默认配置文件(mapred-default.xml)中并没有配置该类

如果在配置文件中配置了该分组比较器,那么直接反射方法创建该分组比较器并返回

如果没有配置该类那么就调用getOutputKeyComparator方法获取比较器类


关于更多getOutputKeyComparator方法获取比较器可以参考
:MapReduce学习4-1:排序

2 分组案例

默认分组是通过默认的分组比较器实现的,也可以通过自定义分组比较器,自定义进入同一个组的数据的规则,而不限于比较整个key相同才进入同一个分组

1、需求:一个订单中有会有不同的商品,不同商品会产生一定的成交额,求出一堆订单中每个订单中最高的成交额,并且按订单id进行升序排序

2、分析:将整体数据按订单id升序排序,并且在在同一个订单内按金额降序排序。也就是整体升序排序,局部降序排序

3、输入数据

订单id     商品 id     成交金额
10000001   pdt_01     222.8
10000002   Pdt_03     522.8
10000002   pdt_04     122.4
10000003   pdt_06     232.8
10000003   pdt_02     33.8
10000001   pdt_02     33.8
10000002   pdt_05     722.4

4、期望输出:每个订单中成交额最大的记录

10000001   222.8
10000002   722.4
10000003   232.8

5、GroupCompareDriver.class

package com.groupCompare.maven;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class GroupCompareDriver {

    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(GroupCompareDriver.class);
        job.setMapperClass(GroupCompareMapper.class);
        job.setReducerClass(GroupCompareReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 配置分组比较器
        job.setGroupingComparatorClass(OrderGroupCpmparator.class);


        FileInputFormat.setInputPaths(job, new Path("E:\bigdata\study\test_files\groupinput"));
        FileOutputFormat.setOutputPath(job, new Path("E:\bigdata\study\test_files\groupoutput"));


        job.waitForCompletion(true);
    }
}

6、GroupCompareMapper.class

package com.groupCompare.maven;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class GroupCompareMapper extends Mapper {


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        super.map(key, value, context);

        String line = value.toString();

        String[] infos = line.split("\W+");

        OrderBean outK = new OrderBean();

        outK.setOrderId(infos[0]);
        outK.setPrice(Double.parseDouble(infos[2]));


        context.write(outK, NullWritable.get());

    }
}

7、OrderBean.class

package com.groupCompare.maven;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable {

    private String orderId;
    private Double price;

    public OrderBean(){

    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.orderId);
        out.writeDouble(this.price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId =  in.readUTF();
        this.price = in.readDouble();
    }

    @Override
    public int compareTo(OrderBean o) {

        // Stirng的compareTo方法,下述逻辑会让数据记录按orderId升序排序
        int order_compare_result = this.orderId.compareTo(o.getOrderId());

        

        return  order_compare_result == 0? -this.price.compareTo(o.getPrice()):order_compare_result;
    }

    @Override
    public String toString() {
        return  this.getOrderId()+"t"+this.getPrice();
    }
}

8、GroupCompareReducer.class

package com.groupCompare.maven;


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class GroupCompareReducer extends Reducer {

    @Override
    protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException {
//        super.reduce(key, values, context);
        
        context.write(key, values.iterator().next());
    }
}

这里compareTo是实现排序的一种方式,这里实现同一个分区内的数据的排序比较,而不会分组比较,这里注意区分,例如该排序会发生在Map阶段输出数据到环形缓冲区,在数据将要输出到磁盘之前,会对每个分区的数据进行快速排序,这里的快速排序就会调用上述比较。相关Shfflle原理可以参考:MapReduce学习4:框架原理详解

这里compareTo是实现排序的一种方式,更多可以参考:MapReduce学习4-1:排序

9、OrderGroupCpmparator.class:实现分组比较

package com.groupCompare.maven;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupCpmparator extends WritableComparator {


    public  OrderGroupCpmparator(){

        super(OrderBean.class, true);

    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {

        OrderBean aBean = (OrderBean)a;
        OrderBean bBean = (OrderBean)b;
        

        return aBean.getOrderId().compareTo(bBean.getOrderId());
    }
}

对应传入的两个数据进行比较,如果返回0,那么就会进入同一个分组,其他值不会进入同一个分组

分组是在有序基础上实现的,对于上述测试数据,对于订单id还是成交金额都是无序的

订单id     商品 id     成交金额
10000001   pdt_01     222.8
10000002   Pdt_03     522.8
10000002   pdt_04     122.4
10000003   pdt_06     232.8
10000003   pdt_02     33.8
10000001   pdt_02     33.8
10000002   pdt_05     722.4

经过reduce之前的归并排序,就会整理成如下,按orderId聚集,并且按orderId升序排序,聚集的部分按成交金额降序排序

10000001   pdt_01     222.8
10000001   pdt_02     33.8
10000002   pdt_05     722.4
10000002   Pdt_03     522.8
10000002   pdt_04     122.4
10000003   pdt_06     232.8
10000003   pdt_02     33.8

那么分组的一句就是不是实现分好的,而是调用reduce方法之前首先是进行比较的而比较的规则就是我们的设定,本次案例就是比较ordreId,orderId相同就会进入同一个分组

如上述已经有序的数据,他会首先获取第1行数据,然后用第1行数据进行对比,使用跟我们的规则,发现第2行的orderId跟自己相同,但是第3行不相同,那么前两行分为一个组,然后被redcue方法处理。下一次如法炮制,从第3行开始

假设经过reduce之前的归并排序后变成了以下

10000001   pdt_01     222.8
10000001   pdt_02     33.8
10000002   pdt_05     722.4
10000002   Pdt_03     522.8
10000003   pdt_06     232.8
10000003   pdt_02     33.8
10000002   pdt_04     122.4

那么第3、4行的数据会进入一个分组并被reduce方法处理,最后一行的数据会被单独当成一个分组,即使orderId是相同的

在reduce方法中,key事实上指向一个栈中的地址,指向同一块内存,而内存在栈中,也就是说reduce方法中,key是会被重复利用的,而改变的是堆内存的内容,因而更可以获取“不同的key”

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5690207.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存