MapReduce是一个分布式计算框架。
3.1.2 MapReduce底层原理MapReduce 采用的是移动计算的方式,根据split产生Map task,然后通过shuffle,将map任务的输出拷贝到不同的reduce节点。由reduce阶段进行全局汇总。
原理图
3.1.3 MapReduce执行过程 3.2 WordCountblock 是HDFS文件物理上的分割,split 是HDFS文件逻辑上的分割,严格意义上来说是一个split 产生一个map 任务。
代码详见github https://github.com/Adopat/hadoop_demo
3.3 MapReduce 高级 3.31 MapReduce日志查看
日志查看前置步骤
启动historyserver进程,开启日志聚合功能。默认情况下任务的日志是散落在nodemanager节点上的想要查看需要找到对应的nodemanager节点上去查看,这样就很不方便,通过日志聚合功能我们可以把之前本来散落在nodemanager节点上的日志统一收集到hdfs上的指定目录中,这样就可以在yarn的web界面中直接查看了。
# 修改 yarn-site.xml配置 ,增加以下参数配置 # 1.停止集群 [root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh Stopping namenodes on [bigdata01] Last login: Mon Jan 17 15:11:28 CST 2022 on pts/0 Stopping datanodes Last login: Mon Jan 17 16:08:35 CST 2022 on pts/0 Stopping secondary namenodes [bigdata01] Last login: Mon Jan 17 16:08:36 CST 2022 on pts/0 Stopping nodemanagers Last login: Mon Jan 17 16:08:38 CST 2022 on pts/0 Stopping resourcemanager Last login: Mon Jan 17 16:08:41 CST 2022 on pts/0 [root@bigdata01 hadoop-3.2.0]# jps 4136 Jps 2607 JobHistoryServer You have new mail in /var/spool/mail/root [root@bigdata01 hadoop-3.2.0]# pwd /data/soft/hadoop-3.2.0 # 修改 yarn-site.xml 在配置后面增加 yarn.log-aggregation-enable yarn.log.server.url [root@bigdata01 hadoop-3.2.0]# cat etc/hadoop/yarn-site.xml# 3.重新启动集群 [root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh Starting namenodes on [bigdata01] Last login: Mon Jan 17 16:08:43 CST 2022 on pts/0 Starting datanodes Last login: Mon Jan 17 16:15:21 CST 2022 on pts/0 Starting secondary namenodes [bigdata01] Last login: Mon Jan 17 16:15:23 CST 2022 on pts/0 Starting resourcemanager Last login: Mon Jan 17 16:15:27 CST 2022 on pts/0 Starting nodemanagers Last login: Mon Jan 17 16:15:32 CST 2022 on pts/0 You have new mail in /var/spool/mail/root [root@bigdata01 hadoop-3.2.0]# jps 4613 SecondaryNameNode 4854 ResourceManager 5174 Jps 4346 NameNode 2607 JobHistoryServer # 启动historyserver服务命令 bin/mapred --daemon start historyserver [root@bigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver [root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls / Found 9 items -rw-r--r-- 2 root supergroup 1361 2022-01-05 16:05 /README.txt -rw-r--r-- 2 root supergroup 1361 2022-01-07 15:44 /README.txt.bak drwxr-xr-x - root supergroup 0 2022-01-05 16:08 /abc drwxr-xr-x - root supergroup 0 2022-01-10 15:44 /log drwxr-xr-x - root supergroup 0 2022-01-17 15:15 /out drwxr-xr-x - root supergroup 0 2022-01-15 15:32 /test drwx------ - root supergroup 0 2022-01-17 14:38 /tmp drwx------ - root supergroup 0 2022-01-10 09:16 /user -rw-r--r-- 3 ZhongRF supergroup 305555522 2022-01-06 15:46 /user.txt You have new mail in /var/spool/mail/root [root@bigdata01 hadoop-3.2.0]# history | grep jar 854 hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out 879 history | grep hadoop jar 893 history | grep hadoop jar 899 history | grep hadoop jar 913 history | grep hadoop jar 919 history | grep hadoop jar 921 hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out 937 hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out1 945 hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out1 950 hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out2 975 history | grep jar 979 hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out 989 history | grep jar # 重新提交 mapreduce 任务 [root@bigdata01 hadoop-3.2.0]# hadoop jar hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out1 yarn.nodemanager.aux-services mapreduce_shuffle yarn.nodemanager.env-whitelist JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME yarn.resourcemanager.hostname bigdata01 yarn.log-aggregation-enable true yarn.log.server.url http://bigdata01:19888/jobhistory/logs/
注意要将修改后的yarn-site.xml 同步到集群中的其他节点,此外还要在每一个节点中开启 historyserver服务
页面查看
进入yarn的8088界面
命令行方式查看
[root@bigdata01 hadoop-3.2.0]# yarn logs -applicationId application_1642407336043_0002 | grep k3,v3 2022-01-17 16:51:54,429 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.35.100:80323.32 停止Hadoop集群中的任务= = =
yarn application -kill application_1642407336043_0002
3.33 MapReuduce扩展如果需要对执行时间很长的mapreduce任务进行修改,可以使用命令中止任务。
MapReduce任务分为Map 和Reduce ,Reduce任务并不是必需的。
//禁用reduce阶段 job.setNumReduceTasks(0)
具体代码见 https://github.com/Adopat/hadoop_demo
# 只有map 阶段的MapReduce任务输出 k2,v2 [root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out1/part-m-00000 hello 1 you 1 hello 1 me 1 [root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /out Found 2 items -rw-r--r-- 2 root supergroup 0 2022-01-18 09:24 /out/_SUCCESS -rw-r--r-- 2 root supergroup 19 2022-01-18 09:24 /out/part-r-00000 You have new mail in /var/spool/mail/root # Map + Reduce 阶段的MapReduce任务输出 k3,v3 [root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out/part-r-00000 hello 2 me 1 you 1 [root@bigdata01 hadoop-3.2.0]#
3.4 Shuffle执行过程及源码分析 3.4.1 Shuffle执行过程注意只有map 阶段的MapReduce的输出文件是带有m,如part-m-00000,含有map,reduce两个阶段的MapReduce的输出文件是带有r。如part-r-00000
shuffer是一个网络拷贝的过程,是指通过网络把数据从map 端拷贝到reduce端的过程。shuffle其实是横跨map端和reduce端的,它主要是负责把map端产生的数据通过网络拷贝到reduce阶段进行统一聚合计算。
3.4.2 Hadoop中的序列化和反序列化因为MapReduce是一种移动计算的方式,所以影响MapReduce的主要执行效率是磁盘IO。磁盘IO涉及到序列化和反序列化。所以为了优化MapReduce执行效率可以从序列化和反序列化入手,采用Hadoop序列化。
序列化
序列化是指把内存中的对象信息转换成二进制的形式,方便存储到文件中或者方便传输。
反序列化
反序列化指的是把文件中的信息加载到内存,方便计算。
JAVA 序列化
package com.imooc.mr; import java.io.*; public class JavaSerialize { public static void main(String[] args) throws Exception { //创建对象 StudentJava studentJava = new StudentJava(1L,"justin"); // 序列化 FileOutputStream fos = new FileOutputStream("E:\StudentJava.txt"); ObjectOutputStream oos = new ObjectOutputStream(fos); oos.writeObject(studentJava); oos.close(); fos.close(); //反序列化,从文件中加载到内存 FileInputStream fis = new FileInputStream("E:\StudentJava.txt"); ObjectInputStream ois = new ObjectInputStream(fis); StudentJava studentJava1 = (StudentJava) ois.readObject(); System.out.println("反序列化后的对象是:"+studentJava1); ois.close(); fis.close(); } } class StudentJava implements Serializable{ // 设置版本号 private static final long serialVersionUID = 1L; private Long id ; private String name; public StudentJava(Long id, String name) { this.id = id; this.name = name; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "StudentJava{" + "id=" + id + ", name='" + name + ''' + '}'; } }
Hadoop 序列化
package com.imooc.mr; import org.apache.hadoop.io.Writable; import java.io.*; public class HadoopSerialize { public static void main(String[] args){ // 执行序列化方法 //serialization(); // 执行反序列化方法 deserialization(); } public static void serialization(){ try (FileOutputStream fos = new FileOutputStream("E:\HadoopStudent.txt"); ObjectOutputStream oos = new ObjectOutputStream(fos) ){ // 创建对象 StudentWritable studentWritable = new StudentWritable(1L,"justin"); studentWritable.write(oos); }catch (Exception e){ e.printStackTrace(); } } public static void deserialization(){ try (FileInputStream fis = new FileInputStream("E:\HadoopStudent.txt"); ObjectInputStream ois = new ObjectInputStream(fis)){ StudentWritable studentWritable1 = new StudentWritable(); studentWritable1.readFields(ois); System.out.println(studentWritable1); }catch (Exception e){ e.printStackTrace(); } } } class StudentWritable implements Writable{ private Long id; private String name; public StudentWritable() { } public StudentWritable(Long id, String name) { this.id = id; this.name = name; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public void write(DataOutput out) throws IOException { out.writeLong(this.id); out.writeUTF(this.name); } @Override public void readFields(DataInput in) throws IOException { this.id = in.readLong(); this.name = in.readUTF(); } @Override public String toString() { return "StudentWritable{" + "id=" + id + ", name='" + name + ''' + '}'; } }
Hadoop 序列化的优势
Java中的序列化会占用较大的存储空间,而Hadoop中的序列化可以节省很多存储空间,这样在海量数据计算的场景下,可以减少数据传输的大小,极大的提高计算效率。
3.4.3 InputFormat分析
类继承关系
InputFormat
InputFormat描述Map-Reduce作业的输入规范。Map-Reduce框架依赖于作业的InputFormat来:将输入文件分割成逻辑的inputsplit,然后将每个inputsplit分配给单个Mapper。提供RecordReader实现,用于从逻辑InputSplit收集输入记录,以供Mapper处理。
public abstract ListgetSplits(JobContext context ) throws IOException, InterruptedException; public abstract RecordReader createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException;
FileInputFormat( *** 作文件类型数据)
CombineFileInputFormat
处理小文件,用于解决小文件存储
TextInputFormat
是默认的处理类,处理普通文本文件,他会把文件中每一行作为一个记录,将每一行的起始偏移量作为key,每一行的内容作为value,这里的key和value就是我们之前所说的k1,v1
它默认以换行符或回车键作为一行记录
NLineInputFormat
可以动态指定一次读取多少行数据
DBInputFormat( *** 作数据库)
DelegatingInputFormat(处理多个输入)
3.4.4 InputFormat中注意事项
block 数量和InputSplit数量不相等
前面讲过一个Block块大小等于一个InputSplit大小,这种说法是不严谨的,也就是说block数量和InputSplit数量是不对等。(文件剩余大小/128)>1.1 继续切割,(文件剩余大小/128)<1.1部分割。比如一个140M和141M的文件都是两个Block,但是140M文件只会产生一个split,141M文件会产生两个split,对应两个map任务。允许10%溢出。
private static final double SPLIT_SLOP = 1.1; // 10% slop
FileInputFormat.java 中 getSplits()有定义
一行数据被拆分到两个split会不会有问题
不会。如果这个InputSplit不是第一个InputSplit,我们将会丢掉读取出来的第一行 因为我们总是通过next()方法多读取一行(会多读取下一个InputSplit的第一行)
// If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; }
3.4.5 OutputFormat分析源码LineRecorderReader.java
类继承关系
OutputFormat
OutputFormat描述Map-Reduce作业的输出规范。Map-Reduce框架依赖于作业的OutputFormat:验证作业的输出规范。例如,检查输出目录是否已经存在。提供RecordWriter实现,用于写出作业的输出文件。输出文件存储在文件系统中。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)