Running a plain WordCount in IDEA
Required environment setup (honestly, I'm not sure every item is necessary, so I just configured all of it).
Before it was configured, the job kept failing with this error:
```
Could not locate executable ...\hadoop-2.7.3\hadoop-2.7.3\bin\winutils.exe in the Hadoop binaries
```
After finishing the configuration and restarting the machine, everything was OK.
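If rebooting after setting the HADOOP_HOME environment variable is inconvenient, a commonly used code-level alternative is to set the hadoop.home.dir system property before the job is configured; Hadoop's Shell utility checks this property when it looks for winutils.exe. A minimal sketch, assuming the binaries were unpacked to D:\hadoop-2.7.3 (an example path, not from this post):

```java
public class WinutilsWorkaround {
    public static void main(String[] args) {
        // Point Hadoop at the directory whose bin\ folder contains winutils.exe.
        // "D:\\hadoop-2.7.3" is an assumed example path; use your own unpack location.
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");
        // ...then configure and submit the job as in the WordCount main() below.
    }
}
```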
The pom dependencies are below:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>MapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
    </dependencies>
</project>
```
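Of these, only hadoop-client is actually used by the code below; it transitively pulls in the MapReduce and HDFS client libraries. The lombok dependency is declared here but not referenced anywhere in this example.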
The full code:
```java
package com.czx.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class WordCount {

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text keyOut = new Text();
        LongWritable valueOut = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("mapper input==>" + key.get() + ", " + value.toString());
            // Get one line of input, e.g. "one world"
            String line = value.toString();
            // Split it into words: "one", "world"
            String[] arr = line.split(" ");
            for (String word : arr) {
                keyOut.set(word);
                valueOut.set(1);
                // Emit a pair for each word, e.g. (one, 1)
                context.write(keyOut, valueOut);
                System.out.println("mapper output==>" + keyOut.toString() + ", " + valueOut.get());
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable valueOut = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // e.g. one, [1,1,1,1] ---> one, 4
            long sum = 0L;
            StringBuilder sb = new StringBuilder("reducer input==>");
            sb.append(key).append(", [");
            for (LongWritable w : values) {
                sb.append(w.get()).append(",");
                sum += w.get();
            }
            sb.deleteCharAt(sb.length() - 1).append("]");
            System.out.println(sb.toString());
            valueOut.set(sum);
            context.write(key, valueOut);
            System.out.println("reducer output==>" + key + ", " + sum);
        }
    }

    // Program arguments: /tmp/mr/input /tmp/mr/output
    public static void main(String[] args) throws Exception {
        // Loads core-default.xml and core-site.xml
        Configuration conf = new Configuration();
        // Create the Job object that runs the MapReduce task
        Job job = Job.getInstance(conf, "wordcount");
        // Set the main class (needed when running the jar on Linux)
        job.setJarByClass(WordCount.class);
        // Set the mapper class
        job.setMapperClass(WordCountMapper.class);
        // Set the reducer class
        job.setReducerClass(WordCountReducer.class);
        // Set the number of reducers (defaults to 1 if unset)
        job.setNumReduceTasks(1);
        // Set the mapper output key class
        job.setMapOutputKeyClass(Text.class);
        // Set the mapper output value class
        job.setMapOutputValueClass(LongWritable.class);
        // Set the reducer output key class
        job.setOutputKeyClass(Text.class);
        // Set the reducer output value class
        job.setOutputValueClass(LongWritable.class);
        // Input format class; TextInputFormat is the default, so this is optional
        job.setInputFormatClass(TextInputFormat.class);
        // Output format class; TextOutputFormat is the default, so this is optional
        job.setOutputFormatClass(TextOutputFormat.class);
        // Set the input directory
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputPath = new Path(args[1]);
        // Set the output directory
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete the output directory automatically
        FileSystem fs = FileSystem.get(conf);
        // If the output directory already exists, delete it recursively
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
            System.out.println("delete outputPath==> 【" + outputPath.toString() + "】 success!");
        }
        // Submit the job and wait for it to finish
        boolean status = job.waitForCompletion(false);
        System.exit(status ? 0 : 1);
    }
}
```
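Note that waitForCompletion(false) submits the job and blocks until it finishes but suppresses progress logging; passing true makes Hadoop print the job's progress to the console, which is handy when debugging locally.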
Run the code:
Set the program arguments:
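As the comment above main() indicates, the two program arguments are the input and output directories, for example:

```
/tmp/mr/input /tmp/mr/output
```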
Run results:
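The counted results end up in part-r-00000 under the output directory; with the default TextOutputFormat, each line is a word and its count separated by a tab. An illustrative sample (assumed, not actual output from this run):

```
hello	3
world	2
```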