mr执行过程及运行原理

mr执行过程及运行原理,第1张

split逻辑切分:

这里的分片不是物理分片,输入分片存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数据

例如10MB文件,切分10各1MB小文件,0-1MB位置定义为第一个切片,1MB-2MB定义为第二个分片

map阶段(每个分片对应一个map task)

每个mapper任务都是一个java进程,它读取hdfs文件中自己对应的输入分片,将分片中记录按照一定规则调用map函数解析成键值对,如<word,1>,<word,1>形式,如果有100个键值对,就调用100次map方法

键:每一行的起始位置            值:本行的文本内容

map shuffle阶段、洗牌阶段,分区partition,排序sort,combine(本地reducer),合并

map方法输出的数据,进入到内存缓冲区,缓冲区满了后,启动线程写磁盘,在启动线程写磁盘之前,对数据进行key的hash分区,对每个分区进行key值排脊早神序,设置了combiner,则对排序的数据做简单的合并重复key值 *** 作,如<word,2>,写磁盘 *** 作会产生多个文件,当map写完磁盘后则对文件进行一次合并,确保一个map task最终只生成一个数据文件

reduce shuffle阶段:copy、merge、reduce

map方法输出完成后,reduce线程会启动copy线程,请求所有map task的输出结果,如果reduce端接收的数据量小,则直接存内存中,数据量超过内存,则数据数据合并后写磁盘,在写磁盘过程中会把这些文件合并成一个更大的有序文件,最后一次合并的结果没有写磁盘,直接输入给reduce函数中

对copy过来的数据先睁配放入内存缓冲区中,如果是数据量超过缓冲区大小,则对数据合并后写磁盘,如果设置combiner,combiner也可以这个时候做合并,如果map task1中的<word,1>,map task2中的<word,2>,那么combiner之后则为<word,{1,2}>

调用reduce函数

reduce阶段分组好的<word,{1,2}>,调用reduce函数进行聚合<word,3>,将结果输出到hdfs,每个reduce进程会对应一个输出文件,名称以part-开头

词频统计mr过程:

split:由于输入文件太大,mapreduce会对其进行分割,大文件会被切分成多份

map:解析樱亏出每一行中的每个单词,并在后面记上数字1,表示此单词出现过1次

shuffle:将每一份中相同的单词分组到一起,并按默认字母顺序进行升序排序

reduce:将每一组中的单词出现的次数进行累加求和

以上复杂的运行过程,用一张图来简单说明,方便理解和记忆,如下图所示:

1.首先介册裤绍一下wordcount 早mapreduce框架中的 对应关系

大家都知道 mapreduce 分为 map 和reduce 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 reduce;

大家都明白 map接受一个参数,经过map处理后,将处理结果作为reduce的入参分发给reduce,然后在reduce中统计了word 的数量,最终输出到输出结果;

但是初看遇到的问题:

一、map的输入参数是个 Text之类的 对州塌简象,并不是 file对象

二、reduce中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在reduce的时候才判断的

三、map过程到底做了什么,reduce过程到底做了什么?为什么它能够做到多个map多个reduce?

一、

1. 怎么将 文件参数 传递 到 job中呢?

在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]))

实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)

public static void addInputPath(Job job,

Path path) throws IOException {

Configuration conf = job.getConfiguration()

path = path.getFileSystem(conf).makeQualified(path)

String dirStr = StringUtils.escapeString(path.toString())

String dirs = conf.get(INPUT_DIR)

conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr)

}

我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。

我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分衫祥片 *** 作实际上实在 map之前 就已经做好了

List<InputSplit>getSplits(JobContext job)

Generate the list of files and make them into FileSplits.

具体实现参考 FileInputFormat getSplits 方法:

上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize))也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:mapreduce.input.num.files。

二、计算出来的分片有时怎么传递给 map呢 ?对于单词数量如何累加?

我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:

RecordReader:

RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。

可以看到接口中有:

public abstract boolean nextKeyValue() throws IOException, InterruptedException

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException

public abstract float getProgress() throws IOException, InterruptedException

public abstract void close() throws IOException

FileInputFormat<K,V>

Direct Known Subclasses:

CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat

对于 wordcount 测试用了 NLineInputFormat和 TextInputFormat 实现类

在 InputFormat 构建一个RecordReader 出来,然后调用RecordReader initialize 的方法,初始化RecordReader 对象

那么 到底 Map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,

