大数据理论与实践 源码分析 YARN高可用机制

大数据理论与实践 源码分析 YARN高可用机制,第1张

大数据理论与实践 源码分析 YARN高可用机制

YARN高可用机制
  • RM高可用
    • 基于ZK的HA机制
  • RM状态信息(元数据)与高可用机制
    • 基于ZK的RM状态信息获取(支持高可用)
    • 基于FileSystem的RM状态信息获取(不支持高可用)
    • 基于内存的RM状态信息获取(不支持高可用)
    • 状态信息恢复
  • 计算高可用
    • task失败
    • job失败
  • 参考

对于YARN的介绍,可以参考之前的文章:
大数据理论与实践4 分布式资源管理系统YARN

RM高可用

Hadoop官方文档对于RM的高可用是这样描述的:ResourceManager (RM) 负责跟踪集群中的资源,并调度应用程序。在Hadoop2.4之前是存在单点故障的。高可用机制是通过Active/Standby ResourceManager对的形式添加冗余,来消除这种单点故障。
当RM发生故障需要进行故障转移时,有三种方式:

  1. 手动转换和故障转移
    管理员应该首先将 Active-RM 转换为 Standby,然后将 Standby-RM 转换为 Active。可通过命令行来进行。
  2. 自动故障转移
    RM 可以选择嵌入基于Zookeeper的ActiveStandbyElector来决定哪个 RM 应该是 Active。不需要像HDFS那样运行单独的ZKFC守护进程,因为嵌入在RM中的ActiveStandbyElector充当故障检测器和领导选举器,而不是单独的ZKFC守护进程。

下面源码阅读主要是针对RM故障转移的。
首先在ResourceManager里面中,serviceInit()方法中,会对当前的高可用进行判断:
如果yarn.resourcemanager.ha.enabled配置参数为true(默认false),则为启用RM高可用;

protected void serviceInit(Configuration conf) throws Exception {
	...
    // Set HA configuration should be done before login
    this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
    if (this.rmContext.isHAEnabled()) {
    	//验证高可用信息
    	HAUtil.verifyAndSetConfiguration(this.conf);
    }
    ...
}

在HAUtil中,会对RM的高可用信息进行验证:

  
public static void verifyAndSetConfiguration(Configuration conf)
    throws YarnRuntimeException {
    //验证在配置文件的yarn.resourcemanager.ha.rm-ids中至少有两个RM-id
    //并且RPC地址(RPC是RM与NM、AM之间的通信协议)是指定的
    //将符合这两个条件的写回到配置文件中,相当于一个过滤
    verifyAndSetRMHAIdsList(conf);
    //验证当前的RM的id(yarn.resourcemanager.ha.id)
    //在过滤后的yarn.resourcemanager.ha.rm-ids是存在的
    verifyAndSetCurrentRMHAId(conf);
    //判断是否开启了选主服务
    verifyLeaderElection(conf);
    //判断是否配置齐了
    //RM address、RM_SCHEDULER_ADDRESS、RM_ADMIN_ADDRESS等信息
    verifyAndSetAllServiceAddresses(conf);
}

如果开启了RM高可用,会获取到yarn.resourcemanager.ha.automatic-failover.enabled(默认true),将RM设置为启用自动故障转移。
如果yarn.resourcemanager.ha.automatic-failover.embedded为true(默认false),则会在ResourceManager初始化过程中,调用createEmbeddedElector()创建选举器。

//创建选举器,由于配置被弃用,默认创建ActiveStandbyElectorbasedElectorService选举器
protected EmbeddedElector createEmbeddedElector() throws IOException {
    EmbeddedElector elector;
    curatorEnabled =
    	//false
        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
    if (curatorEnabled) {
      this.zkManager = createAndStartZKManager(conf);
      elector = new CuratorbasedElectorService(this);
    } else {
      elector = new ActiveStandbyElectorbasedElectorService(this);
    }
    return elector;
  }

在hadoop3.3.1中,配置yarn.resourcemanager.ha.automatic-failover.curator-leader-elector.enabled已经被弃用了,因此选举器就是ActiveStandbyElector,它是基于ZK选主实现的。

基于ZK的HA机制

在ActiveStandbyElector中,被选举成为Active的RM会调用AdminService的transitionToActive()函数,把RM状态转换为Active:

