MapReduce Example: Finding the Maximum Temperature for Each Year

[Figure 1: the logical data flow of MapReduce]
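
In this example the data flows as follows: each map input is an (offset, line) pair taken from the raw NCDC weather records; map emits (year, temperature) pairs; the framework then sorts and groups those pairs by year, so reduce receives each year together with all of its temperatures and emits the maximum.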

Step 1: write the map function:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
// Hadoop provides its own set of basic types, optimized for network
// serialization, rather than using Java's built-in types.
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Mapper is a generic type; its four type parameters specify the map
    // function's input key, input value, output key, and output value types.
    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // charAt returns the character at the given index
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
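
The mapper relies on the fixed-width layout of an NCDC record: the year sits at offsets 15-19, a signed temperature (in tenths of a degree Celsius) at 87-92, and the quality code at 92. The following standalone sketch builds a synthetic record (made up for illustration, not real NCDC data) and applies exactly the same parsing logic:

import java.lang.StringBuilder;

public class ParseSketch {
    public static void main(String[] args) {
        // Build a synthetic 93-character record (illustrative only):
        // offsets 15-18 hold the year, 87 the sign, 88-91 the temperature
        // in tenths of a degree Celsius, 92 the quality code.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 93; i++) sb.append('0');
        sb.replace(15, 19, "1950");
        sb.replace(87, 92, "+0123"); // +12.3 degrees Celsius
        sb.replace(92, 93, "1");     // quality code 1 = passed checks
        String line = sb.toString();

        // The same parsing as in MaxTemperatureMapper.map():
        String year = line.substring(15, 19);
        int airTemperature = (line.charAt(87) == '+')
                ? Integer.parseInt(line.substring(88, 92))
                : Integer.parseInt(line.substring(87, 92));
        String quality = line.substring(92, 93);
        System.out.println(year + " -> " + airTemperature + " (quality " + quality + ")");
        // prints: 1950 -> 123 (quality 1)
    }
}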

Step 2: write the reduce function:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // The reduce function likewise has four type parameters specifying its
    // input and output types. Its input types must match the map function's
    // output types, Text and IntWritable; the same applies to the output.
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
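
Between map and reduce, the framework sorts the map output and groups the values by key, so the reducer sees something like (1950, [0, 22, ...]). The following plain-Java sketch (with made-up temperatures; it uses no Hadoop API) imitates that grouping and applies the same max-finding loop:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    public static void main(String[] args) {
        // Hypothetical mapper output: (year, temperature) pairs.
        String[][] pairs = {{"1949", "111"}, {"1950", "0"}, {"1949", "78"}, {"1950", "22"}};

        // The shuffle groups values by key, as the framework does between map and reduce.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String[] p : pairs) {
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add(Integer.parseInt(p[1]));
        }

        // The reduce step: the same max-finding loop as MaxTemperatureReducer.
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int maxValue = Integer.MIN_VALUE;
            for (int v : e.getValue()) {
                maxValue = Math.max(maxValue, v);
            }
            System.out.println(e.getKey() + "\t" + maxValue); // 1949 111, then 1950 22
        }
    }
}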

Step 3: write the driver code that submits the job:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);
        // Job job = new Job(); // the no-argument constructor is deprecated
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max Temperature");
        // The input path can be a single file, a directory, or a set of files
        // matching a pattern; call addInputPath multiple times for multiple inputs.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // The output directory must not exist before the job runs.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        try {
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
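
To run the job, compile the three classes, package them into a JAR, and submit it with the hadoop command. The JAR name and the input/output paths below are placeholders:

$ hadoop jar max-temperature.jar MaxTemperature input/ncdc output
$ hadoop fs -cat output/part-r-00000

The output directory will contain one tab-separated (year, maximum temperature) line per year; part-r-00000 is the default file name for the first (and here only) reducer's output.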
