MapReduce编程之求每个学生的最高成绩及对应科目(使用GroupingComparator和SortComparator)

MapReduce编程之求每个学生的最高成绩及对应科目(使用GroupingComparator和SortComparator),第1张

输入数据文件 stuscore.csv(每行格式:学号,姓名,科目,成绩)
1,鸿飞,语文,84
2,鹏海,语文,85
3,铭晨,语文,86
4,霑千昌,语文,87
5,智宇,语文,88
6,景彰,语文,89
7,辰铭,语文,90
8,曜灿,语文,91
9,昊苍,语文,92
10,子昂,语文,93
11,景行,语文,94
12,昆皓,语文,95
13,文昂,语文,96
14,昊苍,语文,48
15,德泽,语文,49
16,鸿远,语文,50
17,昌燎,语文,51
18,昌翰,语文,52
19,鸿振,语文,53
20,鸿卓,语文,54
21,浩初,语文,55
22,运鹏,语文,56
23,新曦,语文,57
24,智阳,语文,58
25,杨伟,语文,59
1,鸿飞,数学,67
2,鹏海,数学,68
3,铭晨,数学,69
4,霑千昌,数学,70
5,智宇,数学,71
6,景彰,数学,72
7,辰铭,数学,73
8,曜灿,数学,74
9,昊苍,数学,75
10,子昂,数学,76
11,景行,数学,77
12,昆皓,数学,78
13,文昂,数学,79
14,昊苍,数学,80
15,德泽,数学,81
16,鸿远,数学,82
17,昌燎,数学,83
18,昌翰,数学,84
19,鸿振,数学,85
20,鸿卓,数学,86
21,浩初,数学,87
22,运鹏,数学,88
23,新曦,数学,89
24,智阳,数学,90
25,杨伟,数学,91
1,鸿飞,英语,23
2,鹏海,英语,24
3,铭晨,英语,25
4,霑千昌,英语,26
5,智宇,英语,27
6,景彰,英语,28
7,辰铭,英语,29
8,曜灿,英语,30
9,昊苍,英语,31
10,子昂,英语,32
11,景行,英语,33
12,昆皓,英语,34
13,文昂,英语,35
14,昊苍,英语,36
15,德泽,英语,37
16,鸿远,英语,38
17,昌燎,英语,39
18,昌翰,英语,40
19,鸿振,英语,41
20,鸿卓,英语,42
21,浩初,英语,43
22,运鹏,英语,44
23,新曦,英语,45
24,智阳,英语,46
25,杨伟,英语,47
Student.java
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Student implements WritableComparable {
    private long stuId;
    private String stuName;
    private String subject;
    private int score;

    @Override
    public String toString() {
        return "Student{" +
                "stuId=" + stuId +
                ", stuName='" + stuName + ''' +
                ", subject='" + subject + ''' +
                ", score=" + score +
                '}';
    }

    public long getStuId() {
        return stuId;
    }

    public void setStuId(long stuId) {
        this.stuId = stuId;
    }

    public String getStuName() {
        return stuName;
    }

    public void setStuName(String stuName) {
        this.stuName = stuName;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }

    public Student(long stuId, String stuName, String subject, int score) {
        this.stuId = stuId;
        this.stuName = stuName;
        this.subject = subject;
        this.score = score;
    }

    public Student() {
    }

    @Override
    public int compareTo(Student o) {
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(stuId);
        dataOutput.writeUTF(stuName);
        dataOutput.writeUTF(subject);
        dataOutput.writeInt(score);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.stuId = dataInput.readLong();
        this.stuName = dataInput.readUTF();
        this.subject = dataInput.readUTF();
        this.score = dataInput.readInt();
    }
}
StudentScoreMapper.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses each CSV line of the form {@code id,name,subject,score} into a
 * {@link Student} and emits it as the map output key with a
 * {@link NullWritable} value.
 */
