Contents
1. Sorting Overview
2. WritableComparable Sorting: A Hands-On Case Study
2.1 Requirements
2.2 Requirements Analysis
2.3 Data Preparation
2.4 Code Implementation
3. Results
1. Sorting Overview
Sorting is one of the most important operations in MapReduce. Both MapTask and ReduceTask sort their data by key. This is Hadoop's default behavior: every key gets sorted, whether or not the business logic actually requires it. So how do we make that sort follow our own business rules? This article builds on the case from the following post:
Hadoop case study: using a Partitioner to control the number of output files: https://mp.csdn.net/mp_blog/creation/editor/121602844
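As background: anything used as a MapReduce key must satisfy Hadoop's WritableComparable contract, which simply combines serialization with Java's natural ordering. The interface itself (from org.apache.hadoop.io) is tiny:

// Keys must be both serializable (Writable) and comparable (Comparable)
public interface WritableComparable<T> extends Writable, Comparable<T> {
}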
2. WritableComparable Sorting: A Hands-On Case Study
2.1 Requirements
Scenario: an account manager hands you a requirement. Besides writing the phone records into separate output files by province, each file must itself be sorted internally by total traffic. That is, each output file is ordered first by total traffic in descending order and then, when totals are equal, by upstream traffic.
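To make the required order concrete, here is a hypothetical slice of one output file (the numbers are made up for illustration): rows are sorted by total traffic descending, and the two rows with equal totals are ordered by upstream traffic ascending.

13609468723    7335     110349    117684
13675057813    11058    48243     59301
13668436656    3597     25635     29232
13668436699    4938     24294     29232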
2.2 Requirements Analysis
(1) Determine the map function's input and output. The input is the usual line offset (LongWritable) and line of text (Text); the output key is the FlowBean and the output value is the phone number (Text), because the shuffle sorts by key.
(2) Determine the reduce function's input and output. The input matches the map output (a FlowBean key with Text values); the output swaps them back to a phone number (Text) and a FlowBean.
(3) Because the data must be partitioned, we need a custom partitioner that extends the Partitioner class and implements the getPartition() method. Partitioning happens after the map phase, so its input types are the map output types.
(4) Because we need a custom sort on the total traffic stored in the FlowBean, the FlowBean class must implement WritableComparable: override compareTo(), override toString(), and implement serialization.
(5) In the Driver class's main method, wire all of these custom classes together. (A compact summary of the types flowing through the job follows this list.)
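For quick reference, here is the type flow through the job, summarized from the code that follows:

// FlowMapper         extends Mapper<LongWritable, Text, FlowBean, Text>  // (offset, line)  -> (bean, phone)
// ProvincePartition2 extends Partitioner<FlowBean, Text>                 // routes (bean, phone) by phone prefix
// FlowReducer        extends Reducer<FlowBean, Text, Text, FlowBean>     // (bean, phones)  -> (phone, bean)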
2.3 Data Preparation
The input is the output file of the previous case linked above: tab-separated lines containing the phone number, upstream traffic, downstream traffic, and total traffic.
2.4 Code Implementation
First, a custom FlowBean that wraps the upstream, downstream, and total traffic:
package com.yangmin.mapreduce.partitionandwritableComparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // No-arg constructor, required for deserialization
    public FlowBean() {
    }

    // Getters and setters
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        // Primary sort: total traffic, descending
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            // Secondary sort: upstream traffic, ascending
            if (this.upFlow > o.upFlow) {
                return 1;
            } else if (this.upFlow < o.upFlow) {
                return -1;
            } else {
                return 0;
            }
        }
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
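As a quick sanity check (my addition, not part of the original post), compareTo can be exercised locally with java.util.Arrays.sort; the expected order is the largest total first, with ascending upstream traffic breaking ties:

package com.yangmin.mapreduce.partitionandwritableComparable;

import java.util.Arrays;

public class FlowBeanSortCheck {
    public static void main(String[] args) {
        FlowBean a = bean(100, 900);   // total 1000
        FlowBean b = bean(300, 700);   // total 1000, larger upstream share
        FlowBean c = bean(50, 2000);   // total 2050
        FlowBean[] beans = {a, b, c};
        Arrays.sort(beans);            // uses FlowBean.compareTo
        // Expected order: c (largest total), then a before b (ascending upstream on ties)
        System.out.println(Arrays.toString(beans));
    }

    private static FlowBean bean(long up, long down) {
        FlowBean fb = new FlowBean();
        fb.setUpFlow(up);
        fb.setDownFlow(down);
        fb.setSumFlow();
        return fb;
    }
}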
Next, a custom ProvincePartition2 class that extends Partitioner; note that its key and value input types are the map output types:
package com.yangmin.mapreduce.partitionandwritableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartition2 extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        int partition;
        String phone = text.toString();
        String sub = phone.substring(0, 3);

        // Route each record by the first three digits of the phone number
        if ("136".equals(sub)) {
            partition = 0;
        } else if ("137".equals(sub)) {
            partition = 1;
        } else if ("138".equals(sub)) {
            partition = 2;
        } else if ("139".equals(sub)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
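One design note: getPartition must return a value in the range [0, numPartitions), and the five branches here assume exactly five reduce tasks. If setNumReduceTasks is set below 5, the job fails with an illegal-partition error; if it is set above 5, the extra reducers simply produce empty output files.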
The Mapper class; again, pay attention to the input and output data types:
package com.yangmin.mapreduce.partitionandwritableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean outk = new FlowBean();
    private Text outv = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String s = value.toString();

        // Split on tabs
        String[] split = s.split("\t");

        // Fill in outk and outv; the bean is the key so that it drives the sort
        outk.setUpFlow(Long.parseLong(split[1]));
        outk.setDownFlow(Long.parseLong(split[2]));
        outk.setSumFlow();
        outv.set(split[0]);

        // Emit the key/value pair
        context.write(outk, outv);
    }
}
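For instance, given a hypothetical input line of "13668436656", "3597", "25635", "29232" separated by tabs, the mapper emits the FlowBean (3597, 25635, 29232) as the key and "13668436656" as the value. Because the bean is the key, the shuffle phase sorts the records for us.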
The Reducer class. Its input is the map output; when writing, the key and value must be swapped back so that each output line reads: phone number, upstream traffic, downstream traffic, total traffic.
package com.yangmin.mapreduce.partitionandwritableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Iterate over all values so that phones with identical traffic are not lost
        for (Text value : values) {
            // Swap key and value before writing out
            context.write(value, key);
        }
    }
}
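Why the loop matters: the reducer groups records whose keys compare as equal, and under the compareTo above two beans are equal exactly when both their total and upstream traffic match. All phone numbers carrying identical traffic therefore arrive in a single values iterable, and each must be written out individually or records would be silently dropped.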
The Driver class: register the partitioner and set the number of reduce tasks.
package com.yangmin.mapreduce.partitionandwritableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Wire up the mapper and reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Map-side output key/value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Register the partitioner
        job.setPartitionerClass(ProvincePartition2.class);

        // The number of reduce tasks must match the number of partitions
        job.setNumReduceTasks(5);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\ZProject\\bigdata\\output\\output_writable"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\ZProject\\bigdata\\output\\output_writable_comparable_partition"));

        // Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

3. Results
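With the configuration above, the output directory should contain five part files, one per partition:

part-r-00000   phones starting with 136
part-r-00001   phones starting with 137
part-r-00002   phones starting with 138
part-r-00003   phones starting with 139
part-r-00004   all other phones

Each file holds lines of phone number, upstream, downstream, and total traffic, sorted by total traffic in descending order, with ties ordered by upstream traffic.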