- 1 默认分组比较器
- 2 分组案例
在 Shufflle阶段输入到 reduce阶段之前,会进行分组
默认分组规则就是同一个 key就会进入同一个 reduce方法中,并且这些 同一个key的所有的值将会存储在一个 迭代器values之中,也就是 reduce方法的 第二个参数
既然同一个 key会进入到同一个比较器之中,那么判断同一个 key就会涉及到 比较,也就是 分组比较。也就是通过比较判断这个 key是否同一个然后将所对应值整合到一个迭代器 values中,然后被同一个 reduce方法处理
protected void reduce(Text key, Iterable1 默认分组比较器values, Context context) throws IOException, InterruptedException {
我们一般运行实例(没有配置任何分组相关的配置)都会看到数据已经进行分组,hadoop的分组比较事实上是通过分组比较器实现的,存在默认的分组比较器。从ReduceTask.class可以看到该比较器
通过getOutputValueGroupingComparator方法可以拿到该默认比较器
在getOutputValueGroupingComparator方法中可以看到,首先是获取配置类中设置的类
这里获取的是mapreduce.job.output.group.comparator.class对应的值,可以在mapred-site.xml文件中进行配置该分组比较器类,而默认配置文件(mapred-default.xml)中并没有配置该类
如果在配置文件中配置了该分组比较器,那么直接反射方法创建该分组比较器并返回
如果没有配置该类那么就调用getOutputKeyComparator方法获取比较器类
关于更多getOutputKeyComparator方法获取比较器可以参考
:MapReduce学习4-1:排序
默认分组是通过默认的分组比较器实现的,也可以通过自定义分组比较器,自定义进入同一个组的数据的规则,而不限于比较整个key相同才进入同一个分组
1、需求:一个订单中有会有不同的商品,不同商品会产生一定的成交额,求出一堆订单中每个订单中最高的成交额,并且按订单id进行升序排序
2、分析:将整体数据按订单id升序排序,并且在在同一个订单内按金额降序排序。也就是整体升序排序,局部降序排序
3、输入数据
订单id 商品 id 成交金额 10000001 pdt_01 222.8 10000002 Pdt_03 522.8 10000002 pdt_04 122.4 10000003 pdt_06 232.8 10000003 pdt_02 33.8 10000001 pdt_02 33.8 10000002 pdt_05 722.4
4、期望输出:每个订单中成交额最大的记录
10000001 222.8 10000002 722.4 10000003 232.8
5、GroupCompareDriver.class
package com.groupCompare.maven; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class GroupCompareDriver { public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(GroupCompareDriver.class); job.setMapperClass(GroupCompareMapper.class); job.setReducerClass(GroupCompareReducer.class); job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); // 配置分组比较器 job.setGroupingComparatorClass(OrderGroupCpmparator.class); FileInputFormat.setInputPaths(job, new Path("E:\bigdata\study\test_files\groupinput")); FileOutputFormat.setOutputPath(job, new Path("E:\bigdata\study\test_files\groupoutput")); job.waitForCompletion(true); } }
6、GroupCompareMapper.class
package com.groupCompare.maven; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class GroupCompareMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // super.map(key, value, context); String line = value.toString(); String[] infos = line.split("\W+"); OrderBean outK = new OrderBean(); outK.setOrderId(infos[0]); outK.setPrice(Double.parseDouble(infos[2])); context.write(outK, NullWritable.get()); } }
7、OrderBean.class
package com.groupCompare.maven; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class OrderBean implements WritableComparable{ private String orderId; private Double price; public OrderBean(){ } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.orderId); out.writeDouble(this.price); } @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.price = in.readDouble(); } @Override public int compareTo(OrderBean o) { // Stirng的compareTo方法,下述逻辑会让数据记录按orderId升序排序 int order_compare_result = this.orderId.compareTo(o.getOrderId()); return order_compare_result == 0? -this.price.compareTo(o.getPrice()):order_compare_result; } @Override public String toString() { return this.getOrderId()+"t"+this.getPrice(); } }
8、GroupCompareReducer.class
package com.groupCompare.maven; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class GroupCompareReducer extends Reducer{ @Override protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); context.write(key, values.iterator().next()); } }
这里compareTo是实现排序的一种方式,这里实现同一个分区内的数据的排序比较,而不会分组比较,这里注意区分,例如该排序会发生在Map阶段输出数据到环形缓冲区,在数据将要输出到磁盘之前,会对每个分区的数据进行快速排序,这里的快速排序就会调用上述比较。相关Shfflle原理可以参考:MapReduce学习4:框架原理详解
这里compareTo是实现排序的一种方式,更多可以参考:MapReduce学习4-1:排序
9、OrderGroupCpmparator.class:实现分组比较
package com.groupCompare.maven; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderGroupCpmparator extends WritableComparator { public OrderGroupCpmparator(){ super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean aBean = (OrderBean)a; OrderBean bBean = (OrderBean)b; return aBean.getOrderId().compareTo(bBean.getOrderId()); } }
对应传入的两个数据进行比较,如果返回0,那么就会进入同一个分组,其他值不会进入同一个分组
分组是在有序基础上实现的,对于上述测试数据,对于订单id还是成交金额都是无序的
订单id 商品 id 成交金额 10000001 pdt_01 222.8 10000002 Pdt_03 522.8 10000002 pdt_04 122.4 10000003 pdt_06 232.8 10000003 pdt_02 33.8 10000001 pdt_02 33.8 10000002 pdt_05 722.4
经过reduce之前的归并排序,就会整理成如下,按orderId聚集,并且按orderId升序排序,聚集的部分按成交金额降序排序
10000001 pdt_01 222.8 10000001 pdt_02 33.8 10000002 pdt_05 722.4 10000002 Pdt_03 522.8 10000002 pdt_04 122.4 10000003 pdt_06 232.8 10000003 pdt_02 33.8
那么分组的一句就是不是实现分好的,而是调用reduce方法之前首先是进行比较的而比较的规则就是我们的设定,本次案例就是比较ordreId,orderId相同就会进入同一个分组
如上述已经有序的数据,他会首先获取第1行数据,然后用第1行数据进行对比,使用跟我们的规则,发现第2行的orderId跟自己相同,但是第3行不相同,那么前两行分为一个组,然后被redcue方法处理。下一次如法炮制,从第3行开始
假设经过reduce之前的归并排序后变成了以下
10000001 pdt_01 222.8 10000001 pdt_02 33.8 10000002 pdt_05 722.4 10000002 Pdt_03 522.8 10000003 pdt_06 232.8 10000003 pdt_02 33.8 10000002 pdt_04 122.4
那么第3、4行的数据会进入一个分组并被reduce方法处理,最后一行的数据会被单独当成一个分组,即使orderId是相同的
在reduce方法中,key事实上指向一个栈中的地址,指向同一块内存,而内存在栈中,也就是说reduce方法中,key是会被重复利用的,而改变的是堆内存的内容,因而更可以获取“不同的key”
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)