- Partitioning
- Sorting
- Combining
- Grouping
- Main class code
Partitioning
Partitioning splits the data into several chunks, and each chunk can be written out as its own file according to the rule you define.
Steps:
1. Extend the Partitioner class and override the getPartition method.
2. Register the partitioner in the main class: job.setPartitionerClass(OrderParition.class);
Notes:
1. The two generic type parameters supplied when extending Partitioner correspond to K2 and V2, i.e., the Mapper's output key and value types.
2. The parameter i of getPartition is the number of reduce tasks, which is set in the main class with job.setNumReduceTasks(1);
3. However many reduce tasks you set with setNumReduceTasks, that many result files are produced.
Code for the partition class:
```java
package ordersTop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderParition extends Partitioner<OrderBean, Text> {
    @Override
    public int getPartition(OrderBean orderBean, Text text, int i) {
        // Hash the name so that all records with the same name land in the
        // same partition; i is the number of reduce tasks.
        return (orderBean.getName().hashCode() & Integer.MAX_VALUE) % i;
    }
}
```
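To see what getPartition computes, here is a small standalone sketch (the sample names and the reduce-task count of 3 are made-up illustration values) that replays the same hash-and-modulo logic:

```java
public class PartitionDemo {
    // Same computation as OrderParition.getPartition, extracted for illustration.
    // Masking with Integer.MAX_VALUE clears the sign bit so the result is never negative.
    static int partitionFor(String name, int numReduceTasks) {
        return (name.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Every record with the same name maps to the same partition,
        // and therefore ends up in the same result file.
        for (String name : new String[]{"apple", "banana", "cherry"}) {
            System.out.println(name + " -> partition " + partitionFor(name, 3));
        }
    }
}
```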
Sorting
- Steps:
- 1. Create an entity class for the key that implements the WritableComparable interface.
- 2. Sorting is driven by the interface's compareTo method, in which you define your own ordering rule.
- 3. write and readFields follow the standard pattern of serializing and deserializing each field according to its type.
- Notes:
- 1. Any field you sort on must be part of the key.
- 2. No additional configuration is needed in the main class.
Code for the key entity class:
```java
package ordersTop;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String order;
    private String name;
    private int amount;

    @Override
    public String toString() {
        return order + ',' + name + ',' + amount + ',';
    }

    public String getOrder() { return order; }
    public void setOrder(String order) { this.order = order; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }

    @Override
    public int compareTo(OrderBean o) {
        // Same name: sort by amount in descending order.
        if (this.name.compareTo(o.getName()) == 0) {
            return o.getAmount() - this.amount;
        }
        // Otherwise sort by name in ascending order.
        return this.name.compareTo(o.getName());
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order);
        dataOutput.writeUTF(name);
        dataOutput.writeInt(amount);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.order = dataInput.readUTF();
        this.name = dataInput.readUTF();
        this.amount = dataInput.readInt();
    }
}
```
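A quick way to check the ordering that compareTo defines is to sort a small list of beans locally (the sample records are made up): names ascend, and within the same name, amounts descend:

```java
package ordersTop;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortDemo {
    static OrderBean bean(String order, String name, int amount) {
        OrderBean b = new OrderBean();
        b.setOrder(order);
        b.setName(name);
        b.setAmount(amount);
        return b;
    }

    public static void main(String[] args) {
        List<OrderBean> beans = new ArrayList<>();
        beans.add(bean("o1", "banana", 10));
        beans.add(bean("o2", "apple", 5));
        beans.add(bean("o3", "apple", 20));

        // Uses OrderBean.compareTo: name ascending, then amount descending.
        Collections.sort(beans);
        for (OrderBean b : beans) {
            System.out.println(b); // o3,apple,20,  then o2,apple,5,  then o1,banana,10,
        }
    }
}
```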
Combining
A combiner is essentially just a reduce step: it is written exactly the same way as a Reducer; the only difference is how it is registered in the main class:
job.setCombinerClass(OrderReduce.class);
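The post reuses OrderReduce as the combiner but never shows the class itself. Here is a minimal hypothetical sketch of a reducer with the signature the main class expects; the top-1-per-group logic is an assumption based on the job name "ordertop":

```java
package ordersTop;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Hypothetical sketch: the original post does not show OrderReduce.
// With the grouping comparator enabled, all records sharing a name arrive
// in a single reduce() call with the largest amount first (see compareTo),
// so writing the key once emits the top order per name.
public class OrderReduce extends Reducer<OrderBean, Text, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
```

One caveat: a combiner's output types must match the Mapper's output types (K2, V2), because combiner output re-enters the shuffle. A reducer that emits NullWritable values, as sketched here, is type-safe only as the final Reducer, not as a combiner.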
Grouping
- Steps:
- 1. Create a custom grouping class that extends WritableComparator.
- 2. Override the compare method with the grouping rule you need.
```java
package ordersTop;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroup extends WritableComparator {
    public OrderGroup() {
        // Register OrderBean and allow instances to be created for comparison.
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean bean1 = (OrderBean) a;
        OrderBean bean2 = (OrderBean) b;
        // Records with the same name are grouped into one reduce() call.
        return bean1.getName().compareTo(bean2.getName());
    }
}
```
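To illustrate what the grouping comparator changes: two beans that differ in order id and amount but share a name compare as equal under OrderGroup, so the shuffle delivers them to one reduce() call (sample values made up):

```java
package ordersTop;

public class GroupDemo {
    public static void main(String[] args) {
        OrderGroup group = new OrderGroup();

        OrderBean a = new OrderBean();
        a.setOrder("o1"); a.setName("apple"); a.setAmount(20);

        OrderBean b = new OrderBean();
        b.setOrder("o2"); b.setName("apple"); b.setAmount(5);

        // 0 means "same group", even though the sort comparator
        // (OrderBean.compareTo) would order these two records apart.
        System.out.println(group.compare(a, b)); // prints 0
    }
}
```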
Main class code
```java
package ordersTop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ordertop");

        // Input path and input format
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///C:/Users/Administrator/Desktop/Hadoop学习/orders.txt"));

        job.setMapperClass(OrderMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(Text.class);

        // job.setPartitionerClass(OrderParition.class);     // partitioning: uncomment to enable
        // job.setCombinerClass(OrderReduce.class);          // combining: uncomment to enable
        // job.setGroupingComparatorClass(OrderGroup.class); // grouping: uncomment to enable

        job.setReducerClass(OrderReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // job.setNumReduceTasks(3);                         // number of reduce tasks: uncomment to set

        // Output path and output format
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///C:/Users/Administrator/Desktop/Hadoop学习/ordertop"));

        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}
```
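The main class also references an OrderMapper that the post never shows. A minimal hypothetical sketch, assuming each input line of orders.txt is comma-separated as order,name,amount:

```java
package ordersTop;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical sketch: the input line format "order,name,amount" is an assumption.
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, Text> {
    private final OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        bean.setOrder(fields[0]);
        bean.setName(fields[1]);
        bean.setAmount(Integer.parseInt(fields[2]));
        // The whole line is passed through as the map output value,
        // matching job.setMapOutputValueClass(Text.class).
        context.write(bean, value);
    }
}
```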