package com.cn.demo_groupTopN;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for the TopN job: two map-output keys belong to the
 * same reduce group when their order ids match, so every record of one
 * order reaches a single reduce() call regardless of price.
 */
public class MyGroupCompactor extends WritableComparator {

    /** Registers OrderBean and asks the framework to create key instances. */
    public MyGroupCompactor() {
        super(OrderBean.class, true);
    }

    /** Compare by order id only — price is deliberately ignored for grouping. */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean left = (OrderBean) a;
        OrderBean right = (OrderBean) b;
        return left.getOrder_id().compareTo(right.getOrder_id());
    }
}
package com.cn.demo_groupTopN;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite map-output key (order_id, price) for the grouped TopN job.
 *
 * Fixes vs. the original:
 * - implements WritableComparable&lt;OrderBean&gt; instead of the raw type, so
 *   {@code compareTo(OrderBean)} actually implements the interface method
 *   (with the raw type it was a mere overload and the class did not compile);
 * - {@code compareTo} now orders by order id first — the original returned 0
 *   for keys of different orders, which breaks the total ordering the shuffle
 *   sort requires;
 * - within one order, prices sort DESCENDING so the reducer's first N values
 *   are the top-N prices (the original ascending order would yield bottom-N);
 * - {@code toString} uses a real tab ("t" was a lost {@code \t} escape), as
 *   TextOutputFormat output is tab-separated.
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private String order_id;
    private Double price;

    /**
     * Primary sort: order id ascending. Secondary sort: price descending,
     * so the highest-priced records of each order arrive first in reduce().
     */
    @Override
    public int compareTo(OrderBean other) {
        int byOrder = this.order_id.compareTo(other.getOrder_id());
        if (byOrder != 0) {
            return byOrder;
        }
        // Reversed operands => descending price within the same order.
        return other.getPrice().compareTo(this.price);
    }

    /** Serialize for the shuffle: UTF order id, then the raw double price. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order_id);
        dataOutput.writeDouble(price);
    }

    /** Deserialize in the exact order written by {@link #write}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.order_id = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    /** Tab-separated text form consumed by TextOutputFormat. */
    @Override
    public String toString() {
        return this.order_id + "\t" + this.price;
    }
}
package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper: parses each tab-separated input line into an (OrderBean, price)
 * pair so the shuffle can sort by order id and price.
 *
 * Fixes vs. the original: concrete generic parameters on Mapper (the raw
 * type made the @Override map signature a non-overriding overload, which
 * does not compile), split on a real tab ("t" was a lost \t escape), and
 * the price is parsed once instead of twice.
 */
public class MyGroupMap extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {

    /**
     * @param key     byte offset of the line (unused)
     * @param value   input line: order_id <TAB> item_id <TAB> price
     *                — assumes price is the third field; TODO confirm against the data file
     * @param context emits (OrderBean, DoubleWritable price)
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        double price = Double.parseDouble(fields[2]);

        OrderBean orderBean = new OrderBean();
        orderBean.setOrder_id(fields[0]);
        orderBean.setPrice(price);

        context.write(orderBean, new DoubleWritable(price));
    }
}
package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer: because the grouping comparator groups by order id and the key's
 * secondary sort orders prices within a group, the first TOP_N values of
 * each group are that order's top prices.
 *
 * Fix vs. the original: concrete generic parameters on Reducer — the raw
 * type left {@code values} as a raw Iterable, so the typed for-each over
 * DoubleWritable did not compile.
 */
public class MyGroupReduce extends Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable> {

    /** Number of records to emit per order group. */
    private static final int TOP_N = 2;

    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        int emitted = 0;
        for (DoubleWritable value : values) {
            // NOTE: Hadoop reuses the key instance; its fields track the
            // current value during iteration, so key matches value here.
            context.write(key, value);
            if (++emitted >= TOP_N) {
                break;
            }
        }
    }
}
package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner: routes every record of one order to the same reducer by
 * hashing the order id only.
 *
 * Fix vs. the original: concrete generic parameters on Partitioner — with
 * the raw type, the typed getPartition did not implement the abstract
 * {@code getPartition(Object, Object, int)} and the class did not compile.
 */
public class MyPartion extends Partitioner<OrderBean, DoubleWritable> {

    /**
     * @param orderBean      map-output key
     * @param doubleWritable map-output value (unused)
     * @param numPartitions  number of reduce tasks
     * @return a stable partition in [0, numPartitions); the mask clears the
     *         sign bit so a negative hashCode cannot produce a negative index
     */
    @Override
    public int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int numPartitions) {
        return (orderBean.getOrder_id().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
package com.cn.demo_groupTopN; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MyGroupMain extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { Job job = Job.getInstance(super.getConf(),"group_demo"); job.setJarByClass(MyGroupMain.class); job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop、大数据离线第五天、大数据离线第五天\自定义groupingComparator\input")); job.setMapperClass(MyGroupMap.class); job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(DoubleWritable.class); //设置分区类 job.setPartitionerClass(MyPartion.class); //设置分区数量 job.setNumReduceTasks(2); //设置分组类 job.setGroupingComparatorClass(MyGroupCompactor.class); //设置reduce类 job.setReducerClass(MyGroupReduce.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(DoubleWritable.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop、大数据离线第五天、大数据离线第五天\自定义groupingComparator\output_TOPN")); boolean b = job.waitForCompletion(true); return b?0:1; } public static void main(String[] args) throws Exception { int run = ToolRunner.run(new Configuration(),new MyGroupMain(),args); System.exit(run); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)