package com.shujia.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * First MapReduce program: counts occurrences of each distinct input line.
 *
 * <p>NOTE(review): despite the job name ("word count"), the mapper emits the
 * WHOLE line as the key instead of splitting it into words, so this counts
 * duplicate lines. Presumably the input file holds one word per line — confirm
 * before renaming or changing the mapper.
 */
public class MR01 {

    /**
     * Map stage: emits each input line as the key with a count of 1.
     * Input: (byte offset, line of text). Output: (line, 1).
     */
    public static class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        // Writables are reused across map() calls; the framework serializes
        // them on write, so per-record allocation is unnecessary.
        private static final LongWritable ONE = new LongWritable(1);
        private final Text outKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert the Text payload of the current line to a String key.
            outKey.set(value.toString());
            context.write(outKey, ONE);
        }
    }

    /**
     * Reduce stage: sums the 1s emitted for each key.
     * Input: (line, [1, 1, ...]). Output: (line, total count).
     */
    public static class WordReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // long, not int: LongWritable.get() returns long, and an int
            // accumulator would silently truncate on very large inputs.
            long count = 0;
            for (LongWritable value : values) {
                count += value.get(); // get() unwraps the primitive long
            }
            context.write(key, new LongWritable(count));
        }
    }

    /**
     * Job driver: configures the MapReduce job, clears the output path,
     * launches the job, and exits with 0 on success / 1 on failure.
     */
    public static void main(String[] args) throws Exception {
        // One Configuration shared by the Job and the FileSystem handle.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("第一个mr程序 单词统计");
        job.setJarByClass(MR01.class);

        // Map-stage class.
        job.setMapperClass(WordMapper.class);
        // Reduce-stage class.
        job.setReducerClass(WordReducer.class);

        // Key/value types emitted by the map stage.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Key/value types emitted by the reduce stage.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input and output paths (HDFS).
        Path input = new Path("/words.txt");
        Path out = new Path("/output");

        // The output path must not already exist; remove it if it does.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, out);

        // Announce before launching (the original printed this AFTER the job
        // had already finished), then block until completion and propagate
        // the job status as the process exit code.
        System.out.println("正在运行mr");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)