文章目录导语
MapReduce作为Hadoop核心编程模型,在Hadoop中,数据处理的核心就是MapReduce程序设计模型。下面就来分享一下MapReduce都有那些值得我们注意的事情。
- MapReduce编程模型
- MapReduce执行流程
- MapReduce数据本地化(Data-Local)
- MapReduce的工作原理
Map 和Reduce 的概念是从函数式编程中借鉴而来的,而函数式编程也是现在比较流行的一个话题。整个的MapReduce计算过程其实是被分为Map阶段和Reduce阶段的。也就是映射与缩减两个独立的阶段过程。在Map中进行数据的读取与预处理,然后将预处理结果发送到Reduce中进行有效的合并。
创建Map的代码
在使用MapReduce的过程中,第一个测试示例就是统计一篇文章中每个单词出现的次数。下面就来实现以下
import sys word_list =[] for line in sys.stdin: word_list = line.strip().split(''); if len(word_list)<=0: continue; for word in word_list: w = word.strip(); if len(w)<=0: continue; print "t".join([w,"1"])
上面这段代码主要的工作就是从文章中,对单词进行以空格的方式分割。
word_list = line.strip().split(’ '),这个代码将当前读取的一整行数据按照空格进行分割,然后将分割后的结果存入word_list数组中,通过遍历,获取到每个单词,然后将对应次数加一。
创建Reduce代码
import sys cur_word = None sum_of_word = 0 for line in sys.stdin: ss = line.strip().split('t') if len(ss)!=2: continue word = ss[0].strip() count = ss[1].strip() if cur_word = None: cur_word = word if cur_word != word: print 't'.join([cur_word,str(sum_of_word)]) sum_of_word = 0 cur_word = word sum_of_word += int(count) print 't'.join([cur_word,str(sum_of_word)]) sum_of_word = 0
上面这段代码对Map阶段的数组进行汇总处理,从Map到Reduce的过程中,存在一个缓存的分组阶段,保证同一个Word记录会连续传入到reduce中,所以在Reduce阶段,需要的是对连续的相同Word进行累加。如图所示
MapReduce执行流程 上面介绍的是MapReduce计算框架的一般的执行流程。如下图所示
1、输入和拆分
这个过程不属于Map和Reduce的主要过程,他属于整个的计算框架中最消耗时间的一部分 *** 作,这个过程是为Map准备数据。
分片 *** 作
split只要将源文件的内容分片形成一系列的InputSplit,每个InputSplit中存储着对应分片的数据信息(例如一些文件快信息、起始位置、数据长度、所在节点列表等等)它并不是将源文件分割成多个小文件,每个InputSplit都是由一个Mapper进行后续处理的。
splitSize是参数是通过如下的三个值来确定的
- minSize:splitSize的最小值,是由mapred-site.xml配置文件中的mapred.min.split.size参数确定。
- maxSize:splitSize的最大值,是由mapred-size.xml配置文件中mapreduce.jobtracker.split.metainfo.maxsize参数确定。
- blockSize:HDFS中文件存储块大小,是由hafs-site.xml配置文件中dfs.block.size参数确定。
splitSize的确定规则:splitSize=max{minSize,min{maxSize,blockSize}}
数据格式化 *** 作
将划分好的InputSplit格式化层对应的键值对,其中key是偏移量,value是每一行的内容。
需要注意的是,在Map执行任务的过程中,数据格式化的 *** 作会一直执行,每生成一个键值对就会将其传入到map中,进行处理,所以Map和数据格式化 *** 作并不会存在前后时间差,而是同时进行的 *** 作。
2、Map映射
这个 *** 作是在Hadoop并行发挥的 *** 作,根据用户指定的Map过程,MapReduce尝试在数据所在的机器上执行对应的Map程序,在HDFS中,文件数据被复制成了多份,这个在之前的分享中介绍过,所以计算将会选择拥有数据的最空闲的节点进行 *** 作。
在这个过程中,Map内部的过程可以由用户自定义。
3、Shuffle派发
Shuffle过程是指Mapper产生的直接输出结果,经过一系列的处理,成为最终的Reducer直接输入数据为止的整个过程。这是MapReduce的核心处理过程,整个的过程分为两个阶段。
Mapper端的Shuffle:由Mapper产生的结果并不会直接写入到磁盘中,而是有限存储在内存中,当内存中的数据量达到一定的阈值之后,一次性的写入到本地磁盘中,并且同时进行sort、combine、partition等等的 *** 作。sort是将Mapper中产生的数据按照key值进行排序;combine是将key值相同的记录进行合并;partition是把数据均衡的分配给Reducer。
Reducer端的Shuffle:由Mapper和Reduce在不同的节点上运行,所以Reducer需要从多个节点上下载Mapper的结果数据,并且对这些数据进行处理,然后才能被Reducer处理。
4、Reduce缩减
Reducer接收形式的数据流,形成的输出,具体的过程可以由用户自定义,最终结果直接写入hdfs。每个Reduce进程会对应一个输出的文件,名称是以part-开头。
HDFS和MapReduce是Hadoop的核心设计,对于HDFS,是存储的基础,在数据层面上提供海量数据存储的支持,而MapReduce,在数据的上层,通过编写MapReduce程序对海量数据进行计算处理。
在之前的分享中,HDFS中,NameNode是文件系统的名字节点进程,DataNode是文件系统的数据节点进程。
MapReduce计算框架负责计算任务调度JobTracker对应HDFS的NameNode。只不过一个是负责计算任务的调度,一个是负责存储任务的调度。
考虑到“本地化的原则”,一般将NameNode和JobTracker部署到同一台机器上,各个DataNode和TaskNode也同样部署到同一个机器上。
这样做的目的是将Map的任务分配给含有Map处理的数据块的TaskTracker上,同时也是将程序的Jar包复制到了含有TaskTracker上执行,这叫做“计算移动,数据不移动”。分配Reduce任务的时候并不考虑数据本地化。
通过Client、JobTrask和TaskTracker的角度来分析MapReduce的工作原理。
首先客户端启动一个Job,向JobTracker请求一个JobID。将支持运行的所有的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件以及支持客户算计算的的输入划分的信息。这些文件都是存放到在JobTracker专门创建的文件夹中,这个文件夹的名称就是JobID ,JAR文件默认有10个副本,这个是由mapred.submit.replication 属性开工至,输入划分的信息告诉了JobTracker应该为这个作业启动多少个Map任务等信息。
JobTracker接收到作业后,将Job放入到一个队列中,等待Job调度器对于Job进行调用。当调度器根据自己的调度算法调度到对应的Job的时候,则会根据输入划分信息为每个划分创建一个Map任务,并且将Map人任务分配给TaskTracker执行。对于Map和Reduce任务,TaskTracker,根据主机核的数量和内存的大小有固定数量的Map和Reduce。
这里需要注意的就是Map任务不是随随便便就分配给了某个TaskTracker,这就是涉及到了上面说的本地化数据。
TaskTracker每隔一段时间就会给JobTracker发送心跳,告诉JobTracker它依然在运行,同时心跳中还携带了很多的信息。例如当前Map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息的时候 ,就会把作业设置成成功。当JobClient查询状态的时候,它就会知道任务已经完成,便将消息发送给用户。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)