1,鸿飞,语文,84
2,鹏海,语文,85
3,铭晨,语文,86
4,霑千昌,语文,87
5,智宇,语文,88
6,景彰,语文,89
7,辰铭,语文,90
8,曜灿,语文,91
9,昊苍,语文,92
10,子昂,语文,93
11,景行,语文,94
12,昆皓,语文,95
13,文昂,语文,96
14,昊苍,语文,48
15,德泽,语文,49
16,鸿远,语文,50
17,昌燎,语文,51
18,昌翰,语文,52
19,鸿振,语文,53
20,鸿卓,语文,54
21,浩初,语文,55
22,运鹏,语文,56
23,新曦,语文,57
24,智阳,语文,58
25,杨伟,语文,59
1,鸿飞,数学,67
2,鹏海,数学,68
3,铭晨,数学,69
4,霑千昌,数学,70
5,智宇,数学,71
6,景彰,数学,72
7,辰铭,数学,73
8,曜灿,数学,74
9,昊苍,数学,75
10,子昂,数学,76
11,景行,数学,77
12,昆皓,数学,78
13,文昂,数学,79
14,昊苍,数学,80
15,德泽,数学,81
16,鸿远,数学,82
17,昌燎,数学,83
18,昌翰,数学,84
19,鸿振,数学,85
20,鸿卓,数学,86
21,浩初,数学,87
22,运鹏,数学,88
23,新曦,数学,89
24,智阳,数学,90
25,杨伟,数学,91
1,鸿飞,英语,23
2,鹏海,英语,24
3,铭晨,英语,25
4,霑千昌,英语,26
5,智宇,英语,27
6,景彰,英语,28
7,辰铭,英语,29
8,曜灿,英语,30
9,昊苍,英语,31
10,子昂,英语,32
11,景行,英语,33
12,昆皓,英语,34
13,文昂,英语,35
14,昊苍,英语,36
15,德泽,英语,37
16,鸿远,英语,38
17,昌燎,英语,39
18,昌翰,英语,40
19,鸿振,英语,41
20,鸿卓,英语,42
21,浩初,英语,43
22,运鹏,英语,44
23,新曦,英语,45
24,智阳,英语,46
25,杨伟,英语,47
Student.java
import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class Student implements WritableComparableStudentScoreMapper.java{ private long stuId; private String stuName; private String subject; private int score; @Override public String toString() { return "Student{" + "stuId=" + stuId + ", stuName='" + stuName + ''' + ", subject='" + subject + ''' + ", score=" + score + '}'; } public long getStuId() { return stuId; } public void setStuId(long stuId) { this.stuId = stuId; } public String getStuName() { return stuName; } public void setStuName(String stuName) { this.stuName = stuName; } public String getSubject() { return subject; } public void setSubject(String subject) { this.subject = subject; } public int getScore() { return score; } public void setScore(int score) { this.score = score; } public Student(long stuId, String stuName, String subject, int score) { this.stuId = stuId; this.stuName = stuName; this.subject = subject; this.score = score; } public Student() { } @Override public int compareTo(Student o) { return 0; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(stuId); dataOutput.writeUTF(stuName); dataOutput.writeUTF(subject); dataOutput.writeInt(score); } @Override public void readFields(DataInput dataInput) throws IOException { this.stuId = dataInput.readLong(); this.stuName = dataInput.readUTF(); this.subject = dataInput.readUTF(); this.score = dataInput.readInt(); } }
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses one CSV line of the form {@code id,name,subject,score} into a
 * {@link Student} key; the value is a placeholder {@link NullWritable}.
 */
public class StudentScoreMapper extends Mapper<LongWritable, Text, Student, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        // Long.parseLong, not Integer.parseInt: stuId is declared as long.
        Student student = new Student(Long.parseLong(fields[0]),
                fields[1],
                fields[2],
                Integer.parseInt(fields[3]));
        context.write(student, NullWritable.get());
    }
}
// StudentScoreSortComparator.java
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class StudentScoreSortComparator extends WritableComparator { public StudentScoreSortComparator(){ super(Student.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { Student a1=(Student)a; Student b1=(Student)b; if(a1.getStuId()!=b1.getStuId()){ return (int) (a1.getStuId()-b1.getStuId()); }else { return b1.getScore()-a1.getScore(); } } }StudentScoreGroupingComparator.java
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class StudentScoreGroupingComparator extends WritableComparator { public StudentScoreGroupingComparator(){ super(Student.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { Student a1=(Student)a; Student b1=(Student)b; return (int) (a1.getStuId()-b1.getStuId()); } }StudentScoreReducer.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Emits one record per student. Because keys are grouped by student id and
 * sorted score-descending, the key passed to reduce() already holds that
 * student's highest score.
 */
public class StudentScoreReducer extends Reducer<Student, NullWritable, NullWritable, Student> {

    @Override
    protected void reduce(Student key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(NullWritable.get(), key);
    }
}
// StudentScoreDriver.java
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class StudentScoreDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(StudentScoreDriver.class); job.setMapperClass(StudentScoreMapper.class); job.setMapOutputKeyClass(Student.class); job.setMapOutputValueClass(NullWritable.class); job.setSortComparatorClass(StudentScoreSortComparator.class); job.setGroupingComparatorClass(StudentScoreGroupingComparator.class); job.setReducerClass(StudentScoreReducer.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Student.class); FileInputFormat.setInputPaths(job,new Path("G:\kgc\KB15\code\hadoopstu\in\demo6\stuscore.csv")); Path path = new Path("G:\kgc\KB15\code\hadoopstu\out6"); FileSystem fs = FileSystem.get(path.toUri(), configuration); if(fs.exists(path)){ fs.delete(path,true); } FileOutputFormat.setOutputPath(job,path); job.waitForCompletion(true); System.out.println("programme run over"); } }运行结果
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)