MR自定义分组获取TopN —— 使用自定义 GroupingComparator 的 MapReduce 分组取 TopN 示例
package com.cn.demo_groupTopN;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


/**
 * Grouping comparator: two map-output keys belong to the same reduce group
 * when their order_id values match, regardless of price. This lets one
 * reduce() call see all records of a single order.
 */
public class MyGroupCompactor extends WritableComparator {

    /** Register OrderBean as the key type and ask the framework to create instances. */
    public MyGroupCompactor() {
        super(OrderBean.class, true);
    }

    /** Compare keys by order_id only, so equal order ids fall into one group. */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean left = (OrderBean) a;
        OrderBean right = (OrderBean) b;
        return left.getOrder_id().compareTo(right.getOrder_id());
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


/**
 * Composite map-output key: (order_id, price).
 *
 * Sort order: ascending by order_id, and within the same order descending by
 * price, so the reducer receives the most expensive records of each order
 * first (required for TopN selection).
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private String order_id;
    private Double price;

    /**
     * Primary sort on order_id; secondary sort on price DESCENDING.
     *
     * The original returned 0 for different order ids, which violates the
     * compareTo contract and breaks the shuffle sort; it also sorted price
     * ascending, which would yield the bottom-N instead of the top-N.
     */
    @Override
    public int compareTo(OrderBean other) {
        int byOrder = this.order_id.compareTo(other.getOrder_id());
        if (byOrder != 0) {
            return byOrder;
        }
        // Same order: higher price first so TopN can take the leading values.
        return other.getPrice().compareTo(this.price);
    }

    /** Serialize fields in a fixed order for the shuffle. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order_id);
        dataOutput.writeDouble(price);
    }

    /** Deserialize in the same order as {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.order_id = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    /** Tab-separated output form (the original "t" was a stripped "\t"). */
    @Override
    public String toString() {
        return this.order_id + "\t" + this.price;
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


/**
 * Mapper: parses one tab-separated input line
 * (order_id \t product_id \t price) into an OrderBean key and the price
 * as a DoubleWritable value.
 */
public class MyGroupMap extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Original split on "t" was a stripped "\t"; the input is tab-delimited.
        String[] fields = value.toString().split("\t");
        // Parse the price once and reuse it for both key and value.
        double price = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean();
        orderBean.setOrder_id(fields[0]);
        orderBean.setPrice(price);
        context.write(orderBean, new DoubleWritable(price));
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/**
 * Reducer: the grouping comparator delivers all records of one order in a
 * single reduce() call, already sorted by price (per OrderBean.compareTo);
 * emit only the first TOP_N values per order.
 */
public class MyGroupReduce extends Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable> {

    /** How many records to keep per order group. */
    private static final int TOP_N = 2;

    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        int emitted = 0;
        for (DoubleWritable value : values) {
            if (++emitted > TOP_N) {
                break; // remaining values are outside the TopN, stop early
            }
            context.write(key, value);
        }
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;


/**
 * Partitioner: routes all records of the same order_id to the same reducer
 * so grouping/TopN works across multiple reduce tasks.
 */
public class MyPartion extends Partitioner<OrderBean, DoubleWritable> {
    @Override
    public int getPartition(OrderBean key, DoubleWritable value, int numPartitions) {
        // Mask the sign bit so a negative hashCode cannot produce a negative partition.
        return (key.getOrder_id().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyGroupMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(),"group_demo");
        job.setJarByClass(MyGroupMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop、大数据离线第五天、大数据离线第五天\自定义groupingComparator\input"));

        job.setMapperClass(MyGroupMap.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        //设置分区类
        job.setPartitionerClass(MyPartion.class);
        //设置分区数量
        job.setNumReduceTasks(2);

        //设置分组类
        job.setGroupingComparatorClass(MyGroupCompactor.class);

        //设置reduce类
        job.setReducerClass(MyGroupReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop、大数据离线第五天、大数据离线第五天\自定义groupingComparator\output_TOPN"));

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(),new MyGroupMain(),args);
        System.exit(run);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5653712.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存