- 第1关:数据清洗
- 任务描述
- 编程要求
- 测试说明
- 代码实现
- 命令行
- 代码文件
- step1/com/Weather.java
- step1/com/WeatherMap.java
- step1/com/WeatherReduce.java
- step1/com/Auto.java
- step1/com/WeatherTest.java
本关任务:对数据按照一定规则进行清洗。
编程要求:根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。
数据说明如下:a.txt;
数据切分方式:一个或多个空格;
数据所在位置:/user/test/input/a.txt;
2005 01 01 16 -6 -28 10157 260 31 8 0 -9999
2005 01 01 16 -6 -28 10157 260 31 8 0 -9999
年 月 日 小时 温度 湿度 气压 风向 风速 天气情况 1h降雨量 6h降雨量
sky.txt;
数据切分方式:逗号;
数据所在位置:data/sky.txt或者/user/test/input/sky.txt。
1,积云
1 积云
天气情况 cumulus
清洗规则:
将分隔符转化为逗号;
清除不合法数据:字段长度不足,风向不在[0,360]的,风速为负的,气压为负的,天气情况不在[0,10],湿度不在[0,100],温度不在[-40,50]的数据;
将a.txt与sky.txt的数据以天气情况进行join操作,把天气情况变为其对应的云属;
对进入同一个分区的数据排序; 排序规则: (1)同年同月同天为key; (2)按每日温度升序; (3)若温度相同则按风速升序; (4)风速相同则按压强降序。
设置数据来源文件路径及清洗后的数据存储路径: 数据来源路径为: /user/test/input/a.txt (HDFS); 清洗后的数据存放于:/user/test/output (HDFS)。
数据清洗后如下:
2005,01,01,16,-6,-28,10157,260,31,卷云,0,-9999
测试说明:平台会对你编写的代码进行测试:
评测之前先在命令行启动hadoop:start-all.sh;
Weather:封装对象;
WeatherMap:map端操作;
WeatherReduce:reduce端操作;
Auto:自定义分区;
WeatherTest:测试结果类。
具体本关的预期输出请查看右侧测试集。
因为大数据实训消耗资源较大,且map/reduce运行比较耗时,所以评测时间较长,大概在60秒左右,请耐心等待。
开始你的任务吧,祝你成功!
代码实现:在命令行执行 start-all.sh;代码文件:step1/com/Weather.java
package com;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Hadoop key type holding one cleaned weather observation.
 *
 * Serialization: {@link #write(DataOutput)} and {@link #readFields(DataInput)}
 * must read/write the fields in exactly the same order.
 *
 * Sort order (per the cleaning spec): records from the same year/month/day
 * are grouped together, then ordered by temperature ascending, wind speed
 * ascending, and pressure descending.
 */
public class Weather implements WritableComparable<Weather> {

    private String year;            // observation year, e.g. "2005"
    private String month;           // observation month, zero-padded, e.g. "01"
    private String day;             // observation day of month
    private String hour;            // observation hour
    private int temperature;        // temperature; valid range [-40, 50]
    private String dew;             // humidity/dew field; valid range [0, 100]
    private int pressure;           // air pressure; must be non-negative
    private String wind_direction;  // wind direction in degrees; valid range [0, 360]
    private int wind_speed;         // wind speed; must be non-negative
    private String sky_condition;   // sky-condition code, replaced by cloud genus after the join
    private String rain_1h;         // 1-hour rainfall (may be the sentinel -9999)
    private String rain_6h;         // 6-hour rainfall (may be the sentinel -9999)

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public String getMonth() {
        return month;
    }

    public void setMonth(String month) {
        this.month = month;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    public String getHour() {
        return hour;
    }

    public void setHour(String hour) {
        this.hour = hour;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    public String getDew() {
        return dew;
    }

    public void setDew(String dew) {
        this.dew = dew;
    }

    public int getPressure() {
        return pressure;
    }

    public void setPressure(int pressure) {
        this.pressure = pressure;
    }

    public String getWind_direction() {
        return wind_direction;
    }

    public void setWind_direction(String wind_direction) {
        this.wind_direction = wind_direction;
    }

    public int getWind_speed() {
        return wind_speed;
    }

    public void setWind_speed(int wind_speed) {
        this.wind_speed = wind_speed;
    }

    public String getSky_condition() {
        return sky_condition;
    }

    public void setSky_condition(String sky_condition) {
        this.sky_condition = sky_condition;
    }

    public String getRain_1h() {
        return rain_1h;
    }

    public void setRain_1h(String rain_1h) {
        this.rain_1h = rain_1h;
    }

    public String getRain_6h() {
        return rain_6h;
    }

    public void setRain_6h(String rain_6h) {
        this.rain_6h = rain_6h;
    }

    /** Comma-joined record in input-field order — this is the cleaned output line. */
    @Override
    public String toString() {
        return year + "," + month + "," + day + "," + hour + "," + temperature + ","
                + dew + "," + pressure + "," + wind_direction + "," + wind_speed + ","
                + sky_condition + "," + rain_1h + "," + rain_6h;
    }

    /** No-arg constructor required by Hadoop's Writable reflection machinery. */
    public Weather() {
    }

    public Weather(String year, String month, String day, String hour, int temperature,
                   String dew, int pressure, String wind_direction, int wind_speed,
                   String sky_condition, String rain_1h, String rain_6h) {
        this.year = year;
        this.month = month;
        this.day = day;
        this.hour = hour;
        this.temperature = temperature;
        this.dew = dew;
        this.pressure = pressure;
        this.wind_direction = wind_direction;
        this.wind_speed = wind_speed;
        this.sky_condition = sky_condition;
        this.rain_1h = rain_1h;
        this.rain_6h = rain_6h;
    }

    /** Deserializes fields in the exact order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readUTF();
        month = in.readUTF();
        day = in.readUTF();
        hour = in.readUTF();
        temperature = in.readInt();
        dew = in.readUTF();
        pressure = in.readInt();
        wind_direction = in.readUTF();
        wind_speed = in.readInt();
        sky_condition = in.readUTF();
        rain_1h = in.readUTF();
        rain_6h = in.readUTF();
    }

    /** Serializes fields in the exact order {@link #readFields(DataInput)} consumes them. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(year);
        out.writeUTF(month);
        out.writeUTF(day);
        out.writeUTF(hour);
        out.writeInt(temperature);
        out.writeUTF(dew);
        out.writeInt(pressure);
        out.writeUTF(wind_direction);
        out.writeInt(wind_speed);
        out.writeUTF(sky_condition);
        out.writeUTF(rain_1h);
        out.writeUTF(rain_6h);
    }

    /**
     * Orders records by (year, month, day), then temperature ascending,
     * wind speed ascending, and finally pressure descending.
     */
    @Override
    public int compareTo(Weather o) {
        // BUG FIX: the original skipped the year, so records from different
        // years landing in the same partition could interleave; the spec keys
        // groups on year+month+day.
        int cmp = this.year.compareTo(o.year);
        if (cmp != 0) {
            return cmp;
        }
        cmp = this.month.compareTo(o.month);
        if (cmp != 0) {
            return cmp;
        }
        cmp = this.day.compareTo(o.day);
        if (cmp != 0) {
            return cmp;
        }
        // Integer.compare instead of subtraction: subtraction can overflow.
        cmp = Integer.compare(this.temperature, o.temperature);
        if (cmp != 0) {
            return cmp;
        }
        cmp = Integer.compare(this.wind_speed, o.wind_speed);
        if (cmp != 0) {
            return cmp;
        }
        // pressure is the only descending criterion, hence the swapped operands
        return Integer.compare(o.pressure, this.pressure);
    }
}
package com;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Map stage of the cleaning job.
 *
 * Parses whitespace-delimited raw weather lines, drops records that violate
 * the cleaning rules (field count, wind direction [0,360], non-negative wind
 * speed and pressure, sky condition [0,10], humidity [0,100], temperature
 * [-40,50]), replaces the numeric sky-condition code with its cloud genus via
 * a map-side join against the local data/sky.txt table, and emits the cleaned
 * record as a Weather key with a NullWritable value.
 */
public class WeatherMap extends Mapper<LongWritable, Text, Weather, NullWritable> {

    // sky-condition code -> cloud genus, loaded once per map task in setup()
    private final HashMap<String, String> skyLookup = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Map-side join side data: sky.txt is tiny, so read it fully into memory.
        InputStream inputStream = new FileInputStream(new File("data/sky.txt"));
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        try {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                String[] parts = line.split(",");
                // guard against blank/malformed lines instead of throwing AIOOBE
                if (parts.length >= 2) {
                    skyLookup.put(parts[0], parts[1]);
                }
            }
        } finally {
            bufferedReader.close();
            inputStream.close();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // fields are separated by one or more spaces
        // BUG FIX: the original wrote split("\s+"), an illegal Java escape;
        // the regex whitespace class needs a doubled backslash.
        String[] split = value.toString().split("\\s+");

        // BUG FIX: validate the field count BEFORE indexing split[0..11];
        // the original indexed first, so short lines crashed with
        // ArrayIndexOutOfBoundsException instead of being skipped.
        if (split.length != 12) {
            return;
        }

        String year = split[0];
        String month = split[1];
        String day = split[2];
        String hour = split[3];
        int temperature = Integer.parseInt(split[4]);
        String dew = split[5];
        int pressure = Integer.parseInt(split[6]);
        String windDirection = split[7];
        int windSpeed = Integer.parseInt(split[8]);
        String skyCondition = split[9];
        String rain1h = split[10];
        String rain6h = split[11];

        // cleaning rules: drop any record with an out-of-range field
        int direction = Integer.parseInt(windDirection);
        int sky = Integer.parseInt(skyCondition);
        int humidity = Integer.parseInt(dew);
        if (pressure < 0
                || direction < 0 || direction > 360
                || windSpeed < 0
                || sky < 0 || sky > 10
                || humidity < 0 || humidity > 100
                || temperature < -40 || temperature > 50) {
            return;
        }

        // join: swap the numeric code for its cloud genus; keep the code when
        // the lookup table has no entry (same behavior as the original scan)
        skyCondition = skyLookup.getOrDefault(skyCondition, skyCondition);

        Weather weather = new Weather(year, month, day, hour, temperature, dew, pressure,
                windDirection, windSpeed, skyCondition, rain1h, rain6h);
        context.write(weather, NullWritable.get());
    }
}
package com; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; public class WeatherReduce extends Reducerstep1/com/Auto.java{ @Override protected void reduce(Weather key, Iterable values, Context context) throws IOException, InterruptedException { for (NullWritable nullWritable : values) { context.write(key, NullWritable.get()); } } }
package com; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class Auto extends Partitionerstep1/com/WeatherTest.java{ public static Map provinceDict = new HashMap (); static { int a = 0; for (int i = 1980; i <= 1981; i++) { provinceDict.put(i + "", a); a++; } } public int getPartition(Weather key, NullWritable nullWritable, int numPartitions) { Integer id = provinceDict.get(key.toString().substring(0, 4)); return id == null ? 2 : id; } }
package com; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WeatherTest { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(WeatherTest.class); job.setMapperClass(WeatherMap.class); job.setMapOutputKeyClass(Weather.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(WeatherReduce.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Weather.class); job.setNumReduceTasks(3); job.setPartitionerClass(Auto.class); Path inPath = new Path("/user/test/input/a.txt"); Path out = new Path("/user/test/output"); FileInputFormat.setInputPaths(job, inPath); FileOutputFormat.setOutputPath(job, out); job.waitForCompletion(true); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)