- 手动设置多个ReduceTask
- 在idea中启动MapReduce
- 手动设置Combiner
- 通过jar包在linux终端执行
先来看只有一个ReduceTask时的词频统计的结果
当没有手动设置ReduceTask的数量时,默认只有一个reduceTask
数据为
package Demo.mr.WordCount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ @Override protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String datas = value.toString(); String[] split = datas.split(","); for (String data : split){ context.write(new Text(data),new IntWritable(1)); } } }
package Demo.mr.WordCount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer{ @Override protected void reduce(Text key,Iterable values,Context context) throws IOException,InterruptedException{ int sum=0; for(IntWritable val:values){ sum=sum + val.get(); } context.write(key,new IntWritable(sum)); } }
package Demo.mr.WordCount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //向yarn申请一个job任务用于执行mapreduce程序 Job job = Job.getInstance(new Configuration()); //设置入口类 job.setJarByClass(WordCountDriver.class); //设置mapper类 job.setMapperClass(WordCountMapper.class); //设置reduce类 job.setReducerClass(WordCountReducer.class); //设置Mapper类的输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置reduce类的输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置要处理的文件 FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/data/words.txt")); //设置输出路径 FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/output")); //启动 job.waitForCompletion(true); } }在idea中启动MapReduce
这里是因为给出路径时,“hdfs://master:9000/data/words.txt” 直接连接到了hdfs中的文件路径,所以可以在idea中直接运行
结果为
part-r-00000的内容为
hadoop 4
hive 2
java 3
python 2
word 2
然后手动设置ReduceTask的数量为2
在WordCountDriver类,也就是主方法中设置,只需要一条语句 job.setNumReduceTasks(2);
public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(WordCountDriver.class); //设置reduceTask数量 job.setNumReduceTasks(2); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/data/words.txt")); FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/output")); job.waitForCompletion(true); } }
Map端和Reduce端的代码不需要改变
看一下结果
这里就会发现,每个ReduceTask都会产生一个自己的结果文件,这里的两个ReduceTask分别产生了part-r-00000以及part-r-00001文件
分别打开这两个文件
part-r-00000的内容为
hadoop 4
part-r-00000的内容为
hive 2
java 3
python 2
word 2
这里的part-r-00000以及part-r-00001两个文件的内容合在一起才是上面单个ReduceTask任务结果文件的内容。
这里是因为不同key的键值对的partition值不一样,因此会被传入不同的reduceTask中
简单的测试一下partiton
package Demo.mr; public class Test { public static void main(String[] args) { int h = "hadoop".hashCode(); System.out.println(h%2); int h1 = "hive".hashCode(); int h2 ="java".hashCode(); int h3 ="python".hashCode(); int h4 = "word".hashCode(); System.out.println(h1%2); System.out.println(h2%2); System.out.println(h3%2); System.out.println(h4%2); } }
会发现hadoop的值,与剩下四个hive,java,python,word都不相同
所以key为hadoop的键值对单独进入一个reduceTask里面,然后计算后被输出在当前reduceTask对应的结果文件part-r-00000里面
key为hive,java,python,word的这些键值对会被送往另一个接收partition值为0的reduceTask中,然后被输出在文件part-r-00001里面
手动设置CombinerCombiner类如果不自己定义的话,默认的shuffle过程中是不会combine的
先来看看没有combine的执行情况
package Demo.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class Gender { public static class GenderMapper extends Mapper通过jar包在linux终端执行{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(","); if("男".equals(split[3])){ context.write(new Text(split[3]),new IntWritable(1)); } } } public static class GenderReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key,new IntWritable(sum)); } } public static void main(String[] args) throws Exception{ Job job = Job.getInstance(); job.setNumReduceTasks(2); job.setJobName("class age sum"); job.setJarByClass(ClazzAgeSum.class); //map端 job.setMapperClass(GenderMapper.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //reduce端 job.setReducerClass(GenderReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定输入输出路径 Path input = new Path("/data/students.txt"); FileInputFormat.addInputPath(job,input); Path output = new Path("/output"); FileSystem fs = FileSystem.get(new Configuration()); if(fs.exists(output)){ fs.delete(output,true); } FileOutputFormat.setOutputPath(job,output); //启动job job.waitForCompletion(true); } }
这里没有给出确切的hdfs的文件位置,所以不能像上面设置多个reduceTask的代码一样直接在idea里面运行,需要打成jar包然后传到linux里面用命令运行
hadoop jar /usr/local/jar/hdfs-1.0-SNAPSHOT.jar Demo.mr.Gender
格式为 hadoop jar jar的位置 jar包里面具体执行的类名
运行过程的信息如图所示
再来看看写了combine的情况
需要写具体的Combiner类,还要在主方法里面加上一句job.setCombinerClass(CombineReducer.class);
package Demo.mr; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class Gender { //Map端 public static class GenderMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(","); if("男".equals(split[3])){ context.write(new Text(split[3]),new IntWritable(1)); } } } //Combiner预聚合 public static class CombineReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key,new IntWritable(sum)); } } //Reduce端 public static class GenderReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key,new IntWritable(sum)); } } public static void main(String[] args) throws Exception{ Job job = Job.getInstance(); job.setNumReduceTasks(2); job.setJobName("class age sum"); job.setJarByClass(ClazzAgeSum.class); job.setMapperClass(GenderMapper.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //combine端 预聚合 job.setCombinerClass(CombineReducer.class); job.setReducerClass(GenderReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path input = new Path("/data/students.txt"); FileInputFormat.addInputPath(job,input); Path output = new Path("/output"); FileSystem fs = FileSystem.get(new Configuration()); if(fs.exists(output)){ fs.delete(output,true); } FileOutputFormat.setOutputPath(job,output); job.waitForCompletion(true); } }
写好后重新打包,打包的时候继续双击package即可,会自动覆盖原先的旧的jar包,然后再重新上传
之前没有设置Combiner时,红线标出来的地方 Combine的input和output后面的值都为0,说明没有combine过程。这里设置Combiner后,就有值了,说明经过了combine过程
再来看一下具体的数据,Combine input records读取的数据量为507,而上面几行的Map output records的值同样为507。Combine就相当于一个发生在reduce之前的reduce端,接收一个MapTask输出的值进行combine过程后,等待map的shuffle阶段结束,将不同map的combine输出结果传送到对应的reduceTask那里
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)