任务一:流量统计项目案例
1.基本思路:实现自定义的 bean 来封装流量信息,并将 bean 作为 Map 输出的 key 来传输。
2.MapReduce 程序在处理数据的过程中会对数据排序(Map 输出的 kv 对传输到 Reduce 之前,会排序),排序的依据是 Map 输出的 key, 所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到 key 中。
任务二:WritableComparable 排序
1.排序是 MapReduce 框架中最重要的 *** 作之一。 MapTask 和 ReduceTak 均会对数据按照 key 进行排序。该 *** 作属于 Hadoop 的默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
2.WritableComparable 继承自 Writable和 java.lang.Comparable 接口,是一个Writable也是一个Comparable,也就是说,既可以序列化,也可以比较。
3.完整的 bean 程序如下所示:
package com.hongyaa.mr.sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBeanSort implements WritableComparable{ private long upFlow; // 总上行流量 private long downFlow; // 总下行流量 private long sumFlow; // 总流量 // 无参构造方法必须有,目的是为了在反序列化 *** 作创建对象实例时调用无参构造器 public FlowBeanSort() { super(); } // 带参的构造方法,目的是为了对象的初始化 public FlowBeanSort(long upFlow, long downFlow, long sumFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } // 序列化方法,将对象的字段信息写入输出流 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } // 反序列化方法,从输入流中读取各个字段信息 // 注意:字段的反序列化顺序需要和序列化的顺序保持一致,而且字段的类型和个数也要保持一致 @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } // 实现自定义排序,按照总流量倒序排序 @Override public int compareTo(FlowBeanSort o) { // 自定义降序排列 return this.sumFlow > o.getSumFlow() ? -1 : 1; } // 重写toString()方法 @Override public String toString() { return upFlow + "t" + downFlow + "t" + sumFlow; } }
任务三:MapReduce 编程
1.Map 端程序编写
具体代码如下所示:
// 输入数据是上一个统计程序的输出结果,已经是各个手机号的总流量信息 public class FlowSumSortMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { // (1)获取一行文本的内容,并将其转换为String类型,之后按照分隔符“t”进行切分 String[] splits = value.toString().split("t"); // (2)取出手机号 String telephone = splits[0]; // (3)封装对象 FlowBeanSort fbs = new FlowBeanSort(); fbs.setUpFlow(Long.parseLong(splits[1])); fbs.setDownFlow(Long.parseLong(splits[2])); fbs.setSumFlow(Long.parseLong(splits[3])); // (4)将封装的fbs对象作为key,将手机号作为value,分发给Reduce端 context.write(fbs, new Text(telephone)); } }
2.Reduce 端程序编写
具体代码如下所示:
public class FlowSumSortReducer extends Reducer{ @Override protected void reduce(FlowBeanSort key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { // 遍历集合 for (Text tele : values) { // 将手机号作为key,将封装好的流量信息作为value,作为最终的输出结果 context.write(new Text(tele), key); } } }
3.Driver 端程序编写
完整代码如下所示:
public class FlowSumSortDemo { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // (1)获取配置信息类 Configuration conf = new Configuration(); // 指定mapreduce程序运行的hdfs的相关运行参数 conf.set("fs.defaultFS", "hdfs://localhost:9000"); // (2)新建一个Job对象 Job job = Job.getInstance(conf); // (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写) job.setJarByClass(FlowSumSortDemo.class); // (4)指定 Mapper 类和 Reducer 类 job.setMapperClass(FlowSumSortMapper.class); job.setReducerClass(FlowSumSortReducer.class); // (5)指定 MapTask 的输出key-value类型 job.setMapOutputKeyClass(FlowBeanSort.class); job.setMapOutputValueClass(Text.class); // (6)指定 ReduceTask 的输出key-value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBeanSort.class); // (7)指定该 mapreduce 程序数据的输入和输出路径 Path inPath = new Path("/flow/output_sum"); Path outPath = new Path("/flow/output_sort"); // 获取fs对象 FileSystem fs = FileSystem.get(conf); if (fs.exists(outPath)) { fs.delete(outPath, true); } FileInputFormat.setInputPaths(job, inPath); FileOutputFormat.setOutputPath(job, outPath); // (8)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出 boolean waitForCompletion = job.waitForCompletion(true); System.exit(waitForCompletion ? 0 : 1); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)