如何通过Java程序提交yarn的mapreduce计算任务

如何通过Java程序提交yarn的mapreduce计算任务,第1张

由于项目需求,需要通过Java程序册瞎提交Yarn的MapReduce的计算任务。与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务需要有点小变动,详见以下代码。

以下为MapReduce主程序,有几点需要提一下:

1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。

2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的<key,value>不州拿空同,这里变为了<TextPair,Value>,TextPair的格式敏搭为<key1,key2>。

3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用。

一、YARN:Hadoop集群中的同一资源调度系统。Hadoop2.0后引入,主要功能有:负责集群中资源的统一调度,响应客户端的请求。

优缺点

二、YARN核心组件及架构

1. ResourceManger(RM):全局资源管理器,集群中只有一个活跃的RM,具体功能包括:处理客户端的请求;启动监控ApplicationMaster;监控NodeManger;资源的分配和调度。

2.ApplicationMaster(AM):每个应用程序(job任务)对应一个AM,负责计算job的资源情况,并向RM申请和任务的调度。具体功能包括:(1)计算job资源使用情况,与RM协商申请job的资源情况;(2)与NodeManger通信启动/停止Container,来执行/终止任务的具体执行;(3)监控任务的运行状态及失败处理。源备

3.NodeManager(NM):节点的资源管理器,每个节点启动一个,一般与DataNode一 一对应。具体功能包括:雹悔毁(1)监控和管理当前节点的资源使用情况;(2)通过心跳向RM汇报自身的资源使用情况;(3)处理RM的请求,分配执行AM的Container;(4):处理AM的请求,启动和停止执行任务的Container。

4.Container:资源的抽象,包括一系列描述信息,任务的运行前洞资源(节点、CPU、内存等),任务运行环境,启动命令等。

架构图见 yarn-arch

三、YARN运行流程

2. RM根据内部调度器,选取一个资源空闲的NM,启动一个Container来运行AM。

3.AM计算应用程序所需资源,向RM进行资源申请,申请字段包括:

message ResourceRequestProto {  

optional PriorityProtopriority = 1 // 资源优先级  

optional stringresource_name = 2 // 期望资源所在的host  

optional ResourceProtocapability = 3 // 资源量(mem、cpu)  

optional int32num_containers = 4 // 满足条件container个数  

optional boolrelax_locality = 5  //default = true   

}  

AM会根据文件的存储地址,分析运行需要的资源等,向RM申请一个期望的资源列表,RM同时考虑各个节点资源使用情况,最终分配一个资源列表。

4. RM返回资源列表,以cotainer结构

message ContainerProto {  

optional ContainerIdProtoid = 1 //container id  

optional NodeIdProtonodeId = 2 //container(资源)所在节点  

optional stringnode_http_address = 3  

optional ResourceProtoresource = 4 //分配的container数量  

optional PriorityProtopriority = 5 //container的优先级  

optional hadoop.common.TokenProtocontainer_token = 6 //container token,用于安全认证  

}

5. AM与NM通信,分配Container并执行任务,以 ContainerLaunchContext 结构发出请求。同时监控各个节点的运行情况(定期心跳),如果失败,AM可将该节点的任务调度到其他节点运行。

一个NN可以启动多个Container。

ContainerLaunchContext结构:

