第三章 MapReduce

第三章 MapReduce,第1张

第三章 MapReduce 3.1 MapReuduce 3.1.1 MapReduce概念

MapReduce是一个分布式计算框架。

3.1.2 MapReduce底层原理

MapReduce 采用的是移动计算的方式,根据split产生Map task,然后通过shuffle,将map任务的输出拷贝到不同的reduce节点。由reduce阶段进行全局汇总。
原理图

block 是HDFS文件物理上的分割,split 是HDFS文件逻辑上的分割,严格意义上来说是一个split 产生一个map 任务。

3.1.3 MapReduce执行过程

3.2 WordCount

代码详见github https://github.com/Adopat/hadoop_demo

3.3 MapReduce 高级 3.31 MapReduce日志查看

日志查看前置步骤

启动historyserver进程,开启日志聚合功能。默认情况下任务的日志是散落在nodemanager节点上的想要查看需要找到对应的nodemanager节点上去查看,这样就很不方便,通过日志聚合功能我们可以把之前本来散落在nodemanager节点上的日志统一收集到hdfs上的指定目录中,这样就可以在yarn的web界面中直接查看了。

# 修改 yarn-site.xml配置 ,增加以下参数配置
# 1.停止集群
[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh
Stopping namenodes on [bigdata01]
Last login: Mon Jan 17 15:11:28 CST 2022 on pts/0
Stopping datanodes
Last login: Mon Jan 17 16:08:35 CST 2022 on pts/0
Stopping secondary namenodes [bigdata01]
Last login: Mon Jan 17 16:08:36 CST 2022 on pts/0
Stopping nodemanagers
Last login: Mon Jan 17 16:08:38 CST 2022 on pts/0
Stopping resourcemanager
Last login: Mon Jan 17 16:08:41 CST 2022 on pts/0
[root@bigdata01 hadoop-3.2.0]# jps
4136 Jps
2607 JobHistoryServer
You have new mail in /var/spool/mail/root
[root@bigdata01 hadoop-3.2.0]# pwd
/data/soft/hadoop-3.2.0
# 修改 yarn-site.xml 在配置后面增加 yarn.log-aggregation-enable yarn.log.server.url
[root@bigdata01 hadoop-3.2.0]# cat etc/hadoop/yarn-site.xml 



 
     
        yarn.nodemanager.aux-services 
        mapreduce_shuffle 
     
     
        yarn.nodemanager.env-whitelist 
        JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME
    
     
        yarn.resourcemanager.hostname
        bigdata01
    
     
        yarn.log-aggregation-enable
        true
    
    
        yarn.log.server.url
        http://bigdata01:19888/jobhistory/logs/
     

# 3.重新启动集群
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh
Starting namenodes on [bigdata01]
Last login: Mon Jan 17 16:08:43 CST 2022 on pts/0
Starting datanodes
Last login: Mon Jan 17 16:15:21 CST 2022 on pts/0
Starting secondary namenodes [bigdata01]
Last login: Mon Jan 17 16:15:23 CST 2022 on pts/0
Starting resourcemanager
Last login: Mon Jan 17 16:15:27 CST 2022 on pts/0
Starting nodemanagers
Last login: Mon Jan 17 16:15:32 CST 2022 on pts/0
You have new mail in /var/spool/mail/root
[root@bigdata01 hadoop-3.2.0]# jps
4613 SecondaryNameNode
4854 ResourceManager
5174 Jps
4346 NameNode
2607 JobHistoryServer
# 启动historyserver服务命令 bin/mapred --daemon start historyserver
[root@bigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /
Found 9 items
-rw-r--r--   2 root    supergroup       1361 2022-01-05 16:05 /README.txt
-rw-r--r--   2 root    supergroup       1361 2022-01-07 15:44 /README.txt.bak
drwxr-xr-x   - root    supergroup          0 2022-01-05 16:08 /abc
drwxr-xr-x   - root    supergroup          0 2022-01-10 15:44 /log
drwxr-xr-x   - root    supergroup          0 2022-01-17 15:15 /out
drwxr-xr-x   - root    supergroup          0 2022-01-15 15:32 /test
drwx------   - root    supergroup          0 2022-01-17 14:38 /tmp
drwx------   - root    supergroup          0 2022-01-10 09:16 /user
-rw-r--r--   3 ZhongRF supergroup  305555522 2022-01-06 15:46 /user.txt
You have new mail in /var/spool/mail/root
[root@bigdata01 hadoop-3.2.0]# history | grep jar
  854  hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out
  879  history | grep hadoop jar
  893  history | grep hadoop jar
  899  history | grep hadoop jar
  913  history | grep hadoop jar
  919  history | grep hadoop jar
  921  hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out
  937  hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out1
  945  hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out1
  950  hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out2
  975  history | grep jar
  979   hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out
  989  history | grep jar
  # 重新提交 mapreduce 任务
[root@bigdata01 hadoop-3.2.0]# hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out1

注意要将修改后的yarn-site.xml 同步到集群中的其他节点,此外还要在每一个节点中开启 historyserver服务

页面查看

进入yarn的8088界面

命令行方式查看

[root@bigdata01 hadoop-3.2.0]# yarn logs -applicationId application_1642407336043_0002 | grep k3,v3
2022-01-17 16:51:54,429 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.35.100:8032
=
=
=

3.32 停止Hadoop集群中的任务
yarn application -kill application_1642407336043_0002

如果需要对执行时间很长的mapreduce任务进行修改,可以使用命令中止任务。

3.33 MapReuduce扩展

MapReduce任务分为Map 和Reduce ,Reduce任务并不是必需的。

//禁用reduce阶段 
job.setNumReduceTasks(0)

具体代码见 https://github.com/Adopat/hadoop_demo

# 只有map 阶段的MapReduce任务输出 k2,v2 
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out1/part-m-00000
hello   1
you     1
hello   1
me      1
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /out
Found 2 items
-rw-r--r--   2 root supergroup          0 2022-01-18 09:24 /out/_SUCCESS
-rw-r--r--   2 root supergroup         19 2022-01-18 09:24 /out/part-r-00000
You have new mail in /var/spool/mail/root
# Map + Reduce 阶段的MapReduce任务输出 k3,v3
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out/part-r-00000
hello   2
me      1
you     1
[root@bigdata01 hadoop-3.2.0]# 

注意只有map 阶段的MapReduce的输出文件是带有m,如part-m-00000,含有map,reduce两个阶段的MapReduce的输出文件是带有r。如part-r-00000

3.4 Shuffle执行过程及源码分析 3.4.1 Shuffle执行过程

shuffer是一个网络拷贝的过程,是指通过网络把数据从map 端拷贝到reduce端的过程。shuffle其实是横跨map端和reduce端的,它主要是负责把map端产生的数据通过网络拷贝到reduce阶段进行统一聚合计算。

3.4.2 Hadoop中的序列化和反序列化

因为MapReduce是一种移动计算的方式,所以影响MapReduce的主要执行效率是磁盘IO。磁盘IO涉及到序列化和反序列化。所以为了优化MapReduce执行效率可以从序列化和反序列化入手,采用Hadoop序列化。

序列化

序列化是指把内存中的对象信息转换成二进制的形式,方便存储到文件中或者方便传输。

反序列化

反序列化指的是把文件中的信息加载到内存,方便计算。

JAVA 序列化

package com.imooc.mr;

import java.io.*;


public class JavaSerialize {
    public static void main(String[] args) throws Exception {
        //创建对象
        StudentJava studentJava = new StudentJava(1L,"justin");
        // 序列化
        FileOutputStream fos = new FileOutputStream("E:\StudentJava.txt");
        ObjectOutputStream oos = new ObjectOutputStream(fos);
        oos.writeObject(studentJava);
        oos.close();
        fos.close();
        //反序列化,从文件中加载到内存
        FileInputStream fis = new FileInputStream("E:\StudentJava.txt");
        ObjectInputStream ois = new ObjectInputStream(fis);
        StudentJava studentJava1 = (StudentJava) ois.readObject();
        System.out.println("反序列化后的对象是:"+studentJava1);
        ois.close();
        fis.close();


    }
}


class StudentJava implements Serializable{
    // 设置版本号
    private static final long serialVersionUID = 1L;
    private Long id ;
    private String name;

    public StudentJava(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "StudentJava{" +
                "id=" + id +
                ", name='" + name + ''' +
                '}';
    }
}

Hadoop 序列化

package com.imooc.mr;

import org.apache.hadoop.io.Writable;

import java.io.*;


public class HadoopSerialize {
    public static void main(String[] args){
        // 执行序列化方法
        //serialization();
        // 执行反序列化方法
        deserialization();
    }
    
    public static  void serialization(){
        try (FileOutputStream fos = new FileOutputStream("E:\HadoopStudent.txt");
             ObjectOutputStream oos = new ObjectOutputStream(fos)
                ){
            // 创建对象
            StudentWritable studentWritable = new StudentWritable(1L,"justin");
            studentWritable.write(oos);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
    
    public static void deserialization(){
        try (FileInputStream fis = new FileInputStream("E:\HadoopStudent.txt");
             ObjectInputStream ois = new ObjectInputStream(fis)){
            StudentWritable studentWritable1 = new StudentWritable();
            studentWritable1.readFields(ois);
            System.out.println(studentWritable1);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}
class StudentWritable implements Writable{
    private Long id;
    private String name;

    public StudentWritable() {
    }

    public StudentWritable(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }


    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.id);
        out.writeUTF(this.name);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readLong();
        this.name = in.readUTF();

    }

    @Override
    public String toString() {
        return "StudentWritable{" +
                "id=" + id +
                ", name='" + name + ''' +
                '}';
    }
}

Hadoop 序列化的优势

Java中的序列化会占用较大的存储空间,而Hadoop中的序列化可以节省很多存储空间,这样在海量数据计算的场景下,可以减少数据传输的大小,极大的提高计算效率。

3.4.3 InputFormat分析

类继承关系

InputFormat

InputFormat描述Map-Reduce作业的输入规范。Map-Reduce框架依赖于作业的InputFormat来:将输入文件分割成逻辑的inputsplit,然后将每个inputsplit分配给单个Mapper。提供RecordReader实现,用于从逻辑InputSplit收集输入记录,以供Mapper处理。

public abstract 
    List getSplits(JobContext context
                               ) throws IOException, InterruptedException;

public abstract 
    RecordReader createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

FileInputFormat( *** 作文件类型数据)

CombineFileInputFormat

处理小文件,用于解决小文件存储

TextInputFormat

是默认的处理类,处理普通文本文件,他会把文件中每一行作为一个记录,将每一行的起始偏移量作为key,每一行的内容作为value,这里的key和value就是我们之前所说的k1,v1
它默认以换行符或回车键作为一行记录

NLineInputFormat

可以动态指定一次读取多少行数据

DBInputFormat( *** 作数据库)

DelegatingInputFormat(处理多个输入)

3.4.4 InputFormat中注意事项

block 数量和InputSplit数量不相等

前面讲过一个Block块大小等于一个InputSplit大小,这种说法是不严谨的,也就是说block数量和InputSplit数量是不对等。(文件剩余大小/128)>1.1 继续切割,(文件剩余大小/128)<1.1部分割。比如一个140M和141M的文件都是两个Block,但是140M文件只会产生一个split,141M文件会产生两个split,对应两个map任务。允许10%溢出。

private static final double SPLIT_SLOP = 1.1;   // 10% slop

FileInputFormat.java 中 getSplits()有定义

一行数据被拆分到两个split会不会有问题

不会。如果这个InputSplit不是第一个InputSplit,我们将会丢掉读取出来的第一行 因为我们总是通过next()方法多读取一行(会多读取下一个InputSplit的第一行)

   // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

源码LineRecorderReader.java

3.4.5 OutputFormat分析

类继承关系

OutputFormat

OutputFormat描述Map-Reduce作业的输出规范。Map-Reduce框架依赖于作业的OutputFormat:验证作业的输出规范。例如,检查输出目录是否已经存在。提供RecordWriter实现,用于写出作业的输出文件。输出文件存储在文件系统中。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5708785.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存