Hadoop技术内幕:深入解析YARN架构设计与实现原理
董西成
◆ 第2章 YARN设计理念与基本架构
由于MRv2将资源管理功能抽象成了一个独立的通用系统YARN,直接导致下一代MapReduce的核心从单一的计算框架MapReduce转移为通用的资源管理系统YARN。
将JobTracker中的资源管理和作业控制功能分开,分别由组件ResourceManager和ApplicationMaster实现,其中,ResourceManager负责所有应用程序的资源分配,而ApplicationMaster仅负责管理一个应用程序,进而诞生了全新的通用资源管理框架YARN。
MRv1的运行时环境主要由两类服务组成,分别是JobTracker和TaskTracker。其中,JobTracker负责资源和任务的管理与调度,TaskTracker负责单个节点的资源管理和任务执行。MRv1将资源管理和应用程序管理两部分混杂在一起,使得它在扩展性、容错性和多框架支持等方面存在明显缺陷。而MRv2则通过将资源管理和应用程序管理两部分剥离开,分别由YARN和ApplicationMaster负责,其中,YARN专管资源管理和调度,而ApplicationMaster则负责与具体应用程序相关的任务切分、任务调度和容错等
基本设计思想是将MRv1中的JobTracker拆分成了两个独立的服务:一个全局的资源管理器ResourceManager和每个应用程序特有的ApplicationMaster。其中ResourceManager负责整个系统的资源管理和分配,而ApplicationMaster负责单个应用程序的管理。
在整个资源管理框架中,ResourceManager为Master,NodeManager为Slave,ResourceManager负责对各个NodeManager上的资源进行统一管理和调度。
在YARN上的应用程序主要分为两类:短应用程序和长应用程序,其中,短应用程序是指一定时间内(可能是秒级、分钟级或小时级,尽管天级别或者更长时间的也存在,但非常少)可运行完成并正常退出的应用程序,比如MapReduce作业(将在第8章介绍)、Tez DAG作业(将在第9章介绍)等,长应用程序是指不出意外,永不终止运行的应用程序,通常是一些服务,比如Storm Service(主要包括Nimbus和Supervisor两类服务),HBase Service(包括Hmaster和RegionServer两类服务)[插图]等,而它们本身作为一个框架提供了编程接口供用户使用。尽管这两类应用程序作用不同,一类直接运行数据处理程序,一类用于部署服务(服务之上再运行数据处理程序),但运行在YARN上的流程是相同的。
YARN将分两个阶段运行该应用程序:第一个阶段是启动ApplicationMaster;第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成
◆ 第3章 YARN基础库
YARN的基础库
通常编写一个Protocol Buffers应用需要以下三步:1)定义消息格式文件,通常以proto作为扩展名;2)使用Google提供的Protocol Buffers编译器生成特定语言(目前支持C++、Java、Python三类语言)的代码文件;3)使用Protocol Buffers库提供的API来编写应用程序。
在YARN中,所有RPC函数的参数均采用Protocol Buffers定义的,相比MRv1中基于Writable序列化的方法,Protocol Buffers的引入使得YARN在向后兼容性和性能方面向前迈进了一大步。
Hadoop RPC主要分为四个部分,分别是序列化层、函数调用层、网络传输层和服务器端处理框架
RPC协议是客户端和服务器端之间的通信接口,它定义了服务器端对外提供的服务接口。
Reactor是并发编程中的一种基于事件驱动的设计模式,它具有以下两个特点:通过派发/分离I/O *** 作事件提高系统的并发性能;提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理。
Hadoop RPC Server处理流程
Hadoop RPC对外提供了一些可配置参数
YARN采用了基于事件驱动的并发模型,该模型能够大大增强并发性,从而提高系统整体性能。为了构建该模型,YARN将各种处理逻辑抽象成事件和对应事件调度器,并将每类事件的处理过程分割成多个步骤,用有限状态机表示。
在YARN中,所有核心服务实际上都是一个中央异步调度器,包括ResourceManager、NodeManager、MRAppMaster(MapReduce应用程序的ApplicationMaster)等,它们维护了事先注册的事件与事件处理器,并根据接收的事件类型驱动服务的运行。
◆ 第4章 YARN应用程序设计方法
YARN自身对应用程序类型没有任何限制,它可以是处理短类型任务的MapReduce作业,也可以是部署长时间运行的服务的应用程序
函数指针的调用,即是一个通过函数指针调用的函数;
如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用来调用其所指向的函数时,就说这是回调函数。
由于YARN应用程序编写比较复杂,且需对YARN本身的架构有一定了解,因此通常由专业人员开发,通过回调的形式供其他普通用户使用。
由于YARN应用程序编写比较复杂,且需对YARN本身的架构有一定了解,因此通常由专业人员开发,通过回调的形式供其他普通用户使用。
YARN是一个资源管理系统,负责集群资源的管理和调度。如果想要将一个新的应用程序运行在YARN之上,通常需要编写两个组件Client(客户端)和ApplicationMaster。
在实际应用环境中,为了减轻ResourceManager的负载,一旦应用程序的ApplicationMaster成功启动后,客户端通常直接与ApplicationMaster通信,以查询它的运行状态或者控制它的执行流程(比如杀死一个任务等)。
以MapReduce的客户端为例,当用户提交一个MapReduce应用程序时,需通过RPC协议ApplicationClientProtocol与ReourceManager通信,而一旦MapReduce的ApplicationMaster—MRAppMaster成功启动后,客户端通过另外一个RPC协议—MRClientProtocol直接与MRAppMaster通信,以查询应用程序运行状况和控制应用程序的执行
YarnClient。该库对用常用函数进行了封装,并提供了重试、容错等机制,用户使用该库可以快速开发一个包含应用程序提交、状态查询和控制等逻辑的YARN客户端,目前YARN本身自带的各类客户端均使用该编程库实现。
ApplicationMaster(AM)需要与ResourceManager(RM)和NodeManager(NM)两个服务交互,通过与ResourceManager交互,ApplicationMaster可获得任务计算所需的资源;通过与NodeManager交互,ApplicationMaster可启动计算任务(container),并监控它直到运行完成。
同一个节点上相同优先级的资源请求只能存在一种,比如可以全是<1vcore 2048mb>的Container,否则前面的资源会被后面申请的资源覆盖掉。
一旦ApplicationMaster收到一个Container后,将启动一个独立线程与对应的NodeManager通信,以运行任务;与此同时,如果发现某个Container被杀死,ApplicationMaster会为它重新申请资源。
◆ 第5章 ResourceManager 剖析
YARN也采用了Master/Slave结构,其中,Master实现为ResourceManager,负责整个集群资源的管理与调度;Slave实现为NodeManager,负责单个节点的资源管理与任务启动。
NodeManager与ResourceManager之间采用了“pull模型”(与MRv1类似),NodeManager总是周期性地主动向ResourceManager发起请求,并通过领取下达给自己的命令。
ResourceManager与ApplicationMaster之间也采用了“pull模型”。
ResourceManager主要完成以下几个功能:❑与客户端交互,处理来自客户端的请求;❑启动和管理ApplicationMaster,并在它运行失败时重新启动它;❑管理NodeManager,接收来自NodeManager的资源汇报信息,并向NodeManager下达管理指令(比如杀死Container等);❑资源管理与调度,接收来自ApplicationMaster的资源申请请求,并为之分配资源。
ResourceManager共维护了4类状态机,分别是RMApp、RMAppAttempt、RMContainer和RMNode
ResourceScheduler是资源调度器,它按照一定的约束条件(比如队列容量限制等)将集群中的资源分配给各个应用程序,当前主要考虑内存和CPU资源。
YARN自带了一个批处理资源调度器—FIFO(First In First Out)和两个多用户调度器—Fair Scheduler和Capacity Scheduler(默认资源调度器[插图])
AM运行过程中,需要周期性地通过RPC函数ApplicationMasterProtocol#allocate与RM通信,这主要有以下三个作用:❑请求资源;❑获取新分配的资源;❑形成周期性心跳,告诉RM自己还活着。
NM启动时,它所做的第一件事是向RM注册,这是通过RPC函数ResourceTracker#registerNodeManager实现的,注册信息包括节点可用资源总量、对外开放的HTTP端口号等。
一个节点总的可用资源量是在NodeManager注册时汇报给ResourceManager的,之后整个运行过程中不能动态修改(若修改需要重启NodeManager)
当一个AM获得一个Container后,YARN不允许AM长时间不对其使用,因为这会降低整个集群的利用率。当AM收到RM新分配的一个Container后,必须在一定的时间(默认为10min,管理员可通过参数yarn.resourcemanager.rm.container-allocation.expiry-interval-ms修改)内在对应的NM上启动该Container,否则RM将强制回收该Container。
在YARN ResourceManager内部,共有4类状态机,分别是RMApp、RMAppAttempt、RMContainer和RMNode。其中,前2类状态机维护了一个应用程序相关的生命周期,包括Application生命周期、一次运行尝试的生命周期;RMContainer则维护了分配出去的各个资源的使用状态;RMNode维护了一个NodeManager(一个节点上可以有多个NodeManager)的生命周期。
每个Application可能会尝试运行多次,每次称为一次“运行尝试”(Application Attempt,也可称为运行实例),而每次运行尝试的生命周期则由状态机RMAppAttemptImpl维护,如果一次运行尝试(实例)运行失败,RMApp会创建另外一个运行尝试,直到某次运行尝试运行成功或者达到运行尝试上限。对于每次运行尝试,ResourceManager将为它分配一个Container,Container是运行环境的抽象,内部封装了任务的运行环境和资源等信息,而一个应用程序的ApplicationMaster就运行在这个Container中。ApplicationMaster启动之后,会不断向ResourceManager申请Container以运行各类任务。
Application Attempt的生命周期与ApplicationMaster的生命周期基本上是一致的:一个Application内部所有任务均由ApplicationMaster维护和管理,ApplicationMaster本身需要占用一个Container,而这个Container由ResourceManager为其申请和启动。
RMApp是ResourceManager中用于维护一个Application生命周期的数据结构,它的实现是RMAppImpl类,该类维护了一个Application状态机,记录了一个Application可能存在的各个状态(RMAppState)以及导致状态间转换的事件(RMAppEvent)
在RM看来,每个Application有9种基本状态(RMAppState)和12种导致这9种状态之间发生转移的事件(RMAppEventType),RMAppImpl的作用是等待接收其他对象发出的RMAppEventType类型的事件,然后根据当前状态和事件类型,将当前状态转移到另外一种状态,同时触发一种行为(实际就是执行一个函数,该函数可能会再次发出某种类型的事件)
RMApp状态机
RMAppAttempt是ResourceManager中用于维护一个Application 运行尝试(或者称为"Application Attempt")的生命周期的数据结构,它的实现是RMAppAttemptImpl,该类维护了一个状态机,记录了一个Application Attempt可能存在的各个状态以及导致状态间转换的事件。当某个事件发生时,RMAppAttemptImpl会根据实际情况进行Application Attempt状态转移,同时触发一个行为。除了维护状态机外,RMAppAttempt还保存了本次运行尝试的基本信息,包括当前使用的Container信息、ApplicationMaster Container对外tracking URL和RPC端口号等。由于在一次运行尝试中,最重要的组件是ApplicationMaster,它的当前状态可代表整个应用程序的当前状态,因此,RMAppAttemptImpl本质上是维护的ApplicationMaster生命周期。
RMContainer是ResourceManager中用于维护一个Container生命周期的数据结构,它的实现是RMContainerImpl类,该类维护了一个Container状态机,记录了一个Container可能存在的各个状态以及导致状态间转换的事件,当某个事件发生时,RMContainerImpl会根据实际情况进行Container状态转移,同时触发一个行为。
◆ 第6章 资源调度器
YARN自带了FIFO、Capacity Scheduler和Fair Scheduler三种常用资源调度器,当然,用户可按照接口规范编写一个新的资源调度器,并通过简单的配置使它运行起来。
YARN的资源分配过程是异步的,也就是说,资源调度器将资源分配给一个应用程序后,它不会立刻push给对应的ApplicationMaster,而是暂时放到一个缓冲区中,等待ApplicationMaster通过周期性的心跳主动来取。也就是说,YARN采用了pull-based通信模型,而不是push-based模型,这与MRv1是一致的。
资源抢占是通过杀死正在使用的资源Container实现的,由于这些Container已经处于运行状态,直接杀死Container会导致已经完成的计算白白浪费。为了尽可能避免资源浪费,YARN优先选择优先级低的Container作为资源抢占对象,且不会立刻杀死Container,而是将释放资源的任务留给应用程序自己:ResourceManager将待杀死的Container列表发送给对应的ApplicationMaster,以期望它采取一定的机制自行释放这些Container占用的资源,比如先进行一些状态保存工作后,再将对应的Container杀死,以避免计算浪费,如果一段时间后,ApplicationMaster尚未主动杀死这些Container,则ResourceManager再强制杀死这些Container。
YARN采用了层次结构组织队列,这将队列结构转换成了树形结构,这样资源分配过程实际上就是基于优先级的多叉树遍历的过程。在选择队列时,YARN采用了基于优先级的深度优先遍历方法,具体如下:从根队列开始,按照它的子队列资源使用率(相当于优先级)由小到大依次遍历各个子队列。如果子队列为叶子队列,则依次按照步骤2和步骤3中方法在队列中选择一个Container(请求),否则以该子队列为根队列,重复以上过程,直到找到一个合适的Container(请求)并退出。
同Capacity Scheduler类似,它以队列为单位划分资源,每个队列可设定一定比例的资源最低保证和使用上限,同时,每个用户也可设定一定的资源使用上限以防止资源滥用;当一个队列的资源有剩余时,可暂时将剩余资源共享给其他队列。
Fair策略是一种基于最大最小公平算法[插图]实现的资源多路复用方式,默认情况下,每个队列内部采用该方式分配资源。
Fair Scheduler允许用户将队列信息专门放到一个配置文件(默认是fair-scheduler.xml)
自学习调度器(Learning Scheduler)[插图]是一种基于贝叶斯分类算法的资源感知调度器,与现有的调度器不同,它更适用于异构Hadoop集群。该调度器的创新之处是将贝叶斯分类算法应用到调度器设计中。
动态优先级调度器的核心思想是在一定的预算约束下,根据用户提供的消费率按比例分配资源。管理员可根据集群资源总量为每个用户分配一定的预算和一个时间单元的长度(通常为10s~1min),而用户可根据自己的需要动态调整自己的消费率,即每个时间单元内单个Slot的价钱。
介绍了几种常见的多用户作业调度器,相比于FIFO调度器,多用户调度器能够更好地满足不同应用程序的服务质量要求。
◆ 第7章 NodeManager剖析
NodeManager(NM)是YARN中单个节点上的代理,它管理Hadoop集群中单个计算节点,功能包括与ResourceManger保持通信、管理Container的生命周期、监控每个Container的资源使用(内存、CPU等)情况、追踪节点健康状况、管理日志和不同应用程序用到的附属服务(auxiliaryservice)。
NodeManager与ResourceManager之间采用了“pull模型”(与MRv1类似),NodeManager 总是周期性地主动向ResourceManager发起请求,并领取下达给自己的命令。
应用程序的ApplicationMaster通过该RPC协议向NodeManager发起针对Container的相关 *** 作,包括启动Container、杀死Container、获取Container执行状态等。
当NodeManager启动时,该组件负责向ResourceManager注册,并汇报节点上总的可用资源(该值在运行过程中不再汇报);之后,该组件周期性与ResourceManager通信,汇报各个Container的状态更新,包括节点上正运行的Container、已完成的Container等信息,同时ResourceManager会为之返回待清理Container列表、待清理应用程序列表、诊断信息、各种Token等信息。
启动Container请求是由ApplicationMaster发起的,而杀死Container请求则可能来自ApplicationMaster或者ResourceManager
当NodeManager认为自己的健康状况“欠佳”时,可通知ResourceManager不再为之分配新任务,待健康状况好转时,再分配任务。该机制不仅可帮助及时发现存在问题的NodeManager,避免不必要的任务分配,也可以用于动态升级(通过脚本指示ResourceManager不再分配任务,等到NodeManager上面的任务运行完成后,对它进行升级)。
NodeManager上有专门一个服务判断所在节点的健康状况,该服务通过两种策略判断节点健康状况,第一种是通过管理员自定义的Shell脚本(NodeManager上专门有一个周期性任务执行该脚本,一旦该脚本输出以"ERROR"开头的字符串,则认为节点处于不健康状态),另一种是判断磁盘好坏(NodeManager上专门有一个周期性任务检测磁盘的好坏,如果坏磁盘数目达到一定的比例,则认为节点处于不健康状态)。
法
在Hadoop中,分布式缓存并不是将文件缓存到集群中各个节点的内存中,而是将文件缓存到各节点的本地磁盘上,以便执行任务时直接从本地磁盘上读取文件。
分布式缓存机制是由各个NodeManager实现的,主要功能是将应用程序需要的文件资源(一般是只读的)缓存到本地,以方便后续任务运行。资源缓存是用时触发的,也就是说,由第一个用到该资源的任务触发的,后续同类任务无须再次进行缓存,直接使用已经缓存好的即可。
NodeManager上的目录可分为两种:数据目录和日志目录,其中数据目录用于存放执行Container所需的数据(比如可执行程序或JAR包、配置文件等)和运行过程中产生的临时数据,由参数yarn.nodemanager.local-dirs指定,而日志目录则用于存放Container运行时输出日志,由参数yarn.nodemanager.log-dirs指定。
NodeManager在每个磁盘上为该作业创建了相同的目录结构,且采用轮询的调度方式将目录(磁盘)分配给不同的Container的不同模块以避免干扰。
为了避免大量Container日志“撑爆”磁盘空间,NodeManager将定期清理日志文件,该功能由组件LogHandler(当前存在两种实现:NonAggregatingLogHandler和LogAggregationService)完成。总起来说,NodeManager提供了定期删除(由NonAggregatingLogHandler实现)和日志聚集转存(由LogAggregation-Service实现)两种日志清理机制,默认情况下,采用的是定期删除机制。
日志聚集转存[插图],管理员可通过将配置参数yarn.log-aggregation-enable置为true启用该功能。该机制将HDFS作为日志聚集仓库,它将应用程序产生的日志上传到HDFS上,以便统一管理和维护。
一旦日志全部上传到HDFS后,本地磁盘上的日志文件将被删除。
NodeManager允许用户指定要上传的日志类型。当前支持的日志类型有三种:ALL_CONTAINERS(上传所有Container日志)、APPLICATION_MASTER_ONLY(仅上传ApplicationMaster产生的日志)和AM_AND_FAILED_CONTAINERS_ONLY(上传ApplicationMaster和运行失败的Container产生的日志),默认情况下采用ALL_CONTAINERS。
日志目录在HDFS上组织方式
到HDFS上的日志的生命周期不再由NodeManager负责,而是由JobHistory服务管理。比如对于MapReduce计算框架而言[插图],它专有的JobHistory负责定期清理MapReduce作业转存到HDFS上的日志,每个日志文件最多存留时间为yarn.log-aggregation.retain-seconds(单位是秒,默认为3×60×60,即3小时)。
NodeManager维护了三类状态机,分别是Application、Container和LocalizedResource,它们均直接或者间接参与维护一个应用程序的生命周期。
Container启动命令是由各个ApplicationMaster通过RPC函数ContainerManagement-Protocol#startContainer向NodeManager发起的,NodeManager中的ContainerManager组件(组件实现为ContainerManagerImpl)负责接收并处理该请求。
主要指前面提到的分布式缓存机制完成的工作,功能包括初始化各种服务组件、创建工作目录、从HDFS下载运行所需的各种资源(比如文本文件、JAR包、可执行文件)等。
分布式缓存完成的主要功能是文件下载,涉及大量的磁盘读写,因此整个过程采用了异步并发模型加快文件下载速度,以避免同步模型带来的性能开销。
Container资源清理是指Container运行完成之后(可能成功或者失败),NodeManager需回收它占用的资源,这些资源主要是Container运行时使用的临时文件,它们的来源主要是ResourceLocalizationService和ContainerExecutor两个服务/组件
由于应用程序可能涉及多种类型的计算任务,且这些计算任务之间有依赖关系,因此,NodeManager不能在Container运行完成之后立刻清理它占用的所有资源,尤其是产生的中间数据,而只有当所有Container运行完成之后,才能够全部清空这些资源。
资源隔离是指为不同任务提供可独立使用的计算资源以避免它们相互干扰。
YARN提供了两种可选方案:线程监控方案和基于轻量级资源隔离技术Cgroups的方案[插图]。默认情况下,YARN采用了进程监控的方案控制内存使用,即每个NodeManager会启动一个额外监控线程监控每个Container内存资源使用量,一旦发现它超过约定的资源量,则将其杀死。采用这种机制的另一个原因是Java中创建子进程采用了"fork()+exec()"的方案,子进程启动瞬间,它使用的内存量与父进程一致。从外面看来,一个进程使用的内存量可能瞬间翻倍,然后又降下来,采用线程监控的方法可防止这种情况下导致swap *** 作。另一种可选的方案则基于轻量级资源隔离技术Cgroups,Cgroups是Linux内核提供的d性资源隔离机制,可以严格限制内存使用上限,一旦进程使用资源量超过事先定义的上限值,则可将其杀死[插图](截至本书结稿时,该方案尚未合并到YARN内核中)。对于CPU资源,它是一种d性资源,它的量的大小不会直接影响应用程序的死活,因此采用了Cgroups。
Cgroups(Control groups)[插图]是Linux内核提供的一种可以限制、记录、隔离进程组所使用的物理资源(如CPU、内存、IO等)的机制
Cgroups提供了以下功能:❑限制进程组使用的资源量。比如,内存子系统可以为进程组设定一个内存使用上限,一旦进程组使用的内存达到限额再申请内存,就会出发OOM。❑进程组的优先级控制。比如,可以使用CPU子系统为某个进程组分配特定cpu share(下面将会介绍,Hadoop YARN正是使用了该功能)。❑对进程组使用的资源量进行记账。比如,可以记录某个进程组使用的CPU时间,以便对不同进程组拥有者计费。❑进程组控制。比如,可将某个进程组挂起和恢复。
Memory子系统可用于限定一个进程的内存使用上限,一旦超过该限制,将认为它OOM,会将其杀死。
相比于线程监控,Cgroups是一种更加严格和有效的资源限制方法,相比于虚拟机(VirtualMachine,VM),Cgroups是一种轻量级资源隔离方案,且已被越来越广泛地使用。YARN采用了Cgroups对CPU资源进行隔离。
NodeManager未启用任何CPU资源隔离机制,如果想启用该机制,需使用LinuxContainerExecutor,它能够以应用程序提交者的身份创建文件、运行Container和销毁Container。相比于DeafultContainerExecutor采用NodeManager启动者的身份执行这些 *** 作,LinuxContainerExecutor的这种方式要安全得多。
◆ 第8章 离线计算框架MapReduce
作业的每个任务资源需求可描述为5元组
,分别表示作业优先级、期望资源所在的host、资源量(当前支持内存和CPU两种资源)、Container数目、是否松弛本地性。
MapReduce客户端是MapReduce用户与YARN(和MRAppMaster)进行通信的唯一途径,通过该客户端,用户可以向YARN提交作业,获取作业的运行状态和控制作业(比如杀死作业、杀死任务等)。
对于小作业,为了降低其延迟,可采用Uber模式,在该模式下,所有Map Task和Reduce Task在同一个Container(MRAppMaster所在Container)中顺次执行;对于大作业,则采用Non-Uber 模式,在该模式下,MRAppMaster先为Map Task申请资源,当Map Task运行完成数目达到一定比例后再为Reduce Task申请资源。
对于小作业而言,MRAppMaster无须再为每个任务分别申请资源,而是让其重用一个Container,并按照先Map Task后Reduce Task的运行方式串行执行每个任务。
与MRv1一样,MRAppMaster根据InputFormat组件的具体实现(通常是根据数据量切分数据),将作业分解成若干个Map Task和Reduce Task,其中每个Map Task处理一片InputSplit数据,而每个Reduce Task则进一步处理Map Task产生的中间结果。每个Map/Reduce Task只是一个具体计算任务的描述,真正的任务计算工作则是由运行实例TaskAttempt完成的,每个Map/Reduce Task可能顺次启动多个运行实例,比如第一个运行实例失败了,则另起一个实例重新计算,直到这一份数据处理完成或者达到尝试次数上限;也可能同时启动多个运行实例,让它们竞争同时处理一片数据。
作业表示方式
Task状态机维护了一个任务的生命周期,即从创建到运行结束整个过程。一个任务可能存在多次运行尝试,每次运行尝试被称为一个“运行实例”,Task状态机则负责管理这些运行实例。
在YARN中,任务实例是运行在Container中的,因此,Container状态变化往往伴随任务实例的状态变化,比如任务实例运行完成后,会清理Container占用的空间,而Container空间的清理实际上就是任务实例空间的清理。
MRAppMaster仍采用了MRv1中的数据处理引擎,分别由数据处理引擎MapTask和ReduceTask完成Map任务和Reduce任务的处理
在Hadoop 2.0中,MRAppMaster改用Netty—另一种开源的客户/服务器端编程框架,由于它内部采用了Java NIO技术,故其相比Jetty更加高效。Netty社区也比Jetty的更加活跃,且稳定性更好。
MRv1版本中,在Shuffle过程中,Reduce Task会为每个数据分片建立一个专门的HTTP连接(One-connection-per-map),即使多个分片同时出现在一个TaskTracker上也是如此。为了提高数据复制效率,Hadoop 2.0尝试采用批拷贝技术:不再为每个Map Task建立一个HTTP连接,而是为同一个TaskTracker上的多个Map Task建立一个HTTP连接,进而能够一次读取多个数据分片
MapReduce固有的实现中采用了基于排序的数据聚集算法,且排序统一采用了自己实现的快速排序算法。
◆ 第9章 DAG计算框架Tez
在一些应用场景中,为了套用MapReduce模型解决问题,不得不将问题分解成若干个有依赖关系的子问题,每个子问题对应一个MapReduce作业,最终所有这些作业形成一个有向图(Directed Acyclic Graph,DAG)。在该DAG中,由于每个节点是一个MapReduce作业,因此它们均会从HDFS上读一次数据和写一次数据(默认写三份),即使中间节点的产生数据仅是临时数据。很显然,这种表达作业依赖关系的方式低效的问题,进而会产生大量不必要的磁盘和网络IO的浪费。
Tez是从MapReduce计算框架演化而来的通用DAG计算框架,可作为MapReduceR/Pig/Hive等系统的底层数据处理引擎,它天生融入Hadoop2.0中的资源管理平台YARN,且由Hadoop 2.0核心人员精心打造,这势必会使其成为计算框架中的后起之秀。
以上运行DAG作业的方式是低效的,根本原因是作业之间的数据不是直接流动的,而是借助HDFS作为共享数据存储系统,即一个作业将处理后产生的数据写入HDFS,另一个依赖于该作业的作业需再从HDFS上重新读取数据进行处理
DAG计算框架Tez。它直接源于MapReduce框架,核心思想是将Map和Reduce两个 *** 作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output,Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等,这样,这些分解后的元 *** 作可以灵活组合,产生新的 *** 作。这些 *** 作经过一些控制程序组装后,可形成一个大的DAG作业。
在Shuffle中,Shuffle阶段和Merge阶段是并行进行的,当远程复制数据量达到一定阈值后,便会触发相应的合并线程对数据进行合并。
Tez的引入正是为了提高存在依赖关作业的运行效率,使用Tez改写这两个MapReduce程序,只需编写一个Tez DAG作业GroupByOrderEmployee,该作业的计算逻辑是Map-Reduce-Reduce
Tez提供的MR到DAG转换工具只适合"MAP-REDUCE+"类型的作业,也就是说,在传统的Map和Reduce两阶段之间穿插了若干个Reduce阶段。
对于DAG中的一个顶点(Vertex),它由一定数目的任务(Task)组成,这些任务分别处理输入数据集中的一份数据,一旦所有任务运行完成,则意味着该顶点运行完成;对于任何一个任务,它可能对应多个运行实例TaskAttempt,比如任务刚开始运行时,会创建一个运行实例,如果该实例运行失败,则会再启动一个实例重新运行;如果该实例运行速度过慢,则会为它再启动一个相同实例同时处理一份数据,这些机制均借鉴了MRv1中的设计。
当用户向YARN中提交一个DAG应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是启动DAGAppMaster;第二个阶段是由DAGAppMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行成功。
Apache Tez提出了一系列优化技术,其中值得一说的是ApplicationMaster缓冲池、预先启动Container、Container重用三项优化技术
在Apache Tez中,用户并不是直接将作业提交到ResouceManager上,而是提交到一个称为AMPoolServer的服务上。该服务启动后,会预启动若干个ApplicationMaster,形成一个ApplicationMaster缓冲池,这样,当用户提交作业时,直接将作业提交到某个已经启动的ApplicationMaster上即可。
Hortonworks正在尝试将Tez应用到Hive引擎中[插图](下一代Hive引擎被称为Stinger),从而依靠Tez数据处理引擎的更灵活的表达方式为Hive带来性能的提升。
◆ 第10章 实时/内存计算框架Storm/Spark
Hadoop MapReduce适合离线批处理场景,而不擅长实时计算和近实时计算相关的应用场景。为了克服MapReduce这方面的不足,Twitter开源了实时计算框架Storm,伯克利开发了基于内存的MapReduce实现Spark,这两种计算框架可用于实时计算和交互式计算领域,与HadoopMapReduce的离线批处理应用场景互补。
Storm的数据源是动态的,即收到一条便处理一条,而MapReduce的数据源是静态的,即数据被处理前整个数据集已经确定,且计算过程中不能被修改。实时计算框架能够解决很多实际应用问题,比如广告推荐、用户行为日志实时分析等。
Storm仍采用了Master/Slave架构,但是Master与Slave不直接通信,而是通过Zookeeper间接通信(见图10-3),这使得Storm在容错性方面表现异常优秀
Storm由两种节点组成:(一个)控制节点和(多个)工作节点。控制节点上面运行一个名为Nimbus的服务,它的作用类似于Hadoop MapReduce(MRv1)中的JobTracker,Nimbus负责在集群里面分发代码、分配计算任务给机器并监控这些计算机的状态;每一个工作节点上面运行一个名为Supervisor的服务,Supervisor作用是根据需要启动/关闭工作进程(Worker)。
在YARN上开发一个应用程序,通常而言,需要开发两个组件,分别是客户端和ApplicationMaster,其中客户端的主要作用是将应用程序提交到YARN上,并与YARN和ApplicationMaster交互,完成用户发送的一些指令;而ApplicationMaster则负责向YARN申请资源,并与NodeManager通信,以启动任务。
不修改Storm任何源代码即可让Storm运行在YARN上,最简单的实现方法是将Storm的各个服务组件(包括Nimbus和Supervisor)作为单独的任务运行在YARN上,而Zookeeper则作为一个公共的服务运行在YARN集群之外的几个节点上。
Spark[插图]发源于美国加州大学伯克利分校AMPLab实验室的集群计算平台,它克服了MapReduce在迭代式计算和交互式计算方面的不足,通过引入RDD(Resilient Distributed Datasets)数据表示模型,能够很好地解决MapReduce不易解决的问题。相比于MapReduce,Spark能够充分利用内存资源提高计算效率,因此本书将Spark归为“内存计算框架”
Spark引入了d性分布式数据集(RDD),它是一个有容错机制、可以被并行 *** 作的数据集合,能够被缓存到内存中,供其他计算使用,而不必像MapReduce那样每次都从HDFS上重新加载数据。
Spark中最主要的抽象是将数据集抽象成RDD,这是一种分布到各个节点上的数据集合,且能够被并行处理。RDD是通过HDFS上的文件或者其他Scala数据集创建的,并可以通过一定的逻辑转换成另外一种RDD。此外,RDD可被缓存到内存中,进而被多个并行执行的任务重用。最后,RDD是能够容错的,当一个节点宕掉后,丢失的RDD可被重构。
Spark中最核心的概念是d性分布式数据集(RDD),一个有容错机制,可以被并行 *** 作的集合。目前主要有两种类型的RDD:并行集合,接收一个已经存在的Scala集合,并在它上面运行各种并发计算;Hadoop数据集,类似于MapReduce模式,可在一个数据片的每条记录上,运行各种函数,但比MapReduce更加灵活。这两种集合继承自相同的父类,可以通过一系列相同的算子进行 *** 作。
每次RDD转换是通过重新计算得到结果的,然而,为了便于RDD数据集重复利用,用户可使用persist或cache方法将RDD数据集缓存到内存或者磁盘中。当然,用户可灵活选择将RDD数据集缓存到内存中、磁盘上或其他节点上。缓存是Spark中构造迭代算法的关键工具,甚至可以在解释器中交互使用。
RDD是能够容错的,如果一个RDD分片丢失了,Spark能通过最初构建它的转换算法重构它。此外,用户可使用不同的缓存级别对RDD进行缓存。比如,可将数据集缓存到磁盘上,或者以Java序列化对象的形式缓存到内存中,甚至复制到多个节点上。
广播变量允许用户保留一个只读的变量并将之缓存到每一台机器上(并非每个任务单独保存一份副本)。广播变量是从变量v创建的,通过调用SparkContext.broadcast(v)方法实现变量广播。这个广播变量是一个v的分装器,它只可通过调用value方法获得。
累加器是只支持加法 *** 作的变量,可以高效地并行化。用户可以用它实现计数器和变量求和。Spark原生支持Int和Double类型的计数器,用户可根据需要添加新的类型。
Spark中每个应用程序维护了自己的一套运行时环境,该运行时环境在应用程序开始运行时构建,在运行结束时销毁。
窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区和两个父RDD的分区对应于一个子RDD的分区。
宽依赖指子RDD的分区依赖于父RDD的所有分区
应用程序运行过程中,Spark按RDD依赖关系将应用程序划分成若干个Stage,每个Stage启动一定数目的任务进行并行处理。Spark采用了贪心算法划分阶段,即如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,把对应 *** 作划分到一个阶段。如果连续的变换算子序列都是窄依赖,就可以把很多个 *** 作并到一个阶段,直到遇到一个宽依赖。这不但减少了大量的全局barrier,而且无须物化很多中间结果RDD,这将极大地提升性能。Spark把这个称为流水线(pipeline)优化。
Spark On YARN和Storm On YARN体现了YARN的两种不同使用方式,从资源分配粒度上讲,前者可以仿照Spark On Mesos实现细粒度资源分配机制[插图],这样,它运行的是应用程序,一个应用程序申请自己需要的资源,用完后立刻归还给YARN,这有利于应用程序间的资源共享;后者则是一种粗粒度的资源分配,它运行的是Storm服务,即将Storm自动化部署到YARN上,使Storm集群变成一个d性实时计算平台(计算资源可根据需要动态伸缩),由于服务启动时通常会将所需要的资源全部占下,以便用户提交应用程序时直接使用,因此是一种粗粒度的资源分配,不利于资源共享。从运行的应用程序类型看,前者运行的是短作业,运行时间通常是分钟或者小时级别的;而后者则是长作业,实际上是服务,通常永远不会终止,除非管理员主动杀死它或者服务故障。
Storm用于解决实时计算问题,即数据源源不断地流入系统而被实时分析和处理,常见的应用场景是广告推荐系统、实时日志分析等;Spark可用于迭代式计算和交互式计算等场景,它基于RDD的模型允许用户对访问频繁的数据集进行缓存以减少计算开销。
◆ 第11章 Facebook Corona剖析
Corona是Facebook于2012年11月开源的下一代MapReduce框架,它的设计目标与YARN类似
Corona也采用了Master/Slave结构,它们的基本思想也是一致的,即将JobTracker拆分成了两个独立的服务:全局的资源管理器ClusterManager和每个应用程序特有的CoronaJobTracker,其中ClusterManager负责整个系统的资源管理和分配,而CoronaJobTracker负责单个应用程序的管理。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)