message ContainerLaunchContextProto {  

repeated StringLocalResourceMapProtolocalResources = 1 //该Container运行的程序所需的在资源,例如:jar包  

optional bytestokens = 2//Security模式下的SecurityTokens  

repeated StringBytesMapProtoservice_data = 3  

repeated StringStringMapProtoenvironment = 4 //Container启动所需的环境变量  

repeated stringcommand = 5 //该Container所运行程序的命令,比如运行的为java程序,即$JAVA_HOME/bin/java org.ourclassrepeated ApplicationACLMapProto application_ACLs = 6//该Container所属的Application的访问控制列表  

RM负责AM的启动和监控,若异常可重新运行。

AM负责真个job任务的运行、监控,及失败处理 *** 作。

四、YARN的调度器

先进先出FIFO

Cap 容量调度器

Fire 公平调度器

前言:

上节课我们讲了 MR job的提交YARN的工作流程 与 YARN的架构,本次课程详细讲讲YARN,多多总结。

YARN(主从) 资源  + 作业调度管理

YARN:是一种新的 Hadoop资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调侍族度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

                 ResourceManager(RM):主要接收客户端任务请求,接收和监控NodeManager(NM)的资源情况汇报,负责资源的分配与调度,启动和监控ApplicationMaster(AM)。

               ApplicationManager(作业):应用程序管理,它是负责系统中所有的job,包括job的提交与调度器协商资源来启动ApplicationMaster(AM)和监控(AM)运行状态,并且失败的时候能够重新启动它,更新分配给一个新的Container容器的进度或者状态,除了资源它不管,它就负责job                                             

               Scheduler(调度器):更具容量队列的限制条件将我们系统中的资源分配给正在运用的一个应用程序先进先出调度器 :一个作业运行完了,另一个才能运行

yarn的内置调度器:

1.FIFO先进先出,一个的简单调度器,适合低负载集群。(适合任务数量不多的情况下使用)

2.Capacity调度器,给不同队列(即用户或用户组)分配一个预期最小容量,在每个队列内部用层次化的FIFO来调度多个应用程序。(适用于有很多小的任务跑,需要占很多队列,不使用队列,会造成资源的浪费)

3.Fair公平调度器,针对不同的应用(也可以为用户或用户组),每个应用属于一个队列,主旨是让每个应用分配的资源大体相当。(当然可以设置权重),若是只有一个应用,那集群所有资源都是他的。 适用情况:共享大集群、队列之间有较大差别。(生产使用)

capacity调度器的启用:

在ResourceManager节点上的yarn-site.xml设置

Property===>yarn.resourcemanager.scheduler.class

Value=====>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler

capacity调度器的配置:

在目录$HADOOP_HOME/hadoop/etc/hadoop/capacity-scheduler.xml

修改完成后,需要执行下面的命令:

$HADOOP_YARN_HOME/bin/yarn rmadmin -refreshQueues    使功能动态生效。

NodeManager:主要是节点上的资源和作业管理器,启动衡清Container运行task计算,上报资源、container情况给RM和任务处理情况给AM,整个集群有多个。

ApplicationMaster: 它是负责我们作业的监控并跟踪老拦弊应用执行状态来重启失败任务的,主要是单个Application(Job)的task管理和调度,向RM进行资源的申请,向NM发出launchContainer指令,接收NM的task处理状态信息。一个job只有一个主程序。                                     

Container: 是YARN中资源的抽象,它封装了某个节点上一定量的资源(CPU和内存两类资源)。

Memory:

yarn.nodemanager.resource.memory-mb:64*0.8G=50G  (如果内存是64G,Yarn只能用到内存的80%也就是50G)

yarn.scheduler.minimum-allocation-mb: 1G

yarn.scheduler.maximum-allocation-mb: 1G   50/1=50(并行度) 数量是多了,并行度大了  

一个作业200 MapTask 4轮才能结束,速度快了  作业可能挂了

yarn.scheduler.maximum-allocation-mb: 16G (生产设16G)   50/16=3(并行度) 数量是少了,并行度小了  

一个作业200 MapTask 70轮才能结束,速度慢了  作业时间长 稳定不会挂

工作中一个job可以指定 yarn.scheduler.maximum-allocation-mb的值,但一般不指定。

【若泽大数据实战】使用YARN跑一个jar包

先启动Yarn

进入hadoop的bin目录 在hdfs上创建一个新文件夹

创建一个test.log文件

将当前目录中的某个test.log文件复制到hdfs中(注意要确保当前目录中有该文件)

查看hdfs中是否有我们刚复制进去的文件

进入share的上层目录,提交单词统计任务,实验环境下我们的提交差不多在15秒左右

生产环境中,应该是30~50之间,调优可以压到10秒之内

登录网页查看相关信息:http://192.168.137.30:8088/cluste

Yarn的常用命令

客户端提交job给 Applications Manager 连接Node Manager去申请一个Container的容器,这个容器运行作业的App Mstr的主程序,启动后向App Manager进行注册,然后可以访问URL界面,然后App Mastr向 Resource Scheduler申请资源,拿到一个资源的列表,和对应的NodeManager进行通信,去启动对应的Container容器,去运行 Reduce Task 和 Map Task (两个先后运行顺序随机运行),它们是向App Mstr进行汇报它们的运行状态, 当所有作业运行完成后还需要向Applications Manager进行汇报并注销和关闭

yarn中,它按照实际资源需求为每个任务分配资源,比如一个任务需要1GB内存,1个CPU,则为其分配对应的资源,而资源是用container表示的,container是一个抽象概念,它实际上是一个JAVA对象,里面有资源描述(资源所在节点,资源优先级,资源量,比如CPU量,内存量等)。当一个applicationmaster向RM申请资源时,RM会以container的形式将资源发送给对应的applicationmaster,applicationmaster收到container后,与对应的nodemanager通信,告诉它我要利用这个container运行某个任务。

基于以上考虑,YARN允许用户配置每个节点上可用的物理内存资源,注意,这里是“可用的”,因为一个节点上的内存会被若

干个服务共享,比如一部分给YARN,一部分给HDFS,一部分给HBase等,YARN配置的只是自己可以使用的,配置参数如下:

(1)yarn.nodemanager.resource.memory- - mb

表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而

YARN不会智能的探测节点的物理内存总量。

(2)yarn.nodemanager.vmem- - pmem- - ratio

任务每使用1MB物理内存,最多可使用虚拟内存量,默认是2.1。

(3)yarn.nodemanager.pmem- - check- - enabled

是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true。

(4) yarn.nodemanager.vmem- - check- - enabled

是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。

(5)yarn.scheduler.minimum- - allocation- - mb

单个任务可申请的最少物理内存量,默认是1024(MB),如果一个任务申请的物理内存量少于该值,则该对应的值改为这个数。

(6)yarn.scheduler.maximum- - allocation- - mb

单个任务可申请的最多物理内存量,默认是8192(MB)。

默认情况下,YARN采用了线程监控的方法判断任务是否超量使用内存,一旦发现超量,则直接将其杀死。由于Cgroups对内存的控

制缺乏灵活性(即任务任何时刻不能超过内存上限,如果超过,则直接将其杀死或者报OOM),而Java进程在创建瞬间内存将翻

倍,之后骤降到正常值,这种情况下,采用线程监控的方式更加灵活(当发现进程树内存瞬间翻倍超过设定值时,可认为是正常

现象,不会将任务杀死),因此YARN未提供Cgroups内存隔离机制。

CPU资源的调度和隔离:

目前的CPU被划分成虚拟CPU(CPU virtual Core),这里的虚拟CPU是YARN自己引入的概念,

初衷是,考虑到不同节点的CPU性能可能不同,每个CPU具有的计算能力也是不一样的,比如某个物

理CPU的计算能力可能是另外一个物理CPU的2倍,这时候,你可以通过为第一个物理CPU多配置几个

虚拟CPU弥补这种差异。用户提交作业时,可以指定每个任务需要的虚拟CPU个数。在YARN中,CPU

相关配置参数如下:

(1) yarn.nodemanager.resource.cpu- - vcores

表示该节点上YARN可使用的虚拟CPU个数,默认是4,注意,目前推荐将该值设值为与物理CPU核数

数目相同。如果你的节点CPU核数不够8个,则需要调减小这个值,而YARN不会智能的探测节点的物

理CPU总数。

(2)  yarn.scheduler.minimum- - allocation- - vcores

单个任务可申请的最小虚拟CPU个数,默认是1,如果一个任务申请的CPU个数少于该数,则该对应

的值改为这个数。

(3) yarn.scheduler.maximum- - allocation- - vcores

单个任务可申请的最多虚拟CPU个数,默认是32。

默认情况下,YARN是不会对CPU资源进行调度的,你需要配置相应的资源调度器。

【若泽大数据】生产场景:

内存改完参数 Yarn是要重启的

1.计算及时性要求比较高:memory不够,cpu是足够的,作业肯定是要挂掉的,立即手工调整oom,设置大,快速出结果,

2.计算机及时性不高:memory够,cpu不够,计算慢,

需求5分钟出1个job:job运行1分钟的时候,oom了内存不够,shell脚本里面可以改参数,修改脚本内存就自动加,

生产:cpu物理和虚拟的比例是1:2的关系(默认), 有的生产会设置1:1


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/12479570.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-25
下一篇 2023-05-25

发表评论

登录后才能评论

评论列表(0条)

保存