package MaxTemperature_05; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class MyMap extends MapperReduce类{ private static final int MISSING = 9999; @Override protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException { //读取一条数据 String line = value.toString(); //获取年份 String year = line.substring(15,19); int airTem; if(line.charAt(45) == '+'){ //判断温度正负 airTem = Integer.parseInt(line.substring(46,50)); }else{ airTem = Integer.parseInt(line.substring(45,50)); } String quality = line.substring(50,51); System.out.println("quality:"+quality); //判断温度是否异常 if(airTem != MISSING && quality.matches("[01459]]")){ context.write(new Text(year),new IntWritable(airTem)); } } }
package MaxTemperature_05; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MyReduce extends ReducerJob类{ protected void reduce(Text key,Iterable values,Context context) throws IOException, InterruptedException { //把Integer.MIN_VALUE作为maxValue的初始值 int maxValue = Integer.MIN_VALUE; int minValue = Integer.MAX_VALUE; //循环取出最大值,最小值 for (IntWritable value:values) { maxValue = Math.max(maxValue,value.get()); //another type //if(value.get() > maxValue){ // maxValue = value.get(); //} minValue = Math.min(minValue,value.get()); //another type //if(value.get() < minValue){ // minValue= value.get(); //} } //最高温度 context.write(key,new IntWritable(maxValue)); //最低温度 //context.write(key,new IntWritable(minValue)); } }
package MaxTemperature_05; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.io.Text; import java.io.IOException; public class TestJob { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //1、获取作业对象 Job job = Job.getInstance(conf); //2、设置主类 job.setJarByClass(TestJob.class); //3、设置job参数 job.setMapperClass(MyMap.class); job.setReducerClass(MyReduce.class); //4 set map reduce output type job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //5、设置job输入输出 FileInputFormat.setInputPaths(job,new Path("file:///simple/source.txt")); FileOutputFormat.setOutputPath(job,new Path("file:///simple/output")); //6 commit job System.out.println(job.waitForCompletion(true) ? 0 : 1);; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)