public class StudentScoreMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
    /**
     * Converts one input line into a {@link Student} key.
     *
     * @param key     the input key supplied by the input format for this line
     * @param value   the line content
     * @param context sink for the emitted key/value pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println(key.get()+"      "+value);
        String line = value.toString();
        if (line.trim().isEmpty()) {
            // A blank line carries no record; skip it instead of failing on split[1].
            return;
        }
        String[] split = line.split(",");
        // stuId is a long in Student, so parse it as a long rather than an int.
        long stuId = Long.parseLong(split[0].trim());
        int score = Integer.parseInt(split[3].trim());
        Student student = new Student(stuId, split[1], split[2], score);
        context.write(student, NullWritable.get());
    }
}
StudentScoreSortComparator.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator for {@link Student} keys: student id ascending, and within
 * the same student, score descending.
 */
public class StudentScoreSortComparator extends WritableComparator {
    /** Registers {@link Student} as the key class and asks the base class to create key instances. */
    public StudentScoreSortComparator(){
        super(Student.class,true);
    }

    /**
     * Compares two {@link Student} keys.
     *
     * @param a first key, must be a {@link Student}
     * @param b second key, must be a {@link Student}
     * @return negative, zero or positive: by id ascending, then by score descending
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Student left = (Student) a;
        Student right = (Student) b;
        // Long.compare avoids the sign errors that casting a long difference to int can cause.
        int byId = Long.compare(left.getStuId(), right.getStuId());
        if (byId != 0) {
            return byId;
        }
        // Reversed operands put the higher score first; Integer.compare cannot overflow.
        return Integer.compare(right.getScore(), left.getScore());
    }
}
StudentScoreGroupingComparator.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for {@link Student} keys: two keys belong to the same
 * group when their student ids are equal, regardless of subject or score.
 */
public class StudentScoreGroupingComparator extends WritableComparator {
    /** Registers {@link Student} as the key class and asks the base class to create key instances. */
    public StudentScoreGroupingComparator(){
        super(Student.class,true);
    }

    /**
     * Compares two {@link Student} keys by student id only.
     *
     * @param a first key, must be a {@link Student}
     * @param b second key, must be a {@link Student}
     * @return negative, zero or positive according to the order of the two ids
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Student left = (Student) a;
        Student right = (Student) b;
        // Long.compare avoids the truncation of casting a long difference to int,
        // which could report different ids as equal or with the wrong sign.
        return Long.compare(left.getStuId(), right.getStuId());
    }
}
StudentScoreReducer.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Emits the key passed to each {@code reduce} call as the output value, with
 * a {@link NullWritable} output key.
 *
 * <p>The values are not iterated, so exactly one record is written per
 * reduce call. Which record of a group that is depends on the sort and
 * grouping comparators configured on the job.
 */
public class StudentScoreReducer extends Reducer<Student, NullWritable, NullWritable, Student> {

    /**
     * Writes the group's key as the single output record for the group.
     *
     * @param key     the key supplied for this reduce call
     * @param values  the grouped values (unused)
     * @param context sink for the emitted record
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Student key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println(key.toString());
        context.write(NullWritable.get(),key);
    }
}
StudentScoreDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class StudentScoreDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(StudentScoreDriver.class);

        job.setMapperClass(StudentScoreMapper.class);
        job.setMapOutputKeyClass(Student.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setSortComparatorClass(StudentScoreSortComparator.class);
        job.setGroupingComparatorClass(StudentScoreGroupingComparator.class);

        job.setReducerClass(StudentScoreReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Student.class);

        FileInputFormat.setInputPaths(job,new Path("G:\kgc\KB15\code\hadoopstu\in\demo6\stuscore.csv"));

        Path path = new Path("G:\kgc\KB15\code\hadoopstu\out6");
        FileSystem fs = FileSystem.get(path.toUri(), configuration);
        if(fs.exists(path)){
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);
        job.waitForCompletion(true);
        System.out.println("programme run over");
    }
}
运行结果

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5121490.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存