部分原文来自 support.pivotal.io 的翻译,对于该篇文章中感觉概念模糊不清的地方我做了修正,并扩充了我自己的部分理解,有不正确的地方还望大家指正
Yarn Container就是一个yarn的java进程(这里容易被误解成类似Linux Container的概念),在Mapreduce中的AM,MapTask,ReduceTask, spark的driver和executor等等都作为Container在Yarn的框架上执行,你可以在RM的网页上看到Container的状态。
从上面的图可以看出map,reduce,AM container的JVM,“JVM”矩形代表服务进程,“Max heap”,“Max virtual”矩形代表NodeManager对JVM进程的最大内存和虚拟内存的限制。
以map container内存分配(“mapreduce.map.memory.mb“)设置为1536M为例,AM将会为container向RM请求2048mb的内存资源(原因见上)。这是一种逻辑上的分配,这个值被NodeManager用来监控改进程内存资源的使用率, 如果Task进程树(包括task启动子进程占用的内存,这样可以解决hadoop streaming任务内存跑飞的情况,实际上是对内存使用的一种软限制,至于为什么没有使用Cgroups做限制,大家可以自行查阅资料)的使用超过了2048MB ,NM将会把这个task给杀掉。
mapreduce.map.java.opts和mapreduce.map.memory.mb区别:JVM进程跑在container中, mapreduce.{map|reduce}.java.opts能够通过Xmx设置JVM最大的heap的使用,一般设置为0.75倍的 mapreduce.{map|reduce}.memory.mb ,因为需要为java code,非JVM内存使用等预留些空间,同理:spark executor在申请内存是也会为堆外内存预留一些空间,参数由 spark.yarn.executor.memoryOverhead 控制,算法为 max(384m, 0.07*spark.executor.memory) **
当一个mapreduce job完成时,你将会看到一系列的计数器被打印出来,下面的三个计数器展示了多少物理内存和虚拟内存被分配
默认的(“ yarn.nodemanager.vmem-pmem-ratio “)设置为2.1,意味则map container或者reduce container分配的虚拟内存超过2.1倍的(“ mapreduce.reduce.memory.mb “)或(“ mapreduce.map.memory.mb “)就会被NM给KILL掉,如果 (“ mapreduce.map.memory.mb ”) 被设置为1536M那么总的虚拟内存为2.1*1536=3225.6MB
当container的内存超出要求的,log将会打印一下信息
在 3.2 中,提到 yarn.scheduler.increment-allocation-mb 参数用于控制container内存增量,如果需要更细粒度控制container内存增量,则需要修改该参数,那么接写来分析一下这个参数如何工作的
先看下该参数在 FairSchedulerConfiguration.java 中的定义(顺带上cpu增量)
在 FairScheduler.java 中 initScheduler 方法中,初始化了一个 incrAllocation 对象,表明资源使用的增量
在具体 FairScheduler#allocate 方法中使用(allocate是每次资源分配过程中入口方法,在此不再赘述,有兴趣的同学自己可以下来看源码)
接下来,我们看下在 Sanity check 中发生了什么
看注释,实际上 normalizeRequests 方法对申请的资源进行了一个检查。
我们看到最终调用了 normalizeRequest 方法,再往下追,最终发现调用到 ResourceCalculator#normalize 方法,ResourceCalculator实例对象为 DominantResourceCalculator (参见allocate方法)
其中 stepFactor 对象为之前提到的 incrAllocation 对象,所以可以看出,在这里进行了一个计算资源请求的 *** 作。
至此,这篇分析文章就要结束了,期间涉及到的一些细节并没有赘述,有兴趣的同学可以查阅源码做更深入的了解。
下篇文章,内容预告:
YARN允许用户配置每个节点上可用的物理内存资源,注意,这里是“可用的”,因为一个节点上的内存会被若干个服务共享,比如一部分给YARN,一部分给HDFS,一部分给HBase等,YARN配置的只是自己可以使用的,配置参数如下:
(1)yarn.nodemanager.resource.memory-mb
表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
(2)yarn.scheduler.minimum-allocation-mb
单个容器可申请的最少物理内存量,默认是1024(MB),如果一个容器申请的物理内存量少于该值,则该对应的值改为这个数。
(3) yarn.scheduler.maximum-allocation-mb
单个容器可申请的最多物理内存量,默认是8192(MB)
目前的CPU被划分成虚拟CPU(CPU virtual Core),这里的虚拟CPU是YARN自己引入的概念,初衷是,考虑到不同节点的CPU性能可能不同,每个CPU具有的计算能力也是不一样的,比如某个物理CPU的计算能力可能是另外一个物理CPU的2倍,这时候,你可以通过为第一个物理CPU多配置几个虚拟CPU弥补这种差异。用户提交作业时,可以指定每个任务需要的虚拟CPU个数。在YARN中,CPU相关配置参数如下:
(1)yarn.nodemanager.resource.cpu-vcores
表示该节点上YARN可使用的虚拟CPU个数,默认是8,注意,目前推荐将该值设值为与物理CPU核数数目相同。如果你的节点CPU核数不够8个,则需要调减小这个值,而YARN不会智能的探测节点的物
理CPU总数。
(2)yarn.scheduler.minimum-allocation-vcores
单个容器可申请的最小虚拟CPU个数,默认是1,如果一个容器申请的CPU个数少于该数,则该对应的值改为这个数
(3)yarn.scheduler.maximum-allocation-vcores
单个容器可申请的最多虚拟CPU个数,默认是4
3.mapreduce---Memory调优
(1)yarn.app.mapreduce.am.resource.mb
MR AppMaster需要的内存,默认是1536M
(2)yarn.app.mapreduce.am.command-opts
MR AppMaster的Java opts ,默认是 -Xmx1024m
(3)mapreduce.map.memory.mb
每个map task所需要的内存,默认是1024M。应该是大于或者等于Container的最小内存
(4)mapreduce.reduce.memory.mb
每个reduce task所需要的内存,默认是1024M
(5)mapreduce.map.java.opts
map task进程的java.opts,默认是 -Xmx200m
(6)mapreduce.reduce.java.opts
reduce task进程的java.opts,默认是 -Xmx200m
特别注意:
mapreduce.map.memory.mb >mapreduce.map.java.opts
mapreduce.reduce.memory.mb >mapreduce.reduce.java.opts
mapreduce.map.java.opts / mapreduce.map.memory.mb
=0.70~0.80
mapreduce.reduce.java.opts / mapreduce.reduce.memory.mb
=0.70~0.80
在yarn container这种模式下,JVM进程跑在container中,mapreduce.{map|reduce}.java.opts 能够通过Xmx设置JVM最大的heap的使用,
一般设置为0.75倍的memory.mb,
则预留些空间会存储java,scala code等
4.mapreduce---CPU调优
(1)mapreduce.map.cpu.vcores
map task的虚拟核数,默认为1
(2)mapreduce.reduce.cpu.vcores
reduce task的虚拟核数,默认为1
(3)yarn.app.mapreduce.am.resource.cpu-vcores
am的虚拟核数,默认为1
假设机器的物理配置 64G 16cores
装完系统还剩 62G
预留15~20% 14G:DN 4G + NM 1G=5G 9G
DN进程: 生产4G
1000m
hadoop-env.sh
HADOOP_NAMENODE_OPTS=-Xmx1024m
HADOOP_DATANODE_OPTS=-Xmx4096m
NM进程: 生产1G
yarn-env.sh
export YARN_RESOURCEMANAGER_HEAPSIZE=1024
export YARN_NODEMANAGER_HEAPSIZE=1024
部署同一台: 数据本地化
NN RM 经常性部署同一台 说白了 集群节点少
yarn.nodemanager.resource.memory-mb : 48G 计算总内存 固定经验计算值
yarn.nodemanager.resource.cpu-vcores : 24
yarn.scheduler.minimum-allocation-mb : 4G
yarn.scheduler.minimum-allocation-vcores: 2
yarn.scheduler.maximum-allocation-mb : 8G
yarn.scheduler.maximum-allocation-vcores : 4 固定经验值(不要超过5个)
http://blog.itpub.net/30089851/viewspace-2127851/
http://blog.itpub.net/30089851/viewspace-2127850/
客户端(client)
提交MapReduce作业
JobTracker
1.作业调度:将一个作业(Job)分成若干个子任务分发到taskTraker中去执行
2.任务监控:TaskTracker发送心跳给JobTracker报告自己的运行状态,以让JobTracker能够监控到他
3.资源管理:每个任务向JobTracker申请资源
4.监控过程中发现失败或者运行过慢的任务,对他进行重新启动
TaskTracker
主动发送心跳给jobTracker并与JobTracker通信,从而接受到JobTracker发送过来需要执行的任务
资源表示模型
用于描述资源表示形式,Hadoop1.0使用“槽位(slot)”组织各个节点的资源,为了简化资源的管理,Hadoop将各个节点上资源(CPU、内存、网络IO、磁盘IO等等)等量切分成若干份,每一份用“slot”表示,同时规定一个task可根据实际情况需要占用多个”slot”。
简单的说:hadoop1.0将多维度的资源进行了抽象,使用“slot”来表示,从而简化对资源的管理。
资源分配模型
而资源分配模型则决定如何将资源分配给各个作业/任务,在Hadoop中,这一部分由一个插拔式的调度器完成。
更进一步说,slot相当于运行的“许可证”,一个任务只有获得“许可证”后,才能够获得运行的机会,这也意味着,每一个节点上的slot的数量决定了当前节点能够并发执行多少个任务。Hadoop1.0为了区分MapTask跟ReduceTask所使用资源的差异,进一步将slot分为MapSlot跟ReduceSlot,他们分别只能被MapTask跟ReduceTask使用。
Hadoop集群管理员可根据各个节点硬件配置和应用特点为它们分配不同的map slot数(由参数mapred.tasktracker.map.tasks.maximum指定)和reduce slot数(由参数mapred.tasktrackerreduce.tasks.maximum指定)
静态资源配置 。 采用了静态资源设置策略,即每个节点事先配置好可用的slot总数,这些slot数目一旦启动后无法再动态修改。
资源无法共享 。 Hadoop 1.0将slot分为Map slot和Reduce slot两种,且不允许共享。对于一个作业,刚开始运行时,Map slot资源紧缺而Reduce slot空闲,当Map Task全部运行完成后,Reduce slot紧缺而Map slot空闲。很明显,这种区分slot类别的资源管理方案在一定程度上降低了slot的利用率。
资源划分粒度过大 。资源划分粒度过大,往往会造成节点资源利用率过高或者过低 ,比如,管理员事先规划好一个slot代表2GB内存和1个CPU,如果一个应用程序的任务只需要1GB内存,则会产生“资源碎片”,从而降低集群资源的利用率,同样,如果一个应用程序的任务需要3GB内存,则会隐式地抢占其他任务的资源,从而产生资源抢占现象,可能导致集群利用率过高。
没引入有效的资源隔离机制 。Hadoop 1.0仅采用了基于jvm的资源隔离机制,这种方式仍过于粗糙,很多资源,比如CPU,无法进行隔离,这会造成同一个节点上的任务之间干扰严重。
主要是InputFormat。InputFormat类有2个重要的作用:
1)将输入的数据切分为多个逻辑上的InputSplit,其中每一个InputSplit作为一个map的输入。
2)提供一个RecordReader,用于将InputSplit的内容转换为可以作为map输入的k,v键值对。
系统默认的RecordReader是 LineRecordReader ,它是 TextInputFormat (FileInputFormat的子类)对应的RecordReaderMap读入的Key值是偏移量,Value是行内容。
两个Mapper各自输入一块数据,由键值对构成,对它进行加工(加上了个字符n),然后按加工后的数据的键进行分组,相同的键到相同的机器。这样的话,第一台机器分到了键nk1和nk3,第二台机器分到了键nk2。
接下来再在这些Reducers上执行聚合 *** 作(这里执行的是是count),输出就是nk1出现了2次,nk3出现了1次,nk2出现了3次。从全局上来看,MapReduce就是一个分布式的GroupBy的过程。
从上图可以看到,Global Shuffle左边,两台机器执行的是Map。Global Shuffle右边,两台机器执行的是Reduce。
Hadoop会将输入数据划分成等长的数据块,成为数据分片。Hadoop会为每个分片构建一个map任务。并行的处理分片时间肯定会少于处理整个大数据块的时间,但由于各个节点性能及作业运行情况的不同,每个分片的处理时间可能不一样,因此, 把数据分片切分的更细可以得到更好的负载均衡 。
但另一方面,分片太小的话,管理分片和构建map任务的时间将会增多。因此,需要在hadoop分片大小和处理分片时间之间做一个权衡。对大多数作业来说,一个分片大小为64MB比较合适,其实,Hadoop的默认块大小也是64MB。
我们上面看到了hadoop的数据块大小与最佳分片大小相同,这样的话,数据分片就不容易跨数据块存储,因此,一个map任务的输入分片便可以直接读取本地数据块,这就 避免了再从其它节点读取分片数据 ,从而节省了网络开销。
map的任务输出是 写入到本地磁盘而非HDFS的 。那么为什么呢?因为map任务输出的是中间结果,一旦map任务完成即会被删除,如果把它存入HDFS中并实现备份容错,未免有点大题小做。如果一个map任务失败,hadoop会再另一个节点重启map一个map任务。
而reduce任务并不具备数据本地化优势——单个reduce任务的输入通常来自所有mapper输出。一般排序过的map输出需要通过 网络传输 发送到运行reduce任务的节点,并在reduce端进行合并。reduce的输出通常需要存储到HDFS中以实现可靠存储。每个reduce输出HDFS块第一个复本会存储在本地节点,而其它复本则存储到其它节点,因此reduce输出也需要占用网络带宽。
1.调整reduce个数方法(1)
(1)每个Reduce处理的数据量默认是256MB
(2)每个任务最大的reduce数,默认为1009
(3)计算reducer数的公式
2.调整reduce个数方法(2)
在hadoop的mapred-default.xml文件中修改,设置每个job的Reduce个数
3.reduce个数并不是越多越好
(1)过多的启动和初始化reduce也会消耗时间和资源;
(2)另外,有多少个reduce,就会有多少个输出文件,如果产生了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce个数的时候也需要考虑这两个原则:处理大数据利用适合的reduce数;使单个reduce任务处理数据大小要合适;
在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切,我们没有设置分片的范围的时候,分片大小是由block块大小决定的,和它的大小一样。
比如把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成三个block块,与之对应产生三个split,所以最终会产生三个map task。我又发现了另一个问题,第三个block块里存的文件大小只有2MB,而它的block块大小是128MB,那它实际占用Linux file system的多大空间?答案是实际的文件大小,而非一个块的大小。最后一个问题是: 如果hdfs占用Linux file system的磁盘空间按实际文件大小算,那么这个”块大小“有必要存在吗?其实块大小还是必要的,一个显而易见的作用就是当文件通过append *** 作不断增长的过程中,可以通过来block size决定何时split文件。
1.每个输入分片会让一个map任务来处理,map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。
2.在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行 排序 ,如果此时设置了Combiner,将排序后的结果进行Combiner *** 作,主要是在map计算出中间文件前做一个简单的合并重复key值的 *** 作,这样做的目的是让尽可能少的数据写入到磁盘。
3.当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和Combiner *** 作,目的有两个:1.尽量减少每次写入磁盘的数据量;2.尽量减少下一复制阶段网络传输的数据量。最后合并成了一个 已分区且已排序 的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。
4.将分区中的数据 拷贝 (网络传输)给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。
Reduce端:
1.Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对 数据合并 后 溢写 到磁盘中。
2.随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并 *** 作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。
3.合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且 最后一次合并的结果 并没有写入磁盘,而是直接输入到reduce函数。
Read阶段 :MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value
Map阶段 :该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
Collect收集阶段 :在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollection.collect()输出结果。在该函数内部,它会将生成的 key/value分区 (调用Partitioner),并写入一个环形内存缓冲区中。
Spill阶段 :即“溢写”,当环形缓冲区满后,MapReduce会将数据写入本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地 排序 ,并在必要时对数据进行 combiner 、 压缩 等 *** 作。
溢写阶段详情:
合并阶段 :当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让一个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)