MapReduce笔记 —— 手动设置多个ReduceTask以及设置Combiner(两种方式运行MapReduce)

MapReduce笔记 —— 手动设置多个ReduceTask以及设置Combiner(两种方式运行MapReduce),第1张

MapReduce笔记 —— 手动设置多个ReduceTask以及设置Combiner(两种方式运行MapReduce)

目录
      • 手动设置多个ReduceTask
          • 在idea中启动MapReduce
      • 手动设置Combiner
          • 通过jar包在linux终端执行

手动设置多个ReduceTask

先来看只有一个ReduceTask时的词频统计的结果
当没有手动设置ReduceTask的数量时,默认只有一个reduceTask

数据为

package Demo.mr.WordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper {
    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
        String datas = value.toString();
        String[] split = datas.split(",");
        for (String data : split){
            context.write(new Text(data),new IntWritable(1));
        }
    }
}

package Demo.mr.WordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer {
    @Override
    protected void reduce(Text key,Iterable values,Context context) throws IOException,InterruptedException{
        int sum=0;
        for(IntWritable val:values){
            sum=sum + val.get();
        }
        context.write(key,new IntWritable(sum));
    }
}

package Demo.mr.WordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //向yarn申请一个job任务用于执行mapreduce程序
        Job job = Job.getInstance(new Configuration());
        //设置入口类
        job.setJarByClass(WordCountDriver.class);
        //设置mapper类
        job.setMapperClass(WordCountMapper.class);
        //设置reduce类
        job.setReducerClass(WordCountReducer.class);
        //设置Mapper类的输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reduce类的输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置要处理的文件
        FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/data/words.txt"));
        //设置输出路径
        FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/output"));
        //启动
        job.waitForCompletion(true);
    }
}
在idea中启动MapReduce

这里是因为给出路径时,“hdfs://master:9000/data/words.txt” 直接连接到了hdfs中的文件路径,所以可以在idea中直接运行

结果为

part-r-00000的内容为
hadoop 4
hive 2
java 3
python 2
word 2

然后手动设置ReduceTask的数量为2
在WordCountDriver类,也就是主方法中设置,只需要一条语句 job.setNumReduceTasks(2);

public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountDriver.class);

        //设置reduceTask数量
        job.setNumReduceTasks(2);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/data/words.txt"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/output"));
        job.waitForCompletion(true);
    }
}

Map端和Reduce端的代码不需要改变

看一下结果

这里就会发现,每个ReduceTask都会产生一个自己的结果文件,这里的两个ReduceTask分别产生了part-r-00000以及part-r-00001文件
分别打开这两个文件

part-r-00000的内容为
hadoop 4

part-r-00000的内容为
hive 2
java 3
python 2
word 2

这里的part-r-00000以及part-r-00001两个文件的内容合在一起才是上面单个ReduceTask任务结果文件的内容。
这里是因为不同key的键值对的partition值不一样,因此会被传入不同的reduceTask中

简单的测试一下partiton

package Demo.mr;

public class Test {
    public static void main(String[] args) {
        int h = "hadoop".hashCode();
        System.out.println(h%2);

        int h1 = "hive".hashCode();
        int h2 ="java".hashCode();
        int h3 ="python".hashCode();
        int h4 = "word".hashCode();
        System.out.println(h1%2);
        System.out.println(h2%2);
        System.out.println(h3%2);
        System.out.println(h4%2);
    }
}


会发现hadoop的值,与剩下四个hive,java,python,word都不相同
所以key为hadoop的键值对单独进入一个reduceTask里面,然后计算后被输出在当前reduceTask对应的结果文件part-r-00000里面

key为hive,java,python,word的这些键值对会被送往另一个接收partition值为0的reduceTask中,然后被输出在文件part-r-00001里面

手动设置Combiner

Combiner类如果不自己定义的话,默认的shuffle过程中是不会combine的

先来看看没有combine的执行情况

package Demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Gender {
    public static class GenderMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            if("男".equals(split[3])){
                context.write(new Text(split[3]),new IntWritable(1));
            }
        }
    }

    public static class GenderReducer extends Reducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception{
        Job job = Job.getInstance();
        job.setNumReduceTasks(2);
        job.setJobName("class age sum");
        job.setJarByClass(ClazzAgeSum.class);

        //map端
        job.setMapperClass(GenderMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //reduce端
        job.setReducerClass(GenderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定输入输出路径
        Path input = new Path("/data/students.txt");
        FileInputFormat.addInputPath(job,input);
        Path output = new Path("/output");

        FileSystem fs = FileSystem.get(new Configuration());

        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job,output);

        //启动job
        job.waitForCompletion(true);

    }
}

通过jar包在linux终端执行

这里没有给出确切的hdfs的文件位置,所以不能像上面设置多个reduceTask的代码一样直接在idea里面运行,需要打成jar包然后传到linux里面用命令运行

hadoop jar /usr/local/jar/hdfs-1.0-SNAPSHOT.jar Demo.mr.Gender
格式为 hadoop   jar   jar的位置   jar包里面具体执行的类名

运行过程的信息如图所示

再来看看写了combine的情况

需要写具体的Combiner类,还要在主方法里面加上一句job.setCombinerClass(CombineReducer.class);

package Demo.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Gender {
	//Map端
    public static class GenderMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            if("男".equals(split[3])){
                context.write(new Text(split[3]),new IntWritable(1));
            }
        }
    }

	//Combiner预聚合
    public static class CombineReducer extends Reducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

	//Reduce端
    public static class GenderReducer extends Reducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception{
        Job job = Job.getInstance();
        job.setNumReduceTasks(2);
        job.setJobName("class age sum");
        job.setJarByClass(ClazzAgeSum.class);
        job.setMapperClass(GenderMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //combine端 预聚合
        job.setCombinerClass(CombineReducer.class);

        job.setReducerClass(GenderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        Path input = new Path("/data/students.txt");
        FileInputFormat.addInputPath(job,input);
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job,output);
        job.waitForCompletion(true);

    }
}

写好后重新打包,打包的时候继续双击package即可,会自动覆盖原先的旧的jar包,然后再重新上传

之前没有设置Combiner时,红线标出来的地方 Combine的input和output后面的值都为0,说明没有combine过程。这里设置Combiner后,就有值了,说明经过了combine过程

再来看一下具体的数据,Combine input records读取的数据量为507,而上面几行的Map output records的值同样为507。Combine就相当于一个发生在reduce之前的reduce端,接收一个MapTask输出的值进行combine过程后,等待map的shuffle阶段结束,将不同map的combine输出结果传送到对应的reduceTask那里

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5573137.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存