目录
1.前情回顾
2.需求
3.编码
3.1 新建 Maven 项目 YarnDemo
3.2 新建 com.xxxx.yarn包名
3.3 创建类 WordCount 并实现 Tool 接口
3.3 新建WordCountDriver类
1.前情回顾
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop jar wc.jar com.atguigu.mapreduce.wordcount2.WordCountDriver /input /output1
期望可以动态传参,结果报错:-D 开头的参数被误认为是第一个输入路径参数。
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop jar wc.jar com.atguigu.mapreduce.wordcount2.WordCountDriver -Dmapreduce.job.queuename=root.test /input /output1

2.需求
自己写的程序也可以动态修改参数。编写 Yarn 的 Tool 接口
3.编码

3.1 新建 Maven 项目 YarnDemo

3.2 新建 com.xxxx.yarn 包名

3.3 创建类 WordCount 并实现 Tool 接口

pom.xml 依赖配置:

<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.hadoop</groupId>
<artifactId>yarn_tool_test</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>
package com.yangmin.yarn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

/**
 * Word-count job implemented as a Hadoop {@link Tool} so that generic
 * options (e.g. {@code -Dmapreduce.job.queuename=...}) are parsed by
 * ToolRunner before the remaining arguments reach {@link #run(String[])}.
 */
public class WordCount implements Tool {

    /** Configuration injected by ToolRunner via {@link #setConf(Configuration)}. */
    private Configuration conf;

    /**
     * Configures and submits the MapReduce job.
     *
     * @param args args[0] = input path, args[1] = output path
     *             (generic -D options have already been stripped by ToolRunner)
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    /** Mapper: emits (word, 1) for every space-delimited token in each line. */
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final Text outK = new Text();
        private final IntWritable outV = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                outK.set(word);
                context.write(outK, outV);
            }
        }
    }

    /** Reducer: sums the counts for each word and emits (word, total). */
    public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable outV = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outV.set(sum);
            context.write(key, outV);
        }
    }
}
package com.yangmin.yarn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Arrays;

/**
 * Driver that selects a {@link Tool} by name (args[0]) and delegates the
 * remaining arguments to {@link ToolRunner}, which parses generic options
 * such as {@code -Dmapreduce.job.queuename=...} into the Configuration.
 */
public class WordCountDriver {

    private static Tool tool;

    public static void main(String[] args) throws Exception {
        // Shared configuration; ToolRunner merges -D generic options into it.
        Configuration conf = new Configuration();

        // Dispatch on the tool name given as the first argument.
        switch (args[0]) {
            case "wordcount":
                tool = new WordCount();
                break;
            default:
                // Separator added so the message reads "no such tool: xyz".
                throw new RuntimeException("no such tool: " + args[0]);
        }

        // Run the selected tool with the tool name stripped off the argument list.
        int exitCode = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
        System.exit(exitCode);
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)