package demo05.flowOrder;

import demo04.flow.FlowNum;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
 * Writable key carrying four flow counters: up flow, down flow, and the
 * accumulated up/down totals.
 *
 * <p>Instances order by {@code upFlow} descending, so using this class as a
 * MapReduce key yields output sorted from the largest up flow to the smallest.
 *
 * <p>All four fields must be set before {@link #write(DataOutput)} or
 * {@link #compareTo(FlowBean)} is called; a {@code null} field is unboxed and
 * raises {@link NullPointerException}.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    /**
     * Orders beans by {@code upFlow} descending. Ties are broken by
     * {@code downFlow}, {@code upCountFlow} and {@code downCountFlow}
     * (ascending) so the ordering is total and deterministic.
     *
     * <p>Returns 0 only when all four fields are equal, which keeps the
     * result consistent with {@link #equals(Object)}. Note that keys which
     * compare equal may be delivered to a single reduce call, so a reducer
     * should emit every value it receives rather than only the first.
     *
     * @param o the bean to compare against
     * @return negative if this bean sorts first, positive if it sorts last,
     *         0 if all four fields are equal
     */
    @Override
    public int compareTo(FlowBean o) {
        // Arguments are swapped on purpose: larger upFlow sorts first.
        int result = Integer.compare(o.upFlow, this.upFlow);
        if (result != 0) {
            return result;
        }
        result = Integer.compare(this.downFlow, o.downFlow);
        if (result != 0) {
            return result;
        }
        result = Integer.compare(this.upCountFlow, o.upCountFlow);
        if (result != 0) {
            return result;
        }
        return Integer.compare(this.downCountFlow, o.downCountFlow);
    }

    /**
     * Serializes the four counters as consecutive ints, in the order
     * upFlow, downFlow, upCountFlow, downCountFlow.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(downFlow);
        dataOutput.writeInt(upCountFlow);
        dataOutput.writeInt(downCountFlow);
    }

    /**
     * Restores the four counters; the read order must mirror
     * {@link #write(DataOutput)}.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
        this.upCountFlow = dataInput.readInt();
        this.downCountFlow = dataInput.readInt();
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        FlowBean other = (FlowBean) obj;
        return Objects.equals(upFlow, other.upFlow)
                && Objects.equals(downFlow, other.downFlow)
                && Objects.equals(upCountFlow, other.upCountFlow)
                && Objects.equals(downCountFlow, other.downCountFlow);
    }

    /**
     * Value-based hash so that equal keys hash identically in every JVM.
     */
    @Override
    public int hashCode() {
        return Objects.hash(upFlow, downFlow, upCountFlow, downCountFlow);
    }

    /**
     * Renders the four counters as one tab-separated line.
     */
    @Override
    public String toString() {
        return this.upFlow + "\t" + this.downFlow + "\t" + this.upCountFlow + "\t" + this.downCountFlow;
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }
}
package demo05.flowOrder;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that turns each tab-separated input line into a
 * ({@link FlowBean}, {@link Text}) pair.
 *
 * <p>Columns 1-4 of the line populate the bean (up flow, down flow, total up
 * flow, total down flow) and column 0 is emitted unchanged as the value.
 * Emitting the bean as the key lets the framework sort records by the bean's
 * {@code compareTo}.
 */
public class FlowOrderMap extends Mapper<LongWritable, Text, FlowBean, Text> {

    /**
     * Parses one input line and emits the flow counters as key and the
     * first column as value.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one input line with at least five tab-separated columns
     * @param context sink for the output pair
     * @throws NumberFormatException          if columns 1-4 are not integers
     * @throws ArrayIndexOutOfBoundsException if the line has fewer than five columns
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into its tab-separated columns.
        String[] strings = value.toString().split("\t");

        // Read up flow, down flow, total up flow and total down flow.
        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(Integer.parseInt(strings[1]));
        flowBean.setDownFlow(Integer.parseInt(strings[2]));
        flowBean.setUpCountFlow(Integer.parseInt(strings[3]));
        flowBean.setDownCountFlow(Integer.parseInt(strings[4]));

        // Column 0 (presumably the record identifier, e.g. a phone number --
        // confirm against the input data) travels along as the value.
        context.write(flowBean, new Text(strings[0]));
    }
}
package demo05.flowOrder;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that writes every incoming (key, value) pair straight through.
 *
 * <p>No aggregation happens here; the job relies on the framework having
 * already ordered the keys via {@code FlowBean.compareTo} before they reach
 * this reducer.
 */
public class FlowOrderReduce extends Reducer<FlowBean, Text, FlowBean, Text> {

    /**
     * Emits the key once per value grouped under it.
     *
     * @param key     flow counters for the current group
     * @param values  all values that arrived with a key comparing equal to {@code key}
     * @param context sink for the output pairs
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Write every value: several records can share one key, and emitting
        // only the first would silently drop the rest.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)