```java
package ks;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class mybean implements Writable {

    // Fields
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;

    // No-arg constructor (required so Hadoop can instantiate the bean reflectively)
    public mybean() {
        super();
    }

    // Convenience constructor
    public mybean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    // Optional set() helper that also refreshes the sum
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }

    public long getUpFlow() { return upFlow; }

    public long getDownFlow() { return downFlow; }

    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }

    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    public long getSumFlow() { return sumFlow; }

    // Override toString() so TextOutputFormat writes tab-separated fields
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization; the field order must match write() exactly
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
}
```
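Because readFields() must consume exactly the bytes that write() produced, in the same order, a quick round trip through plain Java streams is a useful sanity check. The sketch below is not part of the original post; the class name mybeanRoundTrip is invented for illustration.

```java
package ks;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class mybeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialize one bean into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        mybean original = new mybean(1024, 2048);
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance
        mybean copy = new mybean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Both lines should print 1024, 2048 and 3072 separated by tabs
        System.out.println(original);
        System.out.println(copy);
    }
}
```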
# Overriding the map method
```java
package ks;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Type parameters of Mapper:
//   1st: input key type (usually LongWritable, the byte offset of the line)
//   2nd: input value type (usually Text, the line itself)
//   3rd: output key type
//   4th: output value type
public class mymapper extends Mapper<LongWritable, Text, Text, mybean> {

    Text k = new Text();
    mybean v = new mybean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split the line on tab characters
        String[] splited = line.split("\t");
        // The phone number field becomes the key
        k.set(splited[1]);
        long upFlow = Long.parseLong(splited[splited.length - 3]);
        long downFlow = Long.parseLong(splited[splited.length - 2]);
        // Fill in the value
        v.set(upFlow, downFlow);
        // Emit the pair
        context.write(k, v);
    }
}
```
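The mapper assumes tab-separated input where field 1 is the phone number and the third- and second-to-last fields are the upstream and downstream traffic. The original post does not show the data file, so the line below is only a hypothetical example in that shape:

```java
public class InputShapeDemo {
    public static void main(String[] args) {
        // Hypothetical input line (tab-separated); the real data file is not shown in the post
        String line = "1\t13736230513\t192.196.100.1\twww.example.com\t2481\t24681\t200";
        String[] splited = line.split("\t");
        System.out.println(splited[1]);                  // 13736230513 -> the key
        System.out.println(splited[splited.length - 3]); // 2481        -> upFlow
        System.out.println(splited[splited.length - 2]); // 24681       -> downFlow
    }
}
```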
# Overriding the reduce method

```java
package ks;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Type parameters of Reducer:
//   1st: input key type (must match the mapper's output key type)
//   2nd: input value type (must match the mapper's output value type)
//   3rd: output key type
//   4th: output value type
public class myReducer extends Reducer<Text, mybean, Text, mybean> {

    // Reusable bean for the output value
    mybean v = new mybean();

    @Override
    protected void reduce(Text key, Iterable<mybean> values, Context context)
            throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        // Accumulate the per-key totals
        for (mybean flow : values) {
            sumUpFlow += flow.getUpFlow();
            sumDownFlow += flow.getDownFlow();
        }
        // Initialize v with the totals via set()
        v.set(sumUpFlow, sumDownFlow);
        // Emit the result
        context.write(key, v);
    }
}
```
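One subtlety this reducer handles correctly: Hadoop reuses a single value object while iterating over values, refilling it before each step, so only copying the primitive fields out (as the loop above does) is safe. The standalone demo below simulates that reuse; the class name ReuseDemo and the sample numbers are invented here.

```java
package ks;

import java.util.ArrayList;
import java.util.List;

public class ReuseDemo {
    public static void main(String[] args) {
        mybean shared = new mybean();           // stand-in for the single reused instance
        long[][] data = {{1, 2}, {10, 20}, {100, 200}};

        List<mybean> stored = new ArrayList<>();
        long sumUp = 0;
        for (long[] row : data) {
            shared.set(row[0], row[1]);         // the framework refills the same object
            stored.add(shared);                 // WRONG: every list entry aliases one object
            sumUp += shared.getUpFlow();        // RIGHT: copy the primitive out immediately
        }

        System.out.println(sumUp);              // 111, accumulated correctly
        System.out.println(stored.get(0));      // 100  200  300 -- overwritten by the last row
    }
}
```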
# Partitioning

```java
package ks;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Type parameters of Partitioner:
//   1st: the map output key type
//   2nd: the map output value type
public class mypartition extends Partitioner<Text, mybean> {

    @Override
    public int getPartition(Text key, mybean value, int numPartitions) {
        // Take the first three digits of the phone number
        String pre = key.toString().substring(0, 3);
        // Everything else falls into partition 4
        int partition = 4;
        if ("136".equals(pre)) {
            // Prefix 136 goes to partition 0
            partition = 0;
        } else if ("137".equals(pre)) {
            // Prefix 137 goes to partition 1
            partition = 1;
        } else if ("138".equals(pre)) {
            // Prefix 138 goes to partition 2
            partition = 2;
        } else if ("139".equals(pre)) {
            // Prefix 139 goes to partition 3
            partition = 3;
        }
        // Return the partition number
        return partition;
    }
}
```
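Because getPartition() is ordinary Java, it can be exercised locally without running a job. A minimal check, with hypothetical phone numbers and an invented class name:

```java
package ks;

import org.apache.hadoop.io.Text;

public class PartitionCheck {
    public static void main(String[] args) {
        mypartition p = new mypartition();
        mybean dummy = new mybean(0, 0);
        // Hypothetical phone numbers; only the first three digits matter
        for (String phone : new String[]{"13612345678", "13912345678", "15012345678"}) {
            System.out.println(phone + " -> partition " + p.getPartition(new Text(phone), dummy, 5));
        }
        // Expected output: partitions 0, 3 and 4
    }
}
```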
# The driver class

```java
package ks;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class mydriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Set up basic log4j console logging before the job starts
        BasicConfigurator.configure();

        Configuration conf = new Configuration();
        conf.set("mapred.jar", "D:\\Hadoop\\untitled1\\untitled\\target\\untitled1-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(conf);
        // Locate the job jar via this class
        job.setJarByClass(mydriver.class);
        // Input path
        FileInputFormat.setInputPaths(job, new Path("file:///d:/value.txt"));
        // Bind the mapper
        job.setMapperClass(mymapper.class);
        // Shuffle: custom partitioner, with five reduce tasks for partitions 0-4
        job.setPartitionerClass(mypartition.class);
        job.setNumReduceTasks(5);
        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(mybean.class);
        // Bind the reducer
        job.setReducerClass(myReducer.class);
        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(mybean.class);
        // Output path (must not exist before the job runs)
        FileOutputFormat.setOutputPath(job, new Path("file:///d:/output1"));
        // Hand the job off for execution and block until it finishes
        job.waitForCompletion(true);
    }
}
```
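MapReduce aborts if the output directory already exists, which is easy to hit when re-running locally. A common guard, not in the original post, is to delete it first. This fragment would replace the setOutputPath line above; it assumes the same conf and job variables and additionally needs import org.apache.hadoop.fs.FileSystem;.

```java
// Delete a stale output directory before the job starts
Path output = new Path("file:///d:/output1");
FileSystem fs = FileSystem.get(output.toUri(), conf);
if (fs.exists(output)) {
    fs.delete(output, true);   // true = recursive
}
FileOutputFormat.setOutputPath(job, output);
```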