Hadoop实战 一 Shuffle 阶段

Hadoop实战 一 Shuffle 阶段,第1张

Hadoop实战 一 Shuffle 阶段

Shuffle 阶段
  • 分区
  • 排序
  • 规约
  • 分组
  • 主类代码

分区

将数据分成若干个块,每个块可以按照约定形成文件

  • 步骤:

  • 1 继承 Partitioner 类 并重写 getPartition方法

  • 2 在主类中设置启用分区 job.setPartitionerClass(OrderParition.class);

  • 注意:

  • 1 在继承 Partitioner 时的两个类型 分别对应了 K2 V2 也就是 Mapper的输出类型

  • 2 getPartition 中的参数 i 是为RedeceTask的个数 在主类中以 job.setNumReduceTasks(1);设置

  • 3 setNumReduceTasks 设置了几个RedeceTask 就会产生几个结果文件

代码展示 partition 类

package ordersTop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;


public class OrderParition extends Partitioner {

    public int getPartition(OrderBean orderBean, Text text, int i) {
        return (orderBean.getName().hashCode() & Integer.MAX_VALUE) % i;
    }

}

排序
  • 步骤:
  • 1 创建key的实体类 实现 WritableComparable 接口
  • 2 排序主要依靠接口下 compareTo 方法 定义自己的排序规则
  • 3 write 以及 readFields 为根据字段类型的统一写法
  • 注意:
  • 1 排序字段必须包含在key中
  • 2 主类不需要其他设置

key实体类代码

package ordersTop;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable {

    private String order;
    private String name;
    private int amount;

    @Override
    public String toString() {
        return  order + ',' +
                name + ',' +
                amount + ',';
    }

    public String getOrder() {
        return order;
    }

    public void setOrder(String order) {
        this.order = order;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public int compareTo(OrderBean o) {
        if (this.name.compareTo(o.getName()) == 0){
            return o.getAmount() - this.amount;
        }
        return  this.name.compareTo(o.getName());
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order);
        dataOutput.writeUTF(name);
        dataOutput.writeInt(amount);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.order = dataInput.readUTF();
        this.name = dataInput.readUTF();
        this.amount = dataInput.readInt();
    }
}

规约

规约其实就是一个 Reduce
和Reduce写法一致 之只是在主类中设置不同
job.setCombinerClass(OrderReduce.class);

分组
  • 步骤:
  • 1 创建自定义分组规则类继承 WritableComparator
  • 2 重写compare方法 为你需要的分组规则
package ordersTop;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroup extends WritableComparator {

    public OrderGroup() {
        super(OrderBean.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean bean1 = (OrderBean) a;
        OrderBean bean2 = (OrderBean) b;
        return bean1.getName().compareTo(bean2.getName());
    }
}

主类代码
package ordersTop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class Main {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf,"ordertop");

        // 输入路径 与 读取方式
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///C:\Users\Administrator\Desktop\Hadoop学习\orders.txt"));

        job.setMapperClass(OrderMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(Text.class);

//        job.setPartitionerClass(OrderParition.class); //分区 用时取消注释
//        job.setCombinerClass(OrderReduce.class);  //规约 用时取消注释
//        job.setGroupingComparatorClass(OrderGroup.class); //分组 用时取消注释

        job.setReducerClass(OrderReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

//        job.setNumReduceTasks(3);//设置ReduceTasks个数 用时取消注释

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///C:\Users\Administrator\Desktop\Hadoop学习\ordertop"));

        boolean flag = job.waitForCompletion(true);
        System.exit(flag?0:1);
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5686033.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存