public void becomeActive() throws ServiceFailedException {
    cancelDisconnectTimer();
    try {
    	//调用AdminService的transitionToActive()函数,把RM状态转换为Active
    	rm.getRMContext().getRMAdminService().transitionToActive(req);
    } catch (Exception e) {
    	throw new ServiceFailedException("RM could not transition to Active", e);
    }
  }

AdminService是提供管理员服务的RPC请求(类似的,ClientRMService为普通用户提供服务)。在AdminService中:

public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
    if (isRMActive()) {
      return;
    }
    // call refreshAdminAcls before HA state transition 
    // 先刷新AdminACL,防止adminacl被之前的activeRM修改
    // for the case that adminAcls have been updated in previous active RM
    try {
    	refreshAdminAcls(false);
    } catch (YarnException ex) {
    	throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
    }

    //检查是否有管理员权限,如果有就返回当前的user,没有就扔异常
    UserGroupInformation user = checkAccess("transitionToActive");
    //检查更改此节点HA状态的请求是否有效。例如,开启了自动故障切换但是控制台发来切换非强制的指令,这个来自控制台的指令是不能工作的(扔异常)
    checkHaStateChange(reqInfo);

    try {
      // call all refresh*s for active RM to get the updated configurations.
      //全部refresh,让active RM获取到最新的配置
    	refreshAll();
    } catch (Exception e) {
    	rm.getRMContext()
          .getDispatcher()
          .getEventHandler()
          .handle(
              new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e, "failure to refresh configuration settings"));
      throw new ServiceFailedException(
          "Error on refreshAll during transition to Active", e);
    }

    try {
      //核心:rm切换到active
    	rm.transitionToActive();
    } catch (Exception e) {
    	RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
          "", "RM",
          "Exception transitioning to active");
        throw new ServiceFailedException(
          "Error when transitioning to Active mode", e);
    }
    	RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
        "RM");
  }

要想将RM转换为Active,首先需要进行各种权限检查,并且让这个RM或得到最新的各种信息。
可以看到,在这个方法中调用了ResourceManager的transitionToActive()方法:

//将RM转换到Active
  synchronized void transitionToActive() throws Exception {
    //如果本身是Active,返回
    if (rmContext.getHAServiceState() == HAServiceProtocol.HAServiceState.ACTIVE) {
      	LOG.info("Already in active state");
      	return;
    }
    LOG.info("Transitioning to active state");
    this.rmLoginUGI.doAs(new PrivilegedExceptionAction() {
      @Override
      public Void run() throws Exception {
        try {
        	//启动RMActiveServices
        	startActiveServices();
          	return null;
        } catch (Exception e) {
          	reinitialize(true);
          	throw e;
        }
      }
  	}); 	
  	rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
    LOG.info("Transitioned to active state");
}

调用了startActiveServices()函数,启动RMActiveServices

void startActiveServices() throws Exception {
    if (activeServices != null) {
     	clusterTimeStamp = System.currentTimeMillis();
      	activeServices.start();
    }
}

这里的start()(定义在org.apache.hadoop.service AbstractService.java)会调用RMActiveServices(ResourceManager内部类)serviceStart()函数来开启RM Active的服务。
这里涉及了RMStateStore,是实现ResourceManager状态存储的基类。它的作用稍后会进行分析,可以理解为它负责存储和读取RM的状态。

protected void serviceStart() throws Exception {
	  //RMStateStore是实现ResourceManager状态存储的基类。
	  //负责异步通知和与对象的接口。
	  //真正的存储实现需要从中派生并实现阻塞存储和加载方法,
	  //以实际存储和加载状态。
      RMStateStore rmStore = rmContext.getStateStore();
      // The state store needs to start irrespective of recoveryEnabled as apps
      // need events to move to further states.
      rmStore.start();
	  //如果开启了自动重启:yarn.resourcemanager.recovery.enabled:true
      if(recoveryEnabled) {
        try {
          LOG.info("Recovery started");
          //检查版本信息
          //因为存储的内容可能随着后续版本的更新而改变
          //需要通过增加版本号来区分
          //高版本能兼容低版本
          rmStore.checkVersion();
          if (rmContext.isWorkPreservingRecoveryEnabled()) {
          	//设置Epoch值,ZK选举的时候会用到
            rmContext.setEpoch(rmStore.getAndIncrementEpoch());
          }
          //读取RM状态
          RMState state = rmStore.loadState();
          //恢复RM状态
          recover(state);
          LOG.info("Recovery ended");
        } catch (Exception e) {
          // the Exception from loadState() needs to be handled for
          // HA and we need to give up master status if we got fenced
          LOG.error("Failed to load/recover state", e);
          throw e;
        }
      } else {
      	//如果开启了YARN Federation,默认关闭
        if (HAUtil.isFederationEnabled(conf)) {
        	//更新epoch
          long epoch = conf.getLong(YarnConfiguration.RM_EPOCH,
              YarnConfiguration.DEFAULT_RM_EPOCH);
          	rmContext.setEpoch(epoch);
          	LOG.info("Epoch set for Federation: " + epoch);
        }
      }
      super.serviceStart();
    }

