- 1 排序分类
- 2 相关接口
- 3 比较器的配置
- 3.1 Text类的自定义比较器
- 3.2 比较器获取
- 4 自定义比较规则
- 4.1 两种情况
- 4.2 方案1实例(自定义比较器:Comparator实现)
- 4.3 情况2实例(利用默认比较器:Comparable实现)
- 5 区内排序
在 MapRedece的 Shuffle过程中溢出前在分区内进行的是 快速排序,该过程是 自动进行的。无论 reduceTask还是 mapTask默认是 按字典顺序排序,并且默认是对 key的字典顺序排序。 Shuffle相关可以参考: MapReduce学习4:框架原理详解
1 排序分类
部分排序:MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序
全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个reduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构
辅助排序:(GroupingComparator分组):在Reduce端对key进行分组。应用于在接收的key为bean对象时,想让一个或几个字段相同〈全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序
二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序
Java中比较相关的类和接口:
- comparable : 比较接口,通过compareTo方法定义比较规则
- comparator: 比较器,通过compare方法定义比较规则
hadoop比较使用的类和接口:
- WritableComparable :支持序列化和比较的接口
- WritableComparator : 比较器
hadoop实现排序实际就是实现比较,实际是调用的比较器处理:包括自定义比较器以及默认比较器。但是无论哪种比较器,发生比较的对象的类都需要实现WritableComparable接口,并且只有key才会发生比较而value不会。例如自定义wordcount官方实例中。在Map阶段输出到环形缓冲区的时候,在达到一定阈值的时候将要将要输出到磁盘之前会在每个分区内进行快速排序,在实例中,map输出的key是单词,而value固定是1,表示1个单词,那么分组排只能是key也就是单词进行排序,并且默认是按字典顺序,也就是对于两个单词ab和ba,ab会排在前边。key的类型是Text,那么Text.class就需要实现WritableComparable接口。接下来详细解析
对于上述的环形缓冲区等概念涉及Shuffle原理,可以参考这里:MapReduce学习4:框架原理详解
从源码中可以看到WritableComparable接口如下,其继承了Writable以及Comparable,Writable是用来实现hadoop序列化的接口,Writable为Comparable处理序列化问题,所以在hadoop中进行比较也是需要序列化的,因为继承的两个接口可以说是绑定的。关于hadoop序列化以及Writable接口,可以参考这里:MapReduce学习3:序列化
public interface WritableComparable3 比较器的配置 3.1 Text类的自定义比较器extends Writable, Comparable { }
我们一般运行实例并且map输出的key的类型使用hadoop基本数据类型,输出的数据都是一个局部有序的状态,这是因为基本数据类型都自定义了比较器用以排序比较
如下为Text.class源码片段,可以看到其比较器:
自定义比较器都是统一继承的WritableComparator类,如上是Text自定义比较器:Comparator类
那么既然有了比较器怎么用它进行比较排序?他是通过将该类的对应的比较器进行注册,然后使后续逻辑能通过该类找到他所对应的比较器。注册相关是在Text类中的静态代码块中,通过实现的WritableComparator的define方法进行注册该比较类
而define方法中,主要是将对应的类作为key还有Comparator对象放到一个线程安全的HashMap:comparators中,也就是这里的注册即使添加一个k-v关系,后续逻辑从该HashMap中获取Text.class对应的比较器
上述举例了Text类中自定义的比较器,自定义了比较器后,就需要在后续逻辑进行比较执之前进行获取比较器进行比较。如下是在数据即将序列化输出到磁盘之前进行排序时调用的获取排序器的方法getOutputKeyComparator获取比较器。该源码片段可以在MapTask.class找到
通过getOuputKeyComparator方法最终获取比较器,然后进行配置比较器,等待排序时机就会进行调用该比较器进行排序比较。如下是该方法getOuputKeyComparator方法的逻辑getClass方法中,首先是首先是获取配置文件中mapreduce.job.output.key.comparator.class定义的比较器,该配置项可以在mapred-site.xml中进行配置。默认配置文件(hadoop 3.1.2)没有相关配置
public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
如果配置文件配置了相关的比较器的类,那么就会通过getClass进行返回,如果没有就是null
- 配置的比较类存在的情况下通过反射方法进行新建该类并作为构造器返回
- 如果不存在就会通过getMapOutputKeyClass进行获取输出的键的类型(OutputKeyClass可以通过job对象进行配置)。
- 而addSubClass方法是getMapOutputKeyClass方法获取的类强制转换成WritableComparable.class,这样做是保证获取的类是WritableComparable.class的子类,因为Shuffle过程中的涉及排序的key的类型都是统一实现的WritableComparable接口的,例如默认的Text.class都是实现了WritableComparable接口的。这么做的一个直接的原因是:上述获取比较器然后进行后续的比较步骤,但是实际进行比较的时候实际调用的是比较器的compare方法,而对于自定义比较器,compare方法的参数类型都是重写的WritableComparator类的compare方法,而WritableComparator类的compare方法的参数类型是WritableComparable
在没有在配置文件的配置项中配置,那么就进行最后一步逻辑,首先是调用getMapOutputKeyClass方法
getMapOutputKeyClass方法首先也是获取配置文件上mapreduce.map.output.key.class是否相应的值,如果存在,那么返回配置的类
如果不存在就获取配置的输出键的类,该类可以在自定义的Driver文件中配置Map阶段输出的key的类型
job.setMapOutputKeyClass(CompareBean.class);
public static final String MAP_OUTPUT_KEY_CLASS = "mapreduce.map.output.key.class";
最终结果是返回设置的类或者客户端设置的Map阶段的输出key的类并被WritableComparator.get方法接受
get方法就用到了上述线程安全的HashMap,如果通过该HashMap可以获取到类对应的比较器那么正常返回,例如hadoop基本类型中Text就自定义了比较器,那么就会第一段逻辑中通过comparatos获取到该对应关系并最终拿到Text对应的比较器
若获取的比较器不存在,首先是进行类的强制初始化(强制类加载),执行forceInit方法,因为类有可能没有加载进来或者在内存中已经被销毁了。强制初始化类就会重新加载类的静态方法进行加载比较器以及注册比较器
如果强制初始化也没用,那么就新建一个比较器对象(WritableComparator)
comparator = new WritableComparator(c, conf, true);
其中第一个参数就是发生比较的数据的类,第二个参数是配置对象,而对于第三个参数,如果是自定义的比较器,那么就需要传true,具体可以从源码看到
如上述红色箭头,当传入为true的时候,才会新建key1和key2,新建的两个对象就是要比较的对象,从newKey()方法可以看到,他是通过反射的方式创建keyClass,也就是传入的要比较的数据的类。最后新建的两个对象发生比较排序,如果不是true,就没有新建两个对象,也就不会存在比较排序了
上述对排序比较有了一个大致的认识,接下来是介绍自定义比较规则,为什么是自定义比较规则呢?因为实现排序比较的时候,可以自定义比较器,也可以使用默认比较器
排序只针对key,对于一个传入的key,他要实现比较,可能有两种情况为:
- 传入的key有单独设置了比较器类,也就是上述中在配置文件直接配置了mapreduce.job.output.key.comparator.class的值或者通过在driver文件中通过job对象配置比较器类,这种情况就可以直接使用该属性对应的比较器类
- 传入的key没有设置比较器类,那么hadoop就会自行创建一个比较器对象。比较规则是可以自定义的,正常自定义都是没有设置比较器类的,也就是不会像Text.class这种在comparators这个线程安全的HashMap中进行注册比较器,所以会按照逻辑,对于自定义比较规则的就会按上述源码,就会走到如下的位置。那么就会创建一个默认的比较器
那么如何实现自定义的排序规则呢?上文是最终获取了比较器,这个比较器可以是自定义的,也可以是默认的,最终进行过比较的事实就是比较器中调用compare方法。所以实现自定义的比较,就有两种方案
- 在单独配置的自定义的比较器类中重写compare方法
- 只需要在key对应的类型的类(例如key的类型是Text等),那么对应的类只需要继承WritableComparator接口并实现compareTo方法即可
可能在这里比较模糊,接下来的实例会更形象地进行介绍
4.2 方案1实例(自定义比较器:Comparator实现)1、需求:对于每个手机号产生一定的上行和下行流量,统计每个手机的总流量(上行流量+下行流量)并按总流量降序排序输出,输出格式如下:
手机号 上行流量 下行流量 总流量
2、输入数据
#ID 手机号 IP 上行流量 下行流量 网络状态码 1 13660436666 120.196.101.99 1126 954 200 2 13760436667 120.196.102.99 1136 954 200 3 13860436668 120.196.103.99 1146 954 200 4 13960436669 120.196.104.99 1156 954 200 5 13660436676 120.196.105.99 1166 954 200 6 13760436686 120.196.106.99 1176 954 200 7 13860436666 120.196.107.99 1126 954 200 8 13960436667 120.196.108.99 1136 954 200 9 13660436668 120.196.109.99 1146 954 200
3、分析:对于上述数据中,因为需要输出上行流量和下行流量以及总流量,那么可以创建一个bean对象去装载这些数据。因为需要按总流量降序进行排序,但是hadoop的排序规则只是针对输出的key处理的,所以在Map输出后将bean对象作为key并定义compare方法按总流量排序
3、CompareBean.class:用以存储上下行流量以及总流量的bean对象
package com.compare.maven; import com.writable.maven.FlowBean; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class CompareBean implements WritableComparable{ private Long downFlow; private Long upFlow; private Long sumFlow; public CompareBean(){ } public Long getDownFlow() { return downFlow; } public void setDownFlow(Long downFlow) { this.downFlow = downFlow; } public Long getUpFlow() { return upFlow; } public void setUpFlow(Long upFlow) { this.upFlow = upFlow; } public Long getSumFlow() { return sumFlow; } public void setSumFlow(Long sumFlow) { this.sumFlow = sumFlow; } public void setSumFlow(){ this.sumFlow = this.upFlow + this.downFlow; } @Override public void write(DataOutput out) throws IOException { out.writeLong(this.upFlow); out.writeLong(this.downFlow); out.writeLong(this.sumFlow); } @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } @Override public String toString() { return downFlow + "t" + upFlow +"t"+ sumFlow ; } @Override public int compareTo(CompareBean o) { return 0; } }
4、CompareMapper.class
package com.compare.maven; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class CompareMapper extends Mapper{ private Text outK = new Text(); private CompareBean cb = new CompareBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // super.map(key, value, context); String line = value.toString(); String[] words = line.split("\W+"); String phoneNum = words[1]; outK.set(phoneNum); int len = words.length; cb.setUpFlow(Long.parseLong(words[len-3])); cb.setDownFlow(Long.parseLong(words[len-2])); cb.setSumFlow(); context.write(cb,outK); } }
5、CompareReducer.class
package com.compare.maven; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CompareReducer extends Reducer{ @Override protected void reduce(CompareBean key, Iterable values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); for(Text value: values){ context.write(value, key); } } }
6、CompareDriver.class
package com.compare.maven; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class CompareDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(CompareDriver.class); job.setMapperClass(CompareMapper.class); job.setReducerClass(CompareReducer.class); job.setMapOutputKeyClass(CompareBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CompareBean.class); FileInputFormat.setInputPaths(job, new Path("E:\bigdata\study\test_files\compareinput")); FileOutputFormat.setOutputPath(job, new Path("E:\bigdata\study\test_files\compareoutput")); // 设置比较器类 job.setSortComparatorClass(CompareWritableComparator.class); job.waitForCompletion(true); } }
通过job.setSortComparatorClass(CompareWritableComparator.class),实际是设置了配置配置项mapreduce.job.output.key.comparator.class,使得他的值为CompareWritableComparator.class,那么接下来的逻辑就是直接获取该配置项的值,然后通过反射创建比较器实例并返回,在进行比较的时候就会调用我们的compare方法
7、CompareWritableComparator.class:继承WritableComparator ,实现自定义的比较器
package com.compare.maven; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class CompareWritableComparator extends WritableComparator { public CompareWritableComparator(){ super(CompareBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { CompareBean aBean = (CompareBean)a; CompareBean bBean = (CompareBean)b; return -aBean.getSumFlow().compareTo(bBean.getSumFlow()); } }4.3 情况2实例(利用默认比较器:Comparable实现)
上述是自定义比较器,在排序阶段会调用自定义的排序器的compare方法实现排序
对于本次案例实现,要首先取消自定义比较器的设置
package com.compare.maven; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class CompareDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(CompareDriver.class); job.setMapperClass(CompareMapper.class); job.setReducerClass(CompareReducer.class); job.setMapOutputKeyClass(CompareBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CompareBean.class); FileInputFormat.setInputPaths(job, new Path("E:\bigdata\study\test_files\compareinput")); FileOutputFormat.setOutputPath(job, new Path("E:\bigdata\study\test_files\compareoutput")); // 取消配置job.setSortComparatorClass(CompareWritableComparator.class); job.waitForCompletion(true); } }
并且重写CompareBean.class方法中的comapreTo方法
package com.compare.maven; import com.writable.maven.FlowBean; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class CompareBean implements WritableComparable{ private Long downFlow; private Long upFlow; private Long sumFlow; public CompareBean(){ } public Long getDownFlow() { return downFlow; } public void setDownFlow(Long downFlow) { this.downFlow = downFlow; } public Long getUpFlow() { return upFlow; } public void setUpFlow(Long upFlow) { this.upFlow = upFlow; } public Long getSumFlow() { return sumFlow; } public void setSumFlow(Long sumFlow) { this.sumFlow = sumFlow; } public void setSumFlow(){ this.sumFlow = this.upFlow + this.downFlow; } @Override public void write(DataOutput out) throws IOException { out.writeLong(this.upFlow); out.writeLong(this.downFlow); out.writeLong(this.sumFlow); } @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } @Override public String toString() { return downFlow + "t" + upFlow +"t"+ sumFlow ; } @Override public int compareTo(CompareBean o) { return -this.sumFlow.compareTo(o.sumFlow); } }
为什么可以直接设置compareTo方法呢?结合上述源码分析,在没有配置比较器类的时候,那么就会走get方法
对于基本数据类型会进行注册相关的比较器,但是我们这里是没有注册的步骤的,在没有设置Comparator的时候也没有注册就会走compare为null的逻辑,最终就会new一个WritableComparator,也就是创建一个默认的比较器
而最终进行比较是调用默认比较器的compare方法,如下可以看到默认的compare方法实际是调用输出类的compareTo方法,上述排序的时候输出的类是compareBean类,所以就调用他的compareTo方法进行排序比较
1、需求:在4需求的基础上将不同手机分区为不同的省份并且在不同的分区内按总流量降序排序
2、分析:添加分区器并配置分区器,只要配置了比较器,就会自动进行区内排序
3、分区器设置
PhonePatitioner.class
package com.compare.maven; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class PhonePatitioner extends Partitioner{ @Override public int getPartition(CompareBean compareBean, Text text, int numPartitions) { String preNum = text.toString().substring(0, 3); int partition=4; if("136".equals(preNum)){ partition=0; }else if("137".equals(preNum)){ partition=1; }else if("138".equals(preNum)){ partition=2; }else if("139".equals(preNum)){ partition=3; } return partition; } } }
分区相关原理和详细实例可以看这里:MapReduce学习4:框架原理详解
4、在CompareDriver.class中配置分区器并设置reduceTask的数量
job.setPartitionerClass(PhonePatitioner.class); job.setNumReduceTasks(4);
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)