Sorting by upstream traffic in Hadoop

To sort output by upstream traffic (upFlow), the flow bean from the earlier counting demo becomes the map output key: FlowBean implements WritableComparable, so the MapReduce shuffle sorts the keys with FlowBean.compareTo before they reach the reducer. Three classes follow: the FlowBean key type, the mapper, and the reducer.

package demo05.flowOrder;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class FlowBean implements WritableComparable<FlowBean> {
    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    @Override
    public int compareTo(FlowBean o) {
        // Descending by upFlow: the bean with the larger upstream traffic sorts first
        return o.getUpFlow().compareTo(this.upFlow);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(downFlow);
        dataOutput.writeInt(upCountFlow);
        dataOutput.writeInt(downCountFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
        this.upCountFlow = dataInput.readInt();
        this.downCountFlow = dataInput.readInt();
    }

    @Override
    public String toString() {
        return this.upFlow + "\t" + this.downFlow + "\t" + this.upCountFlow + "\t" + this.downCountFlow;
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }


}
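
Because compareTo returns a negative value when this bean has the larger upFlow, beans sort in descending order of upstream traffic. A quick standalone sanity check (hypothetical, not part of the original post):

package demo05.flowOrder;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo: verify that FlowBean sorts by descending upFlow
public class CompareToDemo {
    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        for (int up : new int[]{100, 300, 200}) {
            FlowBean bean = new FlowBean();
            bean.setUpFlow(up);
            bean.setDownFlow(0);
            bean.setUpCountFlow(0);
            bean.setDownCountFlow(0);
            beans.add(bean);
        }
        Collections.sort(beans);       // uses FlowBean.compareTo
        for (FlowBean bean : beans) {
            System.out.println(bean);  // first column prints 300, 200, 100
        }
    }
}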

package demo05.flowOrder;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowOrderMap extends Mapper<LongWritable, Text, FlowBean, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the tab-separated input line
        String[] strings = value.toString().split("\t");
        // Populate upstream flow, downstream flow, total upstream flow, and total downstream flow
        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(Integer.parseInt(strings[1]));
        flowBean.setDownFlow(Integer.parseInt(strings[2]));
        flowBean.setUpCountFlow(Integer.parseInt(strings[3]));
        flowBean.setDownCountFlow(Integer.parseInt(strings[4]));

        // Emit the bean as the key so the shuffle sorts by upFlow; the phone number becomes the value
        context.write(flowBean, new Text(strings[0]));
    }
}
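
The mapper expects the tab-separated output of the demo04 flow-count job: the phone number in field 0, followed by upFlow, downFlow, upCountFlow, and downCountFlow. A hypothetical input line (illustrative only, not from the source):

13726230503	2481	24681	2481	24681

Because the bean is the key and the phone number the value, the framework itself performs the sort; the mapper and reducer contain no sorting logic.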

package demo05.flowOrder;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowOrderReduce extends Reducer<FlowBean, Text, FlowBean, Text> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Keys arrive already sorted by FlowBean.compareTo (descending upFlow);
        // emit one line per phone number that shares this flow value
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
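
The post stops at the reducer, so here is a minimal driver sketch to wire the job together. The class name FlowOrderDriver and the args-based input/output paths are assumptions, not from the source:

package demo05.flowOrder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: configures and submits the sort job
public class FlowOrderDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flow order");
        job.setJarByClass(FlowOrderDriver.class);

        job.setMapperClass(FlowOrderMap.class);
        job.setReducerClass(FlowOrderReduce.class);

        // Map output and final output both use FlowBean as key, Text (phone number) as value
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}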
