个人学习整理,所有资料来自尚硅谷
B站学习链接:添加链接描述
- 问题引出:
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(5个分区)。
- 默认Partitioner分区
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public HashPartitioner() {
    }
    public int getPartition(K key, V value, int numReduceTasks) {
        //与Integer.MAX_VALUE(2147483647)按位与,保证分区号非负
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
默认分区是根据key的hashCode对ReduceTask个数取模得到的,用户无法控制某个key具体存储到哪个分区;如果要控制,就需要自定义Partitioner。
- 自定义Partitioner步骤
(1)自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        //控制分区逻辑代码
        return partition;
    }
}
(2)在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
(3)自定义Partition后,要根据自定义的Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
- 分区总结
(1)如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
(2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会抛出Exception;
(3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都会交给这一个ReduceTask,也就只会产生一个结果文件part-r-00000;
(4)分区号必须从零开始,逐一累加。
(5)例如:假设自定义分区数为5,分别设置ReduceTask数量为1、2、6时的运行效果,见下方分区逻辑代码后的注释。
- Partition分区案例
(1)需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)。
(2)期望输出数据:手机号136、137、138、139开头的分别输出到4个独立文件中,其他开头的放到一个文件中。
(3)分析:取手机号前3位prePhone作为分区依据,分区逻辑如下:
String prePhone = key.toString().substring(0, 3); //取手机号前3位
int partition;
if ("136".equals(prePhone)){
partition=0;
}else if("137".equals(prePhone)){
partition=1;
}else if("138".equals(prePhone)){
partition=2;
}else if("139".equals(prePhone)){
partition=3;
}else{
partition=4;
}
return partition;
//上述partition必须从0开始,逐一累加
job.setNumReduceTasks(1);//会正常运行,只不过只会产生一个输出文件
job.setNumReduceTasks(2);//会报错(1<2<5,有一部分分区数据无处安放)
job.setNumReduceTasks(6);//大于5,程序会正常运行,只是会多产生空的输出文件
1.2 Partition分区案例实操
package com.atguigu.mapreduce.partitioner2;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {//实现Writable接口
private long upFlow;
private long downFlow;
private long sumFlow;
//反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//重写序列化方法
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
//重写反序列化方法
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//注意:反序列化的顺序必须和序列化的顺序相同
//重写toString方法,方便打印输出
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
package com.atguigu.mapreduce.partitioner2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
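    //map方法的示意实现:假设输入为制表符分隔的流量日志,手机号在第2列,上行、下行流量分别在倒数第3、2列(列号为假设,以实际数据格式为准)
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1 获取一行数据并按制表符切割
        String[] split = value.toString().split("\t");
        //2 抓取手机号、上行流量、下行流量
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];
        //3 封装outK、outV
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();
        //4 写出outK、outV
        context.write(outK, outV);
    }
}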
package com.atguigu.mapreduce.partitioner2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
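    //reduce方法的示意实现:对同一手机号的上行、下行流量分别累加,再计算总流量后写出
    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long totalUp = 0;
        long totalDown = 0;
        //1 遍历该手机号对应的所有FlowBean,累加上行、下行流量
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        //2 封装outV并计算总流量
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        //3 写出
        context.write(key, outV);
    }
}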
package com.atguigu.mapreduce.partitioner2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
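    //getPartition实现:取手机号前3位,按136/137/138/139划分到0~3号分区,其余归入4号分区(与前文分区逻辑一致)
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String prePhone = text.toString().substring(0, 3);
        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}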
package com.atguigu.mapreduce.partitioner2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2、关联本Driver的jar
job.setJarByClass(FlowDriver.class);
//3、关联Mapper和Reducer的jar
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4、设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5、设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//8.指定自定义分区
job.setPartitionerClass(ProvincePartitioner.class);
//9.同时指定相应数量的ReduceTask
job.setNumReduceTasks(5);
//6、设置输入和输出路径
FileInputFormat.setInputPaths(job,new Path("D:\\downloads\\hadoop-3.1.0\\data_input\\inputflow"));
FileOutputFormat.setOutputPath(job,new Path("D:\\downloads\\hadoop-3.1.0\\data\\output\\output5"));
// FileInputFormat.setInputPaths(job,new Path(args[0]));
// FileOutputFormat.setOutputPath(job,new Path(args[1]));
//7、提交job
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}