- RM高可用
- 基于ZK的HA机制
- RM状态信息(元数据)与高可用机制
- 基于ZK的RM状态信息获取(支持高可用)
- 基于FileSystem的RM状态信息获取(不支持高可用)
- 基于内存的RM状态信息获取(不支持高可用)
- 状态信息恢复
- 计算高可用
- task失败
- job失败
- 参考
对于YARN的介绍,可以参考之前的文章:
大数据理论与实践4 分布式资源管理系统YARN
Hadoop官方文档对于RM的高可用是这样描述的:ResourceManager (RM) 负责跟踪集群中的资源,并调度应用程序。在Hadoop2.4之前是存在单点故障的。高可用机制是通过Active/Standby ResourceManager对的形式添加冗余,来消除这种单点故障。
当RM发生故障需要进行故障转移时,有三种方式:
- 手动转换和故障转移
管理员应该首先将 Active-RM 转换为 Standby,然后将 Standby-RM 转换为 Active。可通过命令行来进行。 - 自动故障转移
RM 可以选择嵌入基于Zookeeper的ActiveStandbyElector来决定哪个 RM 应该是 Active。不需要像HDFS那样运行单独的ZKFC守护进程,因为嵌入在RM中的ActiveStandbyElector充当故障检测器和领导选举器,而不是单独的ZKFC守护进程。
下面源码阅读主要是针对RM故障转移的。
首先在ResourceManager里面中,serviceInit()方法中,会对当前的高可用进行判断:
如果yarn.resourcemanager.ha.enabled配置参数为true(默认false),则为启用RM高可用;
protected void serviceInit(Configuration conf) throws Exception { ... // Set HA configuration should be done before login this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); if (this.rmContext.isHAEnabled()) { //验证高可用信息 HAUtil.verifyAndSetConfiguration(this.conf); } ... }
在HAUtil中,会对RM的高可用信息进行验证:
public static void verifyAndSetConfiguration(Configuration conf) throws YarnRuntimeException { //验证在配置文件的yarn.resourcemanager.ha.rm-ids中至少有两个RM-id //并且RPC地址(RPC是RM与NM、AM之间的通信协议)是指定的 //将符合这两个条件的写回到配置文件中,相当于一个过滤 verifyAndSetRMHAIdsList(conf); //验证当前的RM的id(yarn.resourcemanager.ha.id) //在过滤后的yarn.resourcemanager.ha.rm-ids是存在的 verifyAndSetCurrentRMHAId(conf); //判断是否开启了选主服务 verifyLeaderElection(conf); //判断是否配置齐了 //RM address、RM_SCHEDULER_ADDRESS、RM_ADMIN_ADDRESS等信息 verifyAndSetAllServiceAddresses(conf); }
如果开启了RM高可用,会获取到yarn.resourcemanager.ha.automatic-failover.enabled(默认true),将RM设置为启用自动故障转移。
如果yarn.resourcemanager.ha.automatic-failover.embedded为true(默认false),则会在ResourceManager初始化过程中,调用createEmbeddedElector()创建选举器。
//创建选举器,由于配置被弃用,默认创建ActiveStandbyElectorbasedElectorService选举器 protected EmbeddedElector createEmbeddedElector() throws IOException { EmbeddedElector elector; curatorEnabled = //false conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); if (curatorEnabled) { this.zkManager = createAndStartZKManager(conf); elector = new CuratorbasedElectorService(this); } else { elector = new ActiveStandbyElectorbasedElectorService(this); } return elector; }
在hadoop3.3.1中,配置yarn.resourcemanager.ha.automatic-failover.curator-leader-elector.enabled已经被弃用了,因此选举器就是ActiveStandbyElector,它是基于ZK选主实现的。
基于ZK的HA机制在ActiveStandbyElector中,被选举成为Active的RM会调用AdminService的transitionToActive()函数,把RM状态转换为Active:
public void becomeActive() throws ServiceFailedException { cancelDisconnectTimer(); try { //调用AdminService的transitionToActive()函数,把RM状态转换为Active rm.getRMContext().getRMAdminService().transitionToActive(req); } catch (Exception e) { throw new ServiceFailedException("RM could not transition to Active", e); } }
AdminService是提供管理员服务的RPC请求(类似的,ClientRMService为普通用户提供服务)。在AdminService中:
public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { if (isRMActive()) { return; } // call refreshAdminAcls before HA state transition // 先刷新AdminACL,防止adminacl被之前的activeRM修改 // for the case that adminAcls have been updated in previous active RM try { refreshAdminAcls(false); } catch (YarnException ex) { throw new ServiceFailedException("Can not execute refreshAdminAcls", ex); } //检查是否有管理员权限,如果有就返回当前的user,没有就扔异常 UserGroupInformation user = checkAccess("transitionToActive"); //检查更改此节点HA状态的请求是否有效。例如,开启了自动故障切换但是控制台发来切换非强制的指令,这个来自控制台的指令是不能工作的(扔异常) checkHaStateChange(reqInfo); try { // call all refresh*s for active RM to get the updated configurations. //全部refresh,让active RM获取到最新的配置 refreshAll(); } catch (Exception e) { rm.getRMContext() .getDispatcher() .getEventHandler() .handle( new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e, "failure to refresh configuration settings")); throw new ServiceFailedException( "Error on refreshAll during transition to Active", e); } try { //核心:rm切换到active rm.transitionToActive(); } catch (Exception e) { RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", "", "RM", "Exception transitioning to active"); throw new ServiceFailedException( "Error when transitioning to Active mode", e); } RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", "RM"); }
要想将RM转换为Active,首先需要进行各种权限检查,并且让这个RM或得到最新的各种信息。
可以看到,在这个方法中调用了ResourceManager的transitionToActive()方法:
//将RM转换到Active synchronized void transitionToActive() throws Exception { //如果本身是Active,返回 if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) { LOG.info("Already in active state"); return; } LOG.info("Transitioning to active state"); this.rmLoginUGI.doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { try { //启动RMActiveServices startActiveServices(); return null; } catch (Exception e) { reinitialize(true); throw e; } } }); rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE); LOG.info("Transitioned to active state"); }
调用了startActiveServices()函数,启动RMActiveServices
void startActiveServices() throws Exception { if (activeServices != null) { clusterTimeStamp = System.currentTimeMillis(); activeServices.start(); } }
这里的start()(定义在org.apache.hadoop.service AbstractService.java)会调用RMActiveServices(ResourceManager内部类)serviceStart()函数来开启RM Active的服务。
这里涉及了RMStateStore,是实现ResourceManager状态存储的基类。它的作用稍后会进行分析,可以理解为它负责存储和读取RM的状态。
protected void serviceStart() throws Exception { //RMStateStore是实现ResourceManager状态存储的基类。 //负责异步通知和与对象的接口。 //真正的存储实现需要从中派生并实现阻塞存储和加载方法, //以实际存储和加载状态。 RMStateStore rmStore = rmContext.getStateStore(); // The state store needs to start irrespective of recoveryEnabled as apps // need events to move to further states. rmStore.start(); //如果开启了自动重启:yarn.resourcemanager.recovery.enabled:true if(recoveryEnabled) { try { LOG.info("Recovery started"); //检查版本信息 //因为存储的内容可能随着后续版本的更新而改变 //需要通过增加版本号来区分 //高版本能兼容低版本 rmStore.checkVersion(); if (rmContext.isWorkPreservingRecoveryEnabled()) { //设置Epoch值,ZK选举的时候会用到 rmContext.setEpoch(rmStore.getAndIncrementEpoch()); } //读取RM状态 RMState state = rmStore.loadState(); //恢复RM状态 recover(state); LOG.info("Recovery ended"); } catch (Exception e) { // the Exception from loadState() needs to be handled for // HA and we need to give up master status if we got fenced LOG.error("Failed to load/recover state", e); throw e; } } else { //如果开启了YARN Federation,默认关闭 if (HAUtil.isFederationEnabled(conf)) { //更新epoch long epoch = conf.getLong(YarnConfiguration.RM_EPOCH, YarnConfiguration.DEFAULT_RM_EPOCH); rmContext.setEpoch(epoch); LOG.info("Epoch set for Federation: " + epoch); } } super.serviceStart(); }
它的核心是进行数据的同步,那么具体什么数据被同步了呢,是怎样被同步的呢?
RM状态信息(元数据)与高可用机制在上面的代码中,可以看出,启动Active RM的时候,核心是获取到状态信息,然后恢复状态。
//上面代码中的一段核心代码 //读取RM状态 RMState state = rmStore.loadState(); //恢复RM状态 recover(state); LOG.info("Recovery ended");
这里的rmStore是RMStateStore虚基类的实例。
从官方文档可以知道,RMStateStore是实现ResourceManager状态存储的基类。负责异步通知和与对象的接口。真正的存储实现需要从中派生并实现阻塞存储和加载方法,以实际存储和加载状态。
在Hadoop3.3.1中,提供了五种RM状态信息的存储方式,分别是:
- MemoryRMStateStore:信息状态保存在内存中的实现类
- FileSystemRMStateStore:信息状态保存在HDFS文件系统中,如HDFS 或本地FS
- NullRMStateStore:所有函数都是空的,什么都不做,不保存应用状态信息。
- ZKRMStateStore:信息状态保存在Zookeeper中。
- LeveldbRMStateStore:基于 LevelDB 的状态存储实现
存储方式默认值为FileSystemRMStateStore。
在初始化RMActiveServiceContext(维护Active的RM状态信息的类)的过程中,StateStore会被置为NullRMStateStore();在ResourceManager初始化过程中,会读取配置信息yarn.resourcemanager.store.class,在RMStateStoreFactory类中通过反射机制,获取到指定的存储方式,并初始化。
注意:用户可以自由选择任何存储来设置 RM 重启,但必须使用基于 ZooKeeper 的状态存储来支持RM高可用。因为只有基于ZooKeeper的 state-store支持fencing机制,以避免出现裂脑情况。基于Hadoop文件系统存储的是不支持RM高可用高可用的。基于LevelDB的状态存储被认为比基于HDFS和ZooKeeper的状态存储更轻,每次状态更新的I/O *** 作更少,文件系统上的文件总数也少得多,但是也不支持高可用。
protected void serviceInit(Configuration configuration) throws Exception { //.... RMStateStore rmStore = null; if (recoveryEnabled) { //获取rmStore rmStore = RMStateStoreFactory.getStore(conf); boolean isWorkPreservingRecoveryEnabled = conf.getBoolean( YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); rmContext .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled); } else { rmStore = new NullRMStateStore(); } try { rmStore.setResourceManager(rm); rmStore.init(conf); rmStore.setRMDispatcher(rmDispatcher); } catch (Exception e) { // the Exception from stateStore.init() needs to be handled for // HA and we need to give up master status if we got fenced LOG.error("Failed to init state store", e); throw e; } //设置RMStore rmContext.setStateStore(rmStore); }
那么接下来看通过各种方式获取和保存具体状态信息的。
基于ZK的RM状态信息获取(支持高可用)在ZK中存储的状态信息如下:
The znode structure is as follows: ROOT_DIR_PATH |--- VERSION_INFO |--- EPOCH_NODE |--- RM_ZK_FENCING_LOCK |--- RM_APP_ROOT | |----- HIERARCHIES | | |----- 1 | | | |----- (#ApplicationId barring last character) | | | | |----- (#Last character of ApplicationId) | | | | | |----- (#ApplicationAttemptIds) | | | .... | | | | | |----- 2 | | | |----- (#ApplicationId barring last 2 characters) | | | | |----- (#Last 2 characters of ApplicationId) | | | | | |----- (#ApplicationAttemptIds) | | | .... | | | | | |----- 3 | | | |----- (#ApplicationId barring last 3 characters) | | | | |----- (#Last 3 characters of ApplicationId) | | | | | |----- (#ApplicationAttemptIds) | | | .... | | | | | |----- 4 | | | |----- (#ApplicationId barring last 4 characters) | | | | |----- (#Last 4 characters of ApplicationId) | | | | | |----- (#ApplicationAttemptIds) | | | .... | | | | |----- (#ApplicationId1) | | |----- (#ApplicationAttemptIds) | | | |----- (#ApplicationId2) | | |----- (#ApplicationAttemptIds) | .... | |--- RM_DT_SECRET_MANAGER_ROOT |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME | |----- 1 | | |----- (#TokenId barring last character) | | | |----- (#Last character of TokenId) | | .... | |----- 2 | | |----- (#TokenId barring last 2 characters) | | | |----- (#Last 2 characters of TokenId) | | .... | |----- 3 | | |----- (#TokenId barring last 3 characters) | | | |----- (#Last 3 characters of TokenId) | | .... | |----- 4 | | |----- (#TokenId barring last 4 characters) | | | |----- (#Last 4 characters of TokenId) | | .... | |----- Token_1 | |----- Token_2 | .... | |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME | |----- Key_1 | |----- Key_2 .... |--- AMRMTOKEN_SECRET_MANAGER_ROOT |----- currentMasterKey |----- nextMasterKey |-- RESERVATION_SYSTEM_ROOT |------PLAN_1 | |------ RESERVATION_1 | |------ RESERVATION_2 | .... |------PLAN_2 .... |-- PROXY_CA_ROOT |----- caCert |----- caPrivateKey
可以看到,主要存储了版本,Epoch信息,Application信息,安全相关(SECRET_MANAGER)的信息等。
通过之前的可以看出,通过loadState()方法来获取状态信息。
loadState()方法在ZKRMStateStore中实现如下:
@Override public synchronized RMState loadState() throws Exception { long start = clock.getTime(); //需要注意:RMState是一个静态类 //这里或得到的是一个内存中维护的RMstate对象 RMState rmState = new RMState(); // recover DelegationTokenSecretManager // 获取RM_DT_SECRET_MANAGER_ROOT下的安全信息 loadRMDTSecretManagerState(rmState); // recover RM applications // 获取RM Application,对应RM_APP_ROOT下的信息 loadRMAppState(rmState); // recover AMRMTokenSecretManager // 获取对应AMRMTOKEN_SECRET_MANAGER_ROOT的信息 loadAMRMTokenSecretManagerState(rmState); // recover reservation state // 获取对应RESERVATION_SYSTEM_ROOT的信息 loadReservationSystemState(rmState); // recover ProxyCAManager state // 获取对应PROXY_CA_ROOT的信息 loadProxyCAManagerState(rmState); opDurations.addLoadStateCallDuration(clock.getTime() - start); return rmState; }
也就是说在loadState()的时候,把ZK里面存储的所有关于RM的信息全部或取下来,放在静态的rmState里面。
基于FileSystem的RM状态信息获取(不支持高可用)和基于ZK的基本一致,loadState()函数内容完全一致,只是存储位置不同。
在FileSystemRMStateStore中存储了变量FileSystem,来管理当前的文件系统类型。
public class FileSystemRMStateStore extends RMStateStore { //... protected FileSystem fs; //... }
存储位置是存储在配置文件中yarn.resourcemanager.fs.state-store.uri指定的位置。默认值为${hadoop.tmp.dir}/yarn/system/rmstore,如果未提供文件系统名称,将使用*conf/core-site.xml中指定的fs.default.name。
基于内存的RM状态信息获取(不支持高可用)获取到RMState(静态类)的信息。
public class MemoryRMStateStore extends RMStateStore { //... RMState state = new RMState(); //... }
当需要存储信息时,会将信息直接写入这个静态类的对象。
在loadState()时,会返回一个copy
public synchronized RMState loadState() throws Exception { // 返回状态的副本以允许修改真实状态 RMState returnState = new RMState(); returnState.appState.putAll(state.appState); returnState.rmSecretManagerState.getMasterKeyState() .addAll(state.rmSecretManagerState.getMasterKeyState()); returnState.rmSecretManagerState.getTokenState().putAll( state.rmSecretManagerState.getTokenState()); returnState.rmSecretManagerState.dtSequenceNumber = state.rmSecretManagerState.dtSequenceNumber; returnState.amrmTokenSecretManagerState = state.amrmTokenSecretManagerState == null ? null : AMRMTokenSecretManagerState .newInstance(state.amrmTokenSecretManagerState); if (state.proxyCAState.getCaCert() != null) { byte[] caCertData = state.proxyCAState.getCaCert().getEncoded(); returnState.proxyCAState.setCaCert(caCertData); } if (state.proxyCAState.getCaPrivateKey() != null) { byte[] caPrivateKeyData = state.proxyCAState.getCaPrivateKey().getEncoded(); returnState.proxyCAState.setCaPrivateKey(caPrivateKeyData); } return returnState; }状态信息恢复
再通过上面两种方法来获取到RM状态之后,就可以进行状态恢复。调用recover(state)方法,来将状态同步到当前RM上。
public void recover(RMState state) throws Exception { // recover RMdelegationTokenSecretManager rmContext.getRMDelegationTokenSecretManager().recover(state); // recover AMRMTokenSecretManager rmContext.getAMRMTokenSecretManager().recover(state); // recover reservations if (reservationSystem != null) { reservationSystem.recover(state); } // recover applications rmAppManager.recover(state); // recover ProxyCA rmContext.getProxyCAManager().recover(state); setSchedulerRecoveryStartAndWaitTime(state, conf); }
可以看出,恢复的也就是存储的那几块内容。
计算高可用内容来自《 Hadoop 权威指南第四版 》
task失败task 失败的第一种情况就是用户的 map task 或者 reduce task 代码在执行的之后抛出了运行时异常。如果发生了,就做以下 *** 作:
- task JVM 会在退出之前报告失败给他的 AppMaster。
- 错误会立刻写进用户的日志中。
- AppMaster 将 task 标记为 fail。
- 释放 task 的容器,以留给其他的 task 使用。
task 失败的另一种情况就是 JVM 意外的退出,在这种情况下, *** 作如下:
- node manager 注意到处理已经退出,就报告给 AppMaster。
- AppMaster 就会将 task 标记为 failed。
AppMaster注意到task有一段时间没有发来执行状态的更新,就将其标记为 filed。JVM进程会在当前周期(超时周期一般为 10 分钟,通过 mapreduce.task.timeout 属性来设置,值的单位为毫秒)后自动的被kill掉。
如果AppMaster被告知task的尝试失败了,他将会重新调度来执行该task。 AppMaster会尽量不将task重新调度到先前失败的node manager之下。默认情况下当 task 失败了四次,就不再重新运行他。尝试次数是可以设置的,通过 mapreduce.map.maxattempt 属性设置 map 尝试次数。通过 mapreduce.reduce.maxattempt 设置 reduce 尝试次数。
job失败在一些应用中,不希望因为少数的task失败而造成job中途夭折。这时,可以设置失败task的比例(mapreduce.map.failures.maxpercents 和 mapreduce.reduce.failures.maxpercents),失败的task在此比例内,就不会触发job的失败。当超过设置的比例,整个job失败后,RM会在其他NM上重启AM(默认2次)。
参考Apache Hadoop YARN 官方文档
Apache Hadoop 3.3.1源码
YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复
yarn3.2源码分析之ResourceManager基于zk的HA机制
YARN - Task, Node manager, AppMaster, Resource manager 失败时所做的处理
RM状态存储与还原机制详解
YARN Federation的架构设计
深入理解Hadoop HA机制
醒一醒,讲到 ZooKeeper 的选举机制了
hadoop HA 详解
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)