Input data (fields separated by \t)
id  phone number  IP address  visited domain (present in some rows, absent in others)  upstream traffic  downstream traffic  status code
1	11111111111	120.196.100.99	100	900	200
2	11111111112	120.196.100.92	www.baidu.com	200	200	200
3	11111111113	120.196.100.93	800	200	200
4	11111111114	120.196.100.95	30	970	200
5	11111111116	120.196.100.95	www.baidu.com	105	895	200
6	11111111115	120.196.100.99	100	900	200
7	11111111112	120.196.100.59	www.baidu.com	300	300	200
8	11111111118	120.196.100.96	150	850	200
Required Maven configuration
Note: to run locally on Windows, make sure the Hadoop dependency is available locally and that its version matches the one declared in the POM; the input and output paths in step 6 of FlowDriver must be changed to match your own environment.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test2</groupId>
    <artifactId>mapredceDemo1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
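With the make-assembly execution bound to the package phase as above, a plain package build should leave both the regular jar and a *-jar-with-dependencies.jar under target/ (the latter is the one to ship to the cluster):

mvn clean package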
log4j.properties configuration under the resources directory
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
Custom Writable class implementation (FlowBean)
package com.test.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private Long upFlow;   // upstream traffic
    private Long downFlow; // downstream traffic
    private Long sumFlow;  // total traffic

    public Long getUpFlow() { return upFlow; }

    public void setUpFlow(Long upFlow) { this.upFlow = upFlow; }

    public Long getDownFlow() { return downFlow; }

    public void setDownFlow(Long downFlow) { this.downFlow = downFlow; }

    public Long getSumFlow() { return sumFlow; }

    // Derive the total from the two fields already set
    public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; }

    // A no-arg constructor is required so Hadoop can instantiate the bean via reflection
    public FlowBean() { }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Serialize the value fields
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Deserialize in exactly the same order as write()
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
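Since Writable carries no field names or types on the wire, write() and readFields() must agree on field order exactly. A minimal round-trip sketch to check that symmetry (FlowBeanRoundTrip is an illustrative name, assumed to sit in the same package as FlowBean; it is not part of the job):

package com.test.mapreduce.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean();
        in.setUpFlow(100L);
        in.setDownFlow(900L);
        in.setSumFlow();

        // Serialize: write() emits the three longs in declaration order
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));

        // Deserialize: readFields() must consume them in the same order
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(out); // expected: 100	900	1000
    }
}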
Custom Mapper class implementation (FlowMapper)
package com.test.mapreduce.writable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused Text object for the output key
    private Text k = new Text();
    // Reused FlowBean object for the output value
    private FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString();

        // 2. Split on tabs
        String[] dataList = line.split("\t");

        // 3. Pick out the fields to wrap
        String phone = dataList[1];                  // phone number
        String up = dataList[dataList.length - 3];   // upstream traffic
        String down = dataList[dataList.length - 2]; // downstream traffic

        // 4. Populate K and V
        k.set(phone);
        v.setUpFlow(Long.parseLong(up));
        v.setDownFlow(Long.parseLong(down));
        v.setSumFlow();

        // 5. Emit K, V
        context.write(k, v);
    }
}
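Indexing from the end of the array (length - 3, length - 2) is what tolerates the optional domain column: rows with a domain split into seven fields and rows without into six, but the last three fields are always upstream traffic, downstream traffic, and status code. A standalone sketch over two sample lines from the input above (the SplitCheck class name is illustrative):

public class SplitCheck {
    public static void main(String[] args) {
        String withDomain    = "2\t11111111112\t120.196.100.92\twww.baidu.com\t200\t200\t200";
        String withoutDomain = "1\t11111111111\t120.196.100.99\t100\t900\t200";
        for (String line : new String[]{withDomain, withoutDomain}) {
            String[] f = line.split("\t");
            // Counting from the end skips the optional domain field
            System.out.println(f[1] + " up=" + f[f.length - 3] + " down=" + f[f.length - 2]);
        }
    }
}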
Custom Reducer class implementation (FlowReducer)
package com.test.mapreduce.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // Reused FlowBean object for the output value
    private FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // 1. Reset the running totals on every reduce() call
        //    (a single key may come with several records)
        long totalUp = 0;
        long totalDown = 0;

        // 2. Accumulate the total upstream and downstream traffic
        for (FlowBean flowBean : values) {
            totalUp += flowBean.getUpFlow();
            totalDown += flowBean.getDownFlow();
        }

        // 3. Populate V
        v.setUpFlow(totalUp);
        v.setDownFlow(totalDown);
        v.setSumFlow();

        // 4. Emit K, V
        context.write(key, v);
    }
}
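One behavior worth calling out: Hadoop reuses the FlowBean instance behind the values iterator between iterations, which is why the totals are accumulated into plain long variables here rather than by keeping references to the beans for later use.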
Driver class implementation (FlowDriver)
package com.test.mapreduce.writable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the Configuration object and get the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar associated with this Driver
        job.setJarByClass(FlowDriver.class);

        // 3. Associate the Mapper and Reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the Mapper output KV types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
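As the note at the top says, for anything beyond this local test the two paths in step 6 are usually taken from the command line rather than hardcoded. A common variant (not part of the original listing):

// 6. Variant: read the input/output paths from the program arguments
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));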
Output data
phone number  total upstream traffic  total downstream traffic  total traffic
11111111111	100	900	1000
11111111112	500	500	1000
11111111113	800	200	1000
11111111114	30	970	1000
11111111115	100	900	1000
11111111116	105	895	1000
11111111118	150	850	1000
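Eight input records produce seven output rows because 11111111112 appears twice in the input (records 2 and 7); the reducer folds those into 200 + 300 = 500 upstream and 200 + 300 = 500 downstream.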