目录
1.需求
2.需求分析
3.代码
(1)在之前的序列化案例实 *** 的基础上,增加一个分区类
(2) 在driver类中增加自定义数据分区设置和ReduceTask设置
1.需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据:txt文件
(2)期望输出数据:
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
2.需求分析在之前的序列化案例实 *** 上进行修改。
3.代码 (1)在之前的序列化案例实 *** 的基础上,增加一个分区类public class ProvincePartitioner extends Partitioner(2) 在driver类中增加自定义数据分区设置和ReduceTask设置{ @Override public int getPartition(Text text, FlowBean flowBean, int i) { //test是手机号 String phone = text.toString(); //取前三位,substring(0,3)包含左边,不包含右边 String prePhone = phone.substring(0, 3); int partition; if("136".equals(prePhone)){ partition = 0; }else if("137".equals(prePhone)){ partition = 1; }else if("138".equals(prePhone)){ partition = 2; }else if("139".equals(prePhone)){ partition = 4; }else{ partition = 5; } return partition; } }
public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowDriver.class); job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); job.setPartitionerClass(ProvincePartitioner.class); job.setNumReduceTasks(6); FileInputFormat.setInputPaths(job,new Path("D:\code\Hadoop\input\inputflow")); FileOutputFormat.setOutputPath(job,new Path("D:\code\Hadoop\output55555")); boolean result = job.waitForCompletion(true); System.exit(result? 0:1); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)