大数据-MapReduce计算框架

大数据-MapReduce计算框架,第1张

数据-MapReduce计算框架

导语
  MapReduce作为Hadoop核心编程模型,在Hadoop中,数据处理的核心就是MapReduce程序设计模型。下面就来分享一下MapReduce都有那些值得我们注意的事情。

文章目录
    • MapReduce编程模型
    • MapReduce执行流程
    • MapReduce数据本地化(Data-Local)
    • MapReduce的工作原理

MapReduce编程模型

  Map 和Reduce 的概念是从函数式编程中借鉴而来的,而函数式编程也是现在比较流行的一个话题。整个的MapReduce计算过程其实是被分为Map阶段和Reduce阶段的。也就是映射与缩减两个独立的阶段过程。在Map中进行数据的读取与预处理,然后将预处理结果发送到Reduce中进行有效的合并。

创建Map的代码

  在使用MapReduce的过程中,第一个测试示例就是统计一篇文章中每个单词出现的次数。下面就来实现以下

import sys
word_list =[]
for line in sys.stdin:
   word_list = line.strip().split('');
   if len(word_list)<=0:
      continue;
   for word in word_list:
      w = word.strip();
      if len(w)<=0:
        continue;
      print "t".join([w,"1"])     

  上面这段代码主要的工作就是从文章中,对单词进行以空格的方式分割。

  word_list = line.strip().split(’ '),这个代码将当前读取的一整行数据按照空格进行分割,然后将分割后的结果存入word_list数组中,通过遍历,获取到每个单词,然后将对应次数加一。

创建Reduce代码

import sys

cur_word = None
sum_of_word = 0

for line in sys.stdin:
   ss = line.strip().split('t')
   if len(ss)!=2:
   	  continue
   word = ss[0].strip()
   count = ss[1].strip()
   if cur_word = None:
      cur_word = word
   if cur_word != word:
      print 't'.join([cur_word,str(sum_of_word)])
      sum_of_word = 0
      cur_word = word
   sum_of_word += int(count)
print 't'.join([cur_word,str(sum_of_word)])
sum_of_word = 0         	  

  上面这段代码对Map阶段的数组进行汇总处理,从Map到Reduce的过程中,存在一个缓存的分组阶段,保证同一个Word记录会连续传入到reduce中,所以在Reduce阶段,需要的是对连续的相同Word进行累加。如图所示

MapReduce执行流程

  上面介绍的是MapReduce计算框架的一般的执行流程。如下图所示

1、输入和拆分

  这个过程不属于Map和Reduce的主要过程,他属于整个的计算框架中最消耗时间的一部分 *** 作,这个过程是为Map准备数据。

分片 *** 作

  split只要将源文件的内容分片形成一系列的InputSplit,每个InputSplit中存储着对应分片的数据信息(例如一些文件快信息、起始位置、数据长度、所在节点列表等等)它并不是将源文件分割成多个小文件,每个InputSplit都是由一个Mapper进行后续处理的。

  splitSize是参数是通过如下的三个值来确定的

  • minSize:splitSize的最小值,是由mapred-site.xml配置文件中的mapred.min.split.size参数确定。
  • maxSize:splitSize的最大值,是由mapred-size.xml配置文件中mapreduce.jobtracker.split.metainfo.maxsize参数确定。
  • blockSize:HDFS中文件存储块大小,是由hafs-site.xml配置文件中dfs.block.size参数确定。

  splitSize的确定规则:splitSize=max{minSize,min{maxSize,blockSize}}

数据格式化 *** 作

  将划分好的InputSplit格式化层对应的键值对,其中key是偏移量,value是每一行的内容。

  需要注意的是,在Map执行任务的过程中,数据格式化的 *** 作会一直执行,每生成一个键值对就会将其传入到map中,进行处理,所以Map和数据格式化 *** 作并不会存在前后时间差,而是同时进行的 *** 作。

2、Map映射

  这个 *** 作是在Hadoop并行发挥的 *** 作,根据用户指定的Map过程,MapReduce尝试在数据所在的机器上执行对应的Map程序,在HDFS中,文件数据被复制成了多份,这个在之前的分享中介绍过,所以计算将会选择拥有数据的最空闲的节点进行 *** 作。

  在这个过程中,Map内部的过程可以由用户自定义。

3、Shuffle派发

  Shuffle过程是指Mapper产生的直接输出结果,经过一系列的处理,成为最终的Reducer直接输入数据为止的整个过程。这是MapReduce的核心处理过程,整个的过程分为两个阶段。

Mapper端的Shuffle:由Mapper产生的结果并不会直接写入到磁盘中,而是有限存储在内存中,当内存中的数据量达到一定的阈值之后,一次性的写入到本地磁盘中,并且同时进行sort、combine、partition等等的 *** 作。sort是将Mapper中产生的数据按照key值进行排序;combine是将key值相同的记录进行合并;partition是把数据均衡的分配给Reducer。

Reducer端的Shuffle:由Mapper和Reduce在不同的节点上运行,所以Reducer需要从多个节点上下载Mapper的结果数据,并且对这些数据进行处理,然后才能被Reducer处理。

4、Reduce缩减
  Reducer接收形式的数据流,形成的输出,具体的过程可以由用户自定义,最终结果直接写入hdfs。每个Reduce进程会对应一个输出的文件,名称是以part-开头。

MapReduce数据本地化(Data-Local)

  HDFS和MapReduce是Hadoop的核心设计,对于HDFS,是存储的基础,在数据层面上提供海量数据存储的支持,而MapReduce,在数据的上层,通过编写MapReduce程序对海量数据进行计算处理。

  在之前的分享中,HDFS中,NameNode是文件系统的名字节点进程,DataNode是文件系统的数据节点进程。

  MapReduce计算框架负责计算任务调度JobTracker对应HDFS的NameNode。只不过一个是负责计算任务的调度,一个是负责存储任务的调度。

  考虑到“本地化的原则”,一般将NameNode和JobTracker部署到同一台机器上,各个DataNode和TaskNode也同样部署到同一个机器上。

  这样做的目的是将Map的任务分配给含有Map处理的数据块的TaskTracker上,同时也是将程序的Jar包复制到了含有TaskTracker上执行,这叫做“计算移动,数据不移动”。分配Reduce任务的时候并不考虑数据本地化。

MapReduce的工作原理

  通过Client、JobTrask和TaskTracker的角度来分析MapReduce的工作原理。

  首先客户端启动一个Job,向JobTracker请求一个JobID。将支持运行的所有的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件以及支持客户算计算的的输入划分的信息。这些文件都是存放到在JobTracker专门创建的文件夹中,这个文件夹的名称就是JobID ,JAR文件默认有10个副本,这个是由mapred.submit.replication 属性开工至,输入划分的信息告诉了JobTracker应该为这个作业启动多少个Map任务等信息。
  JobTracker接收到作业后,将Job放入到一个队列中,等待Job调度器对于Job进行调用。当调度器根据自己的调度算法调度到对应的Job的时候,则会根据输入划分信息为每个划分创建一个Map任务,并且将Map人任务分配给TaskTracker执行。对于Map和Reduce任务,TaskTracker,根据主机核的数量和内存的大小有固定数量的Map和Reduce。

  这里需要注意的就是Map任务不是随随便便就分配给了某个TaskTracker,这就是涉及到了上面说的本地化数据。

  TaskTracker每隔一段时间就会给JobTracker发送心跳,告诉JobTracker它依然在运行,同时心跳中还携带了很多的信息。例如当前Map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息的时候 ,就会把作业设置成成功。当JobClient查询状态的时候,它就会知道任务已经完成,便将消息发送给用户。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5698621.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存