它的核心是进行数据的同步,那么具体什么数据被同步了呢,是怎样被同步的呢?

RM状态信息(元数据)与高可用机制

在上面的代码中,可以看出,启动Active RM的时候,核心是获取到状态信息,然后恢复状态。

//上面代码中的一段核心代码
//读取RM状态
RMState state = rmStore.loadState();
//恢复RM状态
recover(state);
LOG.info("Recovery ended");

这里的rmStore是RMStateStore虚基类的实例。
从官方文档可以知道,RMStateStore是实现ResourceManager状态存储的基类。负责异步通知和与对象的接口。真正的存储实现需要从中派生并实现阻塞存储和加载方法,以实际存储和加载状态。

在Hadoop3.3.1中,提供了五种RM状态信息的存储方式,分别是:

  • MemoryRMStateStore:信息状态保存在内存中的实现类
  • FileSystemRMStateStore:信息状态保存在HDFS文件系统中,如HDFS 或本地FS
  • NullRMStateStore:所有函数都是空的,什么都不做,不保存应用状态信息。
  • ZKRMStateStore:信息状态保存在Zookeeper中。
  • LeveldbRMStateStore:基于 LevelDB 的状态存储实现

存储方式默认值为FileSystemRMStateStore。
在初始化RMActiveServiceContext(维护Active的RM状态信息的类)的过程中,StateStore会被置为NullRMStateStore();在ResourceManager初始化过程中,会读取配置信息yarn.resourcemanager.store.class,在RMStateStoreFactory类中通过反射机制,获取到指定的存储方式,并初始化。
注意:用户可以自由选择任何存储来设置 RM 重启,但必须使用基于 ZooKeeper 的状态存储来支持RM高可用。因为只有基于ZooKeeper的 state-store支持fencing机制,以避免出现裂脑情况。基于Hadoop文件系统存储的是不支持RM高可用高可用的。基于LevelDB的状态存储被认为比基于HDFS和ZooKeeper的状态存储更轻,每次状态更新的I/O *** 作更少,文件系统上的文件总数也少得多,但是也不支持高可用。