下面继续看看这些RecordReader是如何被MapReduce框架使用的

终于 说道 Map了 ,我么如果要实现Map 那么 一定要继承 Mapper这个类

public abstract class Context

implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{

}

protected void setup(Context context) throws IOException, InterruptedException

protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }

protected void cleanup(Context context ) throws IOException, InterruptedException { }

public void run(Context context) throws IOException, InterruptedException { }

我们写MapReduce程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value)

最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始 *** 作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。

我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。

public MapContextImpl(Configuration conf, TaskAttemptID taskid,

RecordReader<KEYIN,VALUEIN>reader,

RecordWriter<KEYOUT,VALUEOUT>writer,

OutputCommitter committer,

StatusReporter reporter,

InputSplit split) {

super(conf, taskid, writer, committer, reporter)

this.reader = reader

this.split = split

}

RecordReader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?

我们可以想象 这里 应该被框架调用的可能性比较大了,那么mapreduce 框架是怎么分别来调用map和reduce呢?

还以为分析完map就完事了,才发现这里仅仅是做了mapreduce 框架调用前的一些准备工作,

还是继续分析 下 mapreduce 框架调用吧:

1.在 job提交 任务之后 首先由jobtrack 分发任务,

在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runNewMapper

在这个方法中调用了 MapContextImpl, 至此 这个map 和框架就可以联系起来了。

众所周知,一个“常见”的MapReduce程序的Map阶段,是把输入文件分割穗源毁成很多份,然后一行一行读取,然后进行编程 *** 作。但是这个说法实际上是不严谨的。实际上,在一个“常见”的MapReduce任务的Map阶段,启动时是做了以下步骤:

1. 把输入文件分割成M块,一台机器负责其中的几块,这一块叫一个“输入分片”

2. 对于每一块,启动一个任务,运行一个子任务,然后把这一块里面的每一行,一行行输入到Map方法中

这里M的决定十分的复杂,有很多因素能够影响它。例如,若HDFS是按照64M作为一块存储数据的,那么较好的想法就是不要分割这里面的数据块,那么上面M个块的大小有可能会被设为64M。

而这里就会遇到两个问题:

1. 对于某一行输入数据,我该分配这个数据给这M个任务中的哪一个?这些任务又由哪台机器负责处理?

2. 如果发生了某一行输入数据,一半在机器A存放,而另一半在机器B存放,怎么搞?

对于第一个问题,一个比较好的解决方法是,谁有数据谁去做。例如有一行数据,在设置三个复本的情况下,它可能存放于机器A,B,C中,那么我们可以指定这行数据(甚至其所属的块)由机器A去做(实际上更复杂)。

对于第二个问题,最简单的方法就是,在开始的时候,重新划分一下输入分片,然后让某台机器把那半行数据给另一台机器。

而这在“常见”的情况中,是通过记录以下信息实现的:

1. 输入分片所在的文件

2. 输入分片的偏移量

3. 输入分片的长度

4. 输入分片所在机器

而通过这种方法,系统就可以通过上面的信息告诉负责这一个输入分片的机器,要去哪里读取数据。然后,启动的子任务就会把这一个输入分片按行分割,然后同样用偏移量的方法,一行行处理数据。在这种情况中,Map方法的输入KEY就是偏移量了。

那么关于能否不按行读取的问题,实际上,这里有一个implement,叫“InputFormat”,而用户在启动MapReduce的时候需要指定一个它的一个实现。而通过重写里面的方法,是可以修改读取数据的方式的。上面我说的“常见”的情况,其实就是在没有指定InputFormat的实现的时候,调用的默认方法。

而具体怎么重写这个方法,并不是我想要讨猜备论的。这里我在尽量不涉及源代码的情况下,用相当不严谨但是较为易懂的语言说了这么多,是想说裂哪明一件事:在MapReduce程序中,网络负载是一个很重要的瓶颈。为什么要尽量用本地的数据进行处理?为什么会有机架感知功能?这实际上就是为了减轻网络负载。不过实际上,即使是本地的数据,在运作的过程中其也是先发送出去,然后兜一圈返回本机再进行计算。

所以:

1. MapReduce编程的一大问题,是如何减轻网络负载。

2. HDFS和MapReduce架构的设计,确实十分精妙地考虑了这个问题。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/12200602.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-21
下一篇 2023-05-21

发表评论

登录后才能评论

评论列表(0条)

保存