MapReduce的逻辑数据流
第一步、需要一个 map函数:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不直接使用java内嵌的类型
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//Mapper类是一个泛型类型,分别指定map函数的输入键,输入值,输出键,输出值的类型,
private static final int MISSING = 9999;
@Override
public void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException{
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+'){//返回指定索引处的字符
airTemperature = Integer.parseInt(line.substring(88,92));
}else{
airTemperature = Integer.parseInt(line.substring(87,92));
}
String quality = line.substring(92,93);
if (airTemperature != MISSING && quality.matches("[01459]")){
context.write(new Text(year),new IntWritable(airTemperature));
}
}
}
第二步、需要一个reduce函数:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MaxTemperatureReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
//reduce函数同样也有四个形式参数类型用于指定输入和输出类型
//reduce函数的输入类型必须匹配map函数的输出类型:即text和IntWritable
//输出同样
@Override
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
int maxValue = Integer.MIN_VALUE;
for(IntWritable value:values){
maxValue = Math.max(maxValue,value.get());
}
context.write(key,new IntWritable(maxValue));
}
}
第三步、用来作业的代码:
import javafx.scene.text.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import java.io.FileOutputStream;
import java.io.IOException;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length !=2){
System.out.println("Usage:MaxTemperature " );
System.exit(-1);
}
Configuration config = new Configuration();
Job job = Job.getInstance (config);
// Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max Temperature");
//路径可以是单个文件,也可以是一个目录或符合特定文件模式的一系列文件
//可多次调用实现多路径输入
FileInputFormat.addInputPath(job,new Path(args[0]));
//输出前目录应该是不存在的
FileOutputFormat.setOutputPath(job,new Path(args[1]);
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
try {
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)