protected void serviceInit(Configuration configuration) throws Exception 
{
//....
	  RMStateStore rmStore = null;
      if (recoveryEnabled) {
      //获取rmStore
        rmStore = RMStateStoreFactory.getStore(conf);
        boolean isWorkPreservingRecoveryEnabled =
            conf.getBoolean(
              YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
              YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
        rmContext
            .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
      } else {
        rmStore = new NullRMStateStore();
      }
      try {
        rmStore.setResourceManager(rm);
        rmStore.init(conf);
        rmStore.setRMDispatcher(rmDispatcher);
      } catch (Exception e) {
        // the Exception from stateStore.init() needs to be handled for
        // HA and we need to give up master status if we got fenced
        LOG.error("Failed to init state store", e);
        throw e;
      }
      //设置RMStore
      rmContext.setStateStore(rmStore);
}

那么接下来看通过各种方式获取和保存具体状态信息的。

基于ZK的RM状态信息获取(支持高可用)

在ZK中存储的状态信息如下:

 The znode structure is as follows:
 ROOT_DIR_PATH
 |--- VERSION_INFO
 |--- EPOCH_NODE
 |--- RM_ZK_FENCING_LOCK
 |--- RM_APP_ROOT
 |     |----- HIERARCHIES
 |     |        |----- 1
 |     |        |      |----- (#ApplicationId barring last character)
 |     |        |      |       |----- (#Last character of ApplicationId)
 |     |        |      |       |       |----- (#ApplicationAttemptIds)
 |     |        |      ....
 |     |        |
 |     |        |----- 2
 |     |        |      |----- (#ApplicationId barring last 2 characters)
 |     |        |      |       |----- (#Last 2 characters of ApplicationId)
 |     |        |      |       |       |----- (#ApplicationAttemptIds)
 |     |        |      ....
 |     |        |
 |     |        |----- 3
 |     |        |      |----- (#ApplicationId barring last 3 characters)
 |     |        |      |       |----- (#Last 3 characters of ApplicationId)
 |     |        |      |       |       |----- (#ApplicationAttemptIds)
 |     |        |      ....
 |     |        |
 |     |        |----- 4
 |     |        |      |----- (#ApplicationId barring last 4 characters)
 |     |        |      |       |----- (#Last 4 characters of ApplicationId)
 |     |        |      |       |       |----- (#ApplicationAttemptIds)
 |     |        |      ....
 |     |        |
 |     |----- (#ApplicationId1)
 |     |        |----- (#ApplicationAttemptIds)
 |     |
 |     |----- (#ApplicationId2)
 |     |       |----- (#ApplicationAttemptIds)
 |     ....
 |
 |--- RM_DT_SECRET_MANAGER_ROOT
        |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
        |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
        |       |----- 1
        |       |      |----- (#TokenId barring last character)
        |       |      |       |----- (#Last character of TokenId)
        |       |      ....
        |       |----- 2
        |       |      |----- (#TokenId barring last 2 characters)
        |       |      |       |----- (#Last 2 characters of TokenId)
        |       |      ....
        |       |----- 3
        |       |      |----- (#TokenId barring last 3 characters)
        |       |      |       |----- (#Last 3 characters of TokenId)
        |       |      ....
        |       |----- 4
        |       |      |----- (#TokenId barring last 4 characters)
        |       |      |       |----- (#Last 4 characters of TokenId)
        |       |      ....
        |       |----- Token_1
        |       |----- Token_2
        |       ....
        |
        |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
        |      |----- Key_1
        |      |----- Key_2
                ....
 |--- AMRMTOKEN_SECRET_MANAGER_ROOT
        |----- currentMasterKey
        |----- nextMasterKey

 |-- RESERVATION_SYSTEM_ROOT
        |------PLAN_1
        |      |------ RESERVATION_1
        |      |------ RESERVATION_2
        |      ....
        |------PLAN_2
        ....
 |-- PROXY_CA_ROOT
        |----- caCert
        |----- caPrivateKey

可以看到,主要存储了版本,Epoch信息,Application信息,安全相关(SECRET_MANAGER)的信息等。
通过之前的可以看出,通过loadState()方法来获取状态信息。
loadState()方法在ZKRMStateStore中实现如下:

  @Override
  public synchronized RMState loadState() throws Exception {
    long start = clock.getTime();
    //需要注意:RMState是一个静态类
    //这里或得到的是一个内存中维护的RMstate对象
    RMState rmState = new RMState();
    // recover DelegationTokenSecretManager
    // 获取RM_DT_SECRET_MANAGER_ROOT下的安全信息
    loadRMDTSecretManagerState(rmState);
    // recover RM applications
    // 获取RM Application,对应RM_APP_ROOT下的信息
    loadRMAppState(rmState);
    // recover AMRMTokenSecretManager
    // 获取对应AMRMTOKEN_SECRET_MANAGER_ROOT的信息
    loadAMRMTokenSecretManagerState(rmState);
    // recover reservation state
    // 获取对应RESERVATION_SYSTEM_ROOT的信息
    loadReservationSystemState(rmState);
    // recover ProxyCAManager state
    // 获取对应PROXY_CA_ROOT的信息
    loadProxyCAManagerState(rmState);
    opDurations.addLoadStateCallDuration(clock.getTime() - start);
    return rmState;
  }

也就是说在loadState()的时候,把ZK里面存储的所有关于RM的信息全部或取下来,放在静态的rmState里面。

基于FileSystem的RM状态信息获取(不支持高可用)

和基于ZK的基本一致,loadState()函数内容完全一致,只是存储位置不同。
在FileSystemRMStateStore中存储了变量FileSystem,来管理当前的文件系统类型。

public class FileSystemRMStateStore extends RMStateStore {
	//...
	protected FileSystem fs;
	//...
}

存储位置是存储在配置文件中yarn.resourcemanager.fs.state-store.uri指定的位置。默认值为${hadoop.tmp.dir}/yarn/system/rmstore,如果未提供文件系统名称,将使用*conf/core-site.xml中指定的fs.default.name。

基于内存的RM状态信息获取(不支持高可用)

获取到RMState(静态类)的信息。

public class MemoryRMStateStore extends RMStateStore {
  	//...
  	RMState state = new RMState();
  	//...
}

当需要存储信息时,会将信息直接写入这个静态类的对象。
在loadState()时,会返回一个copy

public synchronized RMState loadState() throws Exception {
    // 返回状态的副本以允许修改真实状态
    RMState returnState = new RMState();
    returnState.appState.putAll(state.appState);
    returnState.rmSecretManagerState.getMasterKeyState()
      .addAll(state.rmSecretManagerState.getMasterKeyState());
    returnState.rmSecretManagerState.getTokenState().putAll(
      state.rmSecretManagerState.getTokenState());
    returnState.rmSecretManagerState.dtSequenceNumber =
        state.rmSecretManagerState.dtSequenceNumber;
    returnState.amrmTokenSecretManagerState =
        state.amrmTokenSecretManagerState == null ? null
            : AMRMTokenSecretManagerState
              .newInstance(state.amrmTokenSecretManagerState);
    if (state.proxyCAState.getCaCert() != null) {
      byte[] caCertData = state.proxyCAState.getCaCert().getEncoded();
      returnState.proxyCAState.setCaCert(caCertData);
    }
    if (state.proxyCAState.getCaPrivateKey() != null) {
      byte[] caPrivateKeyData
          = state.proxyCAState.getCaPrivateKey().getEncoded();
      returnState.proxyCAState.setCaPrivateKey(caPrivateKeyData);
    }
    return returnState;
  }
状态信息恢复

再通过上面两种方法来获取到RM状态之后,就可以进行状态恢复。调用recover(state)方法,来将状态同步到当前RM上。

public void recover(RMState state) throws Exception {
    // recover RMdelegationTokenSecretManager
    rmContext.getRMDelegationTokenSecretManager().recover(state);
    // recover AMRMTokenSecretManager
    rmContext.getAMRMTokenSecretManager().recover(state);
    // recover reservations
    if (reservationSystem != null) {
      reservationSystem.recover(state);
    }
    // recover applications
    rmAppManager.recover(state);
    // recover ProxyCA
    rmContext.getProxyCAManager().recover(state);
    setSchedulerRecoveryStartAndWaitTime(state, conf);
  }

可以看出,恢复的也就是存储的那几块内容。

计算高可用

内容来自《 Hadoop 权威指南第四版 》

task失败

task 失败的第一种情况就是用户的 map task 或者 reduce task 代码在执行的之后抛出了运行时异常。如果发生了,就做以下 *** 作:

  • task JVM 会在退出之前报告失败给他的 AppMaster。
  • 错误会立刻写进用户的日志中。
  • AppMaster 将 task 标记为 fail。
  • 释放 task 的容器,以留给其他的 task 使用。

task 失败的另一种情况就是 JVM 意外的退出,在这种情况下, *** 作如下:

  • node manager 注意到处理已经退出,就报告给 AppMaster。
  • AppMaster 就会将 task 标记为 failed。

AppMaster注意到task有一段时间没有发来执行状态的更新,就将其标记为 filed。JVM进程会在当前周期(超时周期一般为 10 分钟,通过 mapreduce.task.timeout 属性来设置,值的单位为毫秒)后自动的被kill掉。

如果AppMaster被告知task的尝试失败了,他将会重新调度来执行该task。 AppMaster会尽量不将task重新调度到先前失败的node manager之下。默认情况下当 task 失败了四次,就不再重新运行他。尝试次数是可以设置的,通过 mapreduce.map.maxattempt 属性设置 map 尝试次数。通过 mapreduce.reduce.maxattempt 设置 reduce 尝试次数。

job失败

在一些应用中,不希望因为少数的task失败而造成job中途夭折。这时,可以设置失败task的比例(mapreduce.map.failures.maxpercents 和 mapreduce.reduce.failures.maxpercents),失败的task在此比例内,就不会触发job的失败。当超过设置的比例,整个job失败后,RM会在其他NM上重启AM(默认2次)。

参考

Apache Hadoop YARN 官方文档
Apache Hadoop 3.3.1源码
YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复
yarn3.2源码分析之ResourceManager基于zk的HA机制
YARN - Task, Node manager, AppMaster, Resource manager 失败时所做的处理
RM状态存储与还原机制详解
YARN Federation的架构设计
深入理解Hadoop HA机制
醒一醒,讲到 ZooKeeper 的选举机制了
hadoop HA 详解

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5573165.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存