Hadoop YARN Source Code Analysis (10): ResourceTrackerService 2021SC@SDUSC

Hadoop YARN Source Code Analysis (10): ResourceTrackerService
  • 1. Introduction to ResourceTrackerService
  • 2. Basic fields of ResourceTrackerService
  • 3. Core methods of ResourceTrackerService
    • 3.1 serviceInit()
    • 3.2 serviceStart()
    • 3.3 registerNodeManager()
    • 3.4 nodeHeartbeat()
    • 3.5 unRegisterNodeManager()

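1. Introduction to ResourceTrackerService

A NodeManager is the agent that runs on each node and manages a single compute node in the Hadoop cluster. Its main responsibilities are keeping in contact with the ResourceManager, monitoring and managing the lifecycle and resource usage of Containers, and managing logs and the auxiliary services used by different applications.
ResourceTrackerService handles requests from NodeManagers, of which there are two kinds: registration and heartbeat. Registration happens when a NodeManager starts; the request carries the node id, the upper limit of available resources, and similar information. Heartbeats are periodic and carry the running state of each Container, the list of running Applications, the node's health status, and so on. In its reply, ResourceTrackerService can return information such as the list of Containers to release and the list of Applications.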

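2. Basic fields of ResourceTrackerService

The fields cover the RM context, the node list manager, the NM liveliness monitor, the heartbeat interval settings, and NM version information.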

  // org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService.java
  // RM context
  private final RMContext rmContext;
  // Manages the list of nodes allowed in the cluster
  private final NodesListManager nodesListManager;
  // NM liveliness monitor
  private final NMLivelinessMonitor nmLivelinessMonitor;
  // Security: token secret managers
  private final RMContainerTokenSecretManager containerTokenSecretManager;
  private final NMTokenSecretManagerInRM nmTokenSecretManager;

  // Read lock
  private final ReadLock readLock;
  // Write lock
  private final WriteLock writeLock;

  // Interval until the next heartbeat
  private long nextHeartBeatInterval;
  // Heartbeat interval scaling parameters (min, max, speed-up and slow-down factors)
  private boolean heartBeatIntervalScalingEnable;
  private long heartBeatIntervalMin;
  private long heartBeatIntervalMax;
  private float heartBeatIntervalSpeedupFactor;
  private float heartBeatIntervalSlowdownFactor;

  // RPC server
  private Server server;
  // Address the server binds to
  private InetSocketAddress resourceTrackerAddress;
  // Minimum accepted NM version
  private String minimumNodeManagerVersion;

  // Minimum allocatable memory (MB)
  private int minAllocMb;
  // Minimum allocatable vcores
  private int minAllocVcores;

  private DecommissioningNodesWatcher decommissioningWatcher;

  private boolean isDistributedNodeLabelsConf;
  private boolean isDelegatedCentralizedNodeLabelsConf;
  private DynamicResourceConfiguration drConf;

  // Timeline collector version counter
  private final AtomicLong timelineCollectorVersion = new AtomicLong(0);
  // Whether to check the NM's IP/hostname at registration
  private boolean checkIpHostnameInRegistration;
  // Whether timeline service v2 is enabled
  private boolean timelineServiceV2Enabled;

Constructor

public ResourceTrackerService(RMContext rmContext,
      NodesListManager nodesListManager,
      NMLivelinessMonitor nmLivelinessMonitor,
      RMContainerTokenSecretManager containerTokenSecretManager,
      NMTokenSecretManagerInRM nmTokenSecretManager) {
    super(ResourceTrackerService.class.getName());
    // RM context
    this.rmContext = rmContext;
    // NodesListManager
    this.nodesListManager = nodesListManager;
    // NM liveliness monitor
    this.nmLivelinessMonitor = nmLivelinessMonitor;
    this.containerTokenSecretManager = containerTokenSecretManager;
    this.nmTokenSecretManager = nmTokenSecretManager;
    // Read and write views of a single reentrant read/write lock
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
    this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext);
  }
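
The constructor above takes a read lock and a write lock from one ReentrantReadWriteLock. The excerpt does not show where these locks are used, so the following is only a minimal sketch of the usual pattern, assuming a value that is read often and updated rarely. The class HeartbeatIntervalHolder and its methods are hypothetical and are not part of Hadoop.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical holder for a value that is read often and rarely updated.
final class HeartbeatIntervalHolder {
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;
  private long intervalMs;

  HeartbeatIntervalHolder(long initialMs) {
    // Both views come from the same lock object, as in the constructor.
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
    this.intervalMs = initialMs;
  }

  // Several readers may hold the read lock at the same time.
  long get() {
    readLock.lock();
    try {
      return intervalMs;
    } finally {
      readLock.unlock();
    }
  }

  // A writer waits until readers have left, then has exclusive access.
  void set(long newMs) {
    writeLock.lock();
    try {
      intervalMs = newMs;
    } finally {
      writeLock.unlock();
    }
  }
}
```

Releasing the lock in a finally block keeps the lock from leaking if the guarded code throws.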
3. Core methods of ResourceTrackerService

3.1 serviceInit()

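Performs initialization: it reads the bind address, the minimum allocatable memory and vcores, the minimum NM version, and other settings from the configuration.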

  protected void serviceInit(Configuration conf) throws Exception {
    // Bind address of the resource tracker RPC server
    resourceTrackerAddress = conf.getSocketAddr(
        YarnConfiguration.RM_BIND_HOST,
        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
    // Initialize the rack resolver
    RackResolver.init(conf);
    // Whether to check the NM's IP/hostname at registration
    checkIpHostnameInRegistration = conf.getBoolean(
        YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
        YarnConfiguration.DEFAULT_RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY);
    // Minimum allocatable memory (MB)
    minAllocMb = conf.getInt(
        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    // Minimum allocatable vcores
    minAllocVcores = conf.getInt(
        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
    // Minimum accepted NM version
    minimumNodeManagerVersion = conf.get(
        YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
        YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
    timelineServiceV2Enabled =  YarnConfiguration.
        timelineServiceV2Enabled(conf);

    if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
      isDistributedNodeLabelsConf =
          YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
      isDelegatedCentralizedNodeLabelsConf =
          YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
    }
    // Load the heartbeat interval settings
    updateHeartBeatConfiguration(conf);
    // Load the dynamic resource configuration
    loadDynamicResourceConfiguration(conf);
    decommissioningWatcher.init(conf);
    super.serviceInit(conf);
  }
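
Most of serviceInit() follows one pattern: look up a key and fall back to a default when it is unset. The sketch below illustrates that pattern with a hypothetical SimpleConf class backed by a map. It is a stand-in for illustration only and does not reproduce Hadoop's Configuration class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a configuration object, for illustration only.
final class SimpleConf {
  private final Map<String, String> values = new HashMap<>();

  void set(String key, String value) {
    values.put(key, value);
  }

  // Returns the parsed value, or the default when the key is unset.
  int getInt(String key, int defaultValue) {
    String raw = values.get(key);
    return raw == null ? defaultValue : Integer.parseInt(raw.trim());
  }

  // Returns the stored string, or the default when the key is unset.
  String get(String key, String defaultValue) {
    return values.getOrDefault(key, defaultValue);
  }
}
```

With this sketch, conf.getInt("yarn.scheduler.minimum-allocation-mb", 1024) yields 1024 until the key is set explicitly, which is how minAllocMb and minAllocVcores obtain their values when the cluster configuration does not override them.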
3.2 serviceStart()

Starts the RPC server; by default it listens on 0.0.0.0:8031.

  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    // If security is enabled, the ResourceTrackerServer authenticates NodeManagers
    // via Kerberos, so no secretManager is passed.
    Configuration conf = getConfig();
    YarnRPC rpc = YarnRPC.create(conf);
    this.server = rpc.getServer(
        ResourceTracker.class, this, resourceTrackerAddress, conf, null,
        conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
            YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));

    // Enable service authorization if configured
    if (conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
        false)) {
      InputStream inputStream =
          this.rmContext.getConfigurationProvider()
              .getConfigurationInputStream(conf,
                  YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
      if (inputStream != null) {
        conf.addResource(inputStream);
      }
      refreshServiceAcls(conf, RMPolicyProvider.getInstance());
    }

    this.server.start();
    // Publish the address the server actually listens on
    conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
        YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
        YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
        server.getListenerAddress());
  }
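3.3 registerNodeManager()

Registers a NodeManager. It reads the information the NM reports (NodeId, host, port, memory and other resources), then registers the node with the RM or, if the node is already known, handles the reconnect.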
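
Before a node is accepted, registerNodeManager() runs several admission checks, and any failure results in a SHUTDOWN response. The following is a simplified sketch of two of them, the minimum-version check and the minimum-resource check. The class RegistrationChecks is hypothetical, and its compareVersions() only handles purely numeric dotted versions, which is an assumption made here; Hadoop's VersionUtil handles more cases.

```java
// Simplified sketch of two registration admission checks; not Hadoop code.
final class RegistrationChecks {
  enum Action { NORMAL, SHUTDOWN }

  // Compares dotted numeric versions such as "3.2.1" and "3.10.0".
  // Assumption: every component is a plain integer.
  static int compareVersions(String a, String b) {
    String[] x = a.split("\\.");
    String[] y = b.split("\\.");
    int n = Math.max(x.length, y.length);
    for (int i = 0; i < n; i++) {
      int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
      int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
      if (xi != yi) {
        return Integer.compare(xi, yi);
      }
    }
    return 0;
  }

  static Action admit(String nmVersion, String minVersion,
      long memoryMb, int vcores, int minAllocMb, int minAllocVcores) {
    // "NONE" disables the version check, as in registerNodeManager().
    if (!"NONE".equals(minVersion)) {
      if (nmVersion == null || compareVersions(nmVersion, minVersion) < 0) {
        return Action.SHUTDOWN;
      }
    }
    // A node that cannot fit even one minimum allocation is rejected.
    if (memoryMb < minAllocMb || vcores < minAllocVcores) {
      return Action.SHUTDOWN;
    }
    return Action.NORMAL;
  }
}
```

Comparing component by component as integers matters here: a plain string comparison would rank "3.10.0" below "3.2.1".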

  @SuppressWarnings("unchecked")
  @Override
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException,
      IOException {
    // Node id, host, container-manager port and HTTP port
    NodeId nodeId = request.getNodeId();
    String host = nodeId.getHost();
    int cmPort = nodeId.getPort();
    int httpPort = request.getHttpPort();
    // Resource capability reported by the NM
    Resource capability = request.getResource();
    // NM version
    String nodeManagerVersion = request.getNMVersion();
    // Physical resources of the node
    Resource physicalResource = request.getPhysicalResource();
    NodeStatus nodeStatus = request.getNodeStatus();

    // Create the response, then validate the NM version
    RegisterNodeManagerResponse response = recordFactory
        .newRecordInstance(RegisterNodeManagerResponse.class);

    if (!minimumNodeManagerVersion.equals("NONE")) {
      if (minimumNodeManagerVersion.equals("EqualToRM")) {
        minimumNodeManagerVersion = YarnVersionInfo.getVersion();
      }

      if ((nodeManagerVersion == null) ||
          (VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) {
        String message =
            "Disallowed NodeManager Version " + nodeManagerVersion
                + ", is less than the minimum version "
                + minimumNodeManagerVersion + " sending SHUTDOWN signal to "
                + "NodeManager.";
        LOG.info(message);
        response.setDiagnosticsMessage(message);
        response.setNodeAction(NodeAction.SHUTDOWN);
        return response;
      }
    }

    // Optionally verify that the NM hostname can be resolved
    if (checkIpHostnameInRegistration) {
      InetSocketAddress nmAddress =
          NetUtils.createSocketAddrForHost(host, cmPort);
      InetAddress inetAddress = Server.getRemoteIp();
      if (inetAddress != null && nmAddress.isUnresolved()) {
        // Reject registration of unresolved NMs so the RM does not get stuck during allocation
        final String message =
            "hostname cannot be resolved (ip=" + inetAddress.getHostAddress()
                + ", hostname=" + host + ")";
        LOG.warn("Unresolved nodemanager registration: " + message);
        response.setDiagnosticsMessage(message);
        response.setNodeAction(NodeAction.SHUTDOWN);
        return response;
      }
    }

    // Check whether this is a valid node
    if (!this.nodesListManager.isValidNode(host) &&
        !isNodeInDecommissioning(nodeId)) {
      String message =
          "Disallowed NodeManager from  " + host
              + ", Sending SHUTDOWN signal to the NodeManager.";
      LOG.info(message);
      response.setDiagnosticsMessage(message);
      response.setNodeAction(NodeAction.SHUTDOWN);
      return response;
    }

    // Check whether the node's capacity is loaded from dynamic-resources.xml
    String nid = nodeId.toString();

    Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid);
    if (dynamicLoadCapability != null) {
      LOG.debug("Resource for node: {} is adjusted from: {} to: {} due to"
          + " settings in dynamic-resources.xml.", nid, capability,
          dynamicLoadCapability);
      capability = dynamicLoadCapability;
      // Sync the new resource back to the NM
      response.setResource(capability);
    }

    // If the node does not satisfy the minimum allocation, send it a SHUTDOWN signal
    if (capability.getMemorySize() < minAllocMb
        || capability.getVirtualCores() < minAllocVcores) {
      String message = "NodeManager from  " + host
          + " doesn't satisfy minimum allocations, Sending SHUTDOWN"
          + " signal to the NodeManager. Node capabilities are " + capability
          + "; minimums are " + minAllocMb + "mb and " + minAllocVcores
          + " vcores";
      LOG.info(message);
      response.setDiagnosticsMessage(message);
      response.setNodeAction(NodeAction.SHUTDOWN);
      return response;
    }

    response.setContainerTokenMasterKey(containerTokenSecretManager
        .getCurrentKey());
    response.setNMTokenMasterKey(nmTokenSecretManager
        .getCurrentKey());
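    // Build the RMNode object
    RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
        resolve(host), capability, nodeManagerVersion, physicalResource);
    // Store it unless a node with this id is already registered; oldNode is the existing one, if any
    RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
    if (oldNode == null) {
      // First registration: dispatch an RMNodeStartedEvent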
      RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
          request.getNMContainerStatuses(),
          request.getRunningApplications(), nodeStatus);
      if (request.getLogAggregationReportsForApps() != null
          && !request.getLogAggregationReportsForApps().isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Found the number of previous cached log aggregation "
              + "status from nodemanager:" + nodeId + " is :"
              + request.getLogAggregationReportsForApps().size());
        }
        startEvent.setLogAggregationReportsForApps(request
            .getLogAggregationReportsForApps());
      }
      this.rmContext.getDispatcher().getEventHandler().handle(
          startEvent);
    } else {
      // Reconnect: unregister the node from the liveliness monitor first
      LOG.info("Reconnect from the node at: " + host);
      this.nmLivelinessMonitor.unregister(nodeId);

      if (CollectionUtils.isEmpty(request.getRunningApplications())
          && rmNode.getState() != NodeState.DECOMMISSIONING
          && rmNode.getHttpPort() != oldNode.getHttpPort()) {
        // Reconnected node differs, so replace old node and start new node
        switch (rmNode.getState()) {
        case RUNNING:
          ClusterMetrics.getMetrics().decrNumActiveNodes();
          break;
        case UNHEALTHY:
          ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
          break;
        default:
          LOG.debug("Unexpected Rmnode state");
        }
        // Remove the old node from the scheduler, then store and start the new one
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new NodeRemovedSchedulerEvent(rmNode));

        this.rmContext.getRMNodes().put(nodeId, rmNode);
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus));
      } else {
        // Reset the heartbeat ID
        oldNode.resetLastNodeHeartBeatResponse();

        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMNodeReconnectEvent(nodeId, rmNode,
                request.getRunningApplications(),
                request.getNMContainerStatuses()));
      }
    }
    // On every NodeManager registration, clear the NMToken keys of any running application, if present
    this.nmTokenSecretManager.removeNodeKey(nodeId);
    this.nmLivelinessMonitor.register(nodeId);
    
    // Handle the received container statuses; this must happen after the new RMNode is inserted
    if (!rmContext.isWorkPreservingRecoveryEnabled()) {
      if (!request.getNMContainerStatuses().isEmpty()) {
        LOG.info("received container statuses on node manager register :"
            + request.getNMContainerStatuses());
        for (NMContainerStatus status : request.getNMContainerStatuses()) {
          handleNMContainerStatus(status, nodeId);
        }
      }
    }

    // Update the node labels in the RM's NodeLabelManager
    Set<String> nodeLabels = NodeLabelsUtils.convertToStringSet(
        request.getNodeLabels());
    if (isDistributedNodeLabelsConf && nodeLabels != null) {
      try {
        updateNodeLabelsFromNMReport(nodeLabels, nodeId);
        response.setAreNodeLabelsAcceptedByRM(true);
      } catch (IOException ex) {
        // Make sure the exception is captured in the response
        response.setDiagnosticsMessage(ex.getMessage());
        response.setAreNodeLabelsAcceptedByRM(false);
      }
    } else if (isDelegatedCentralizedNodeLabelsConf) {
      this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId);
    }

    // Update node's attributes to RM's NodeAttributesManager.
    if (request.getNodeAttributes() != null) {
      try {
        // update node attributes if necessary then update heartbeat response
        updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
        response.setAreNodeAttributesAcceptedByRM(true);
      } catch (IOException ex) {
        // Make sure the error is captured and sent back in the response
        String errorMsg = response.getDiagnosticsMessage() == null ?
            ex.getMessage() :
            response.getDiagnosticsMessage() + "\n" + ex.getMessage();
        response.setDiagnosticsMessage(errorMsg);
        response.setAreNodeAttributesAcceptedByRM(false);
      }
    }

    StringBuilder message = new StringBuilder();
    message.append("NodeManager from node ").append(host).append("(cmPort: ")
        .append(cmPort).append(" httpPort: ");
    message.append(httpPort).append(") ")
        .append("registered with capability: ").append(capability);
    message.append(", assigned nodeId ").append(nodeId);
    if (response.getAreNodeLabelsAcceptedByRM()) {
      message.append(", node labels { ").append(
          StringUtils.join(",", nodeLabels) + " } ");
    }
    if (response.getAreNodeAttributesAcceptedByRM()) {
      message.append(", node attributes { ")
          .append(request.getNodeAttributes() + " } ");
    }

    LOG.info(message.toString());
    response.setNodeAction(NodeAction.NORMAL);
    response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
    response.setRMVersion(YarnVersionInfo.getVersion());
    return response;
  }
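
The split between a first registration and a reconnect in registerNodeManager() hinges on the return value of putIfAbsent(). The sketch below isolates that idiom. NodeRegistry is a hypothetical class, and plain strings stand in for NodeId and RMNode.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical node registry; String stands in for NodeId and RMNode.
final class NodeRegistry {
  private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

  // Returns "STARTED" for a first registration, "RECONNECTED" otherwise.
  String register(String nodeId, String node) {
    // putIfAbsent is atomic: it stores the node only if the id is unknown
    // and returns the previously stored value, or null if there was none.
    String old = nodes.putIfAbsent(nodeId, node);
    return old == null ? "STARTED" : "RECONNECTED";
  }

  String lookup(String nodeId) {
    return nodes.get(nodeId);
  }
}
```

Because the check and the insert happen in one atomic call, two concurrent registrations for the same id cannot both take the first-registration path. Note that on a reconnect the map still holds the old entry, which is why the method above calls put() explicitly when it decides to replace the node.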
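3.4 nodeHeartbeat()

On each NodeManager, the NodeStatusUpdaterImpl service periodically sends heartbeats to ResourceTrackerService from the run() method of its StatusUpdaterRunnable thread.
The main steps of heartbeat processing are:
1. Check that the node is valid
2. Check that the node is registered, and record the heartbeat
3. Check that the heartbeat is new and not a duplicate
4. Send the node status to the RMNode
5. If distributed node label configuration is enabled, update the node labels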
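
Step 3 relies on a response id that both sides track: the NM echoes the id of the last response it received, and the RM compares it with the id of the last response it sent. The sketch below is one way to express that comparison. HeartbeatSequence is a hypothetical class, and the wrap-around in nextResponseId() (ids cycling between 0 and Integer.MAX_VALUE) is an assumption, since the body of getNextResponseId() is not shown in this article.

```java
// Sketch of the response-id check used to classify a heartbeat.
final class HeartbeatSequence {
  enum Result { DUPLICATE, OUT_OF_SYNC, NEW }

  // Assumption: ids wrap from Integer.MAX_VALUE back to 0.
  static int nextResponseId(int responseId) {
    return (responseId + 1) & Integer.MAX_VALUE;
  }

  // nmId: the id echoed by the NodeManager.
  // lastRmId: the id of the last response the RM sent to that node.
  static Result classify(int nmId, int lastRmId) {
    if (nextResponseId(nmId) == lastRmId) {
      // The NM is exactly one step behind, so it missed the last response;
      // the RM can simply send that response again.
      return Result.DUPLICATE;
    }
    if (nmId != lastRmId) {
      // The ids diverged by more than one step; the node has to resync.
      return Result.OUT_OF_SYNC;
    }
    return Result.NEW;
  }
}
```

Only the NEW case continues with normal heartbeat processing. The other two map to returning the cached response and to a RESYNC action, respectively.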

public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {

    // Node status reported by the NM
    NodeStatus remoteNodeStatus = request.getNodeStatus();
    NodeId nodeId = remoteNodeStatus.getNodeId();

    // 1. Check that the node is valid and is not being decommissioned
    if (!this.nodesListManager.isValidNode(nodeId.getHost())
        && !isNodeInDecommissioning(nodeId)) {
      String message =
          "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
              + nodeId.getHost();
      LOG.info(message);
      return YarnServerBuilderUtils.newNodeHeartbeatResponse(
          NodeAction.SHUTDOWN, message);
    }

    // 2. Check that the node is registered
    RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
    if (rmNode == null) {
      // The node is unknown: ask it to resync
      String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();
      LOG.info(message);
      return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
          message);
    }

    // Report the heartbeat to the liveliness monitor
    this.nmLivelinessMonitor.receivedPing(nodeId);
    this.decommissioningWatcher.update(rmNode, remoteNodeStatus);

    // 3. Check that the heartbeat is new and not a duplicate
    NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
    if (getNextResponseId(
        remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse
            .getResponseId()) {
      LOG.info("Received duplicate heartbeat from node "
          + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
      return lastNodeHeartbeatResponse;
    } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse
        .getResponseId()) {
      String message =
          "Too far behind rm response id:"
              + lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
              + remoteNodeStatus.getResponseId();
      LOG.info(message);
      // Send a REBOOTING event to the RMNode
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
      return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC,
          message);
    }

    // Check whether the node is DECOMMISSIONING and ready to be decommissioned
    if (rmNode.getState() == NodeState.DECOMMISSIONING &&
        decommissioningWatcher.checkReadyToBeDecommissioned(
            rmNode.getNodeID())) {
      String message = "DECOMMISSIONING " + nodeId +
          " is ready to be decommissioned";
      LOG.info(message);
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
      this.nmLivelinessMonitor.unregister(nodeId);
      return YarnServerBuilderUtils.newNodeHeartbeatResponse(
          NodeAction.SHUTDOWN, message);
    }

    if (timelineServiceV2Enabled) {
      // Check and update the collectors info from the request
      updateAppCollectorsMap(request);
    }

    // Build the heartbeat response
    long newInterval = nextHeartBeatInterval;
    if (heartBeatIntervalScalingEnable) {
      newInterval = rmNode.calculateHeartBeatInterval(
          nextHeartBeatInterval, heartBeatIntervalMin,
          heartBeatIntervalMax, heartBeatIntervalSpeedupFactor,
          heartBeatIntervalSlowdownFactor);
    }
    NodeHeartbeatResponse nodeHeartBeatResponse =
        YarnServerBuilderUtils.newNodeHeartbeatResponse(
            getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
            NodeAction.NORMAL, null, null, null, null, newInterval);
    rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);

    populateKeys(request, nodeHeartBeatResponse);

    populateTokenSequenceNo(request, nodeHeartBeatResponse);

    if (timelineServiceV2Enabled) {
      // Return the collectors map that the NM needs to know
      setAppCollectorsMapToResponse(rmNode.getRunningApps(),
          nodeHeartBeatResponse);
    }

    // 4. Send the node status to the RMNode, saving the latest response
    RMNodeStatusEvent nodeStatusEvent =
        new RMNodeStatusEvent(nodeId, remoteNodeStatus);
    if (request.getLogAggregationReportsForApps() != null
        && !request.getLogAggregationReportsForApps().isEmpty()) {
      nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
    }
    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);

    // 5. Update the node's labels
    if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) {
      try {
        updateNodeLabelsFromNMReport(
            NodeLabelsUtils.convertToStringSet(request.getNodeLabels()),
            nodeId);
        nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
      } catch (IOException ex) {
        // Make sure the error is captured and sent back in the response
        nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage());
        nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false);
      }
    }

    // 6. Check whether the node's capacity is loaded from dynamic-resources.xml;
    //    if so, send the updated resource back
    String nid = nodeId.toString();
    Resource capability = loadNodeResourceFromDRConfiguration(nid);
    // If not null, sync the new resource back to the NM
    if (capability != null) {
      nodeHeartBeatResponse.setResource(capability);
    }
    // Check if we got an event (AdminService) that updated the resources
    if (rmNode.isUpdatedCapability()) {
      nodeHeartBeatResponse.setResource(rmNode.getTotalCapability());
      rmNode.resetUpdatedCapability();
    }

    // 7. Send the container queuing limit back to the node, which uses it to
    //    truncate the number of containers queued for execution
    if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) {
      nodeHeartBeatResponse.setContainerQueuingLimit(
          this.rmContext.getNodeManagerQueueLimitCalculator()
              .createContainerQueuingLimit());
    }

    // 8. Get the node's attributes and update them
    if (request.getNodeAttributes() != null) {
      try {
        // Update the node attributes if necessary, then update the heartbeat response
        updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
        nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(true);
      } catch (IOException ex) {
        // Make sure the error is captured and sent back in the response
        String errorMsg =
            nodeHeartBeatResponse.getDiagnosticsMessage() == null ?
                ex.getMessage() :
                nodeHeartBeatResponse.getDiagnosticsMessage() + "\n" + ex
                    .getMessage();
        nodeHeartBeatResponse.setDiagnosticsMessage(errorMsg);
        nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(false);
      }
    }

    return nodeHeartBeatResponse;
  }
3.5 unRegisterNodeManager()

Unregisters the node identified by the nodeId in the request.

  public UnRegisterNodeManagerResponse unRegisterNodeManager(
      UnRegisterNodeManagerRequest request) throws YarnException, IOException {
    UnRegisterNodeManagerResponse response = recordFactory
        .newRecordInstance(UnRegisterNodeManagerResponse.class);
    // Get the nodeId
    NodeId nodeId = request.getNodeId();
    // Look up the registered RMNode
    RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
    if (rmNode == null) {
      LOG.info("Node not found, ignoring the unregister from node id : "
          + nodeId);
      return response;
    }
    LOG.info("Node with node id : " + nodeId
        + " has shutdown, hence unregistering the node.");
    this.nmLivelinessMonitor.unregister(nodeId);
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeEvent(nodeId, RMNodeEventType.SHUTDOWN));
    return response;
  }
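
All three request handlers touch nmLivelinessMonitor: register() on registration, receivedPing() on each heartbeat, and unregister() when a node leaves. The following sketch shows the idea behind such a monitor under simplifying assumptions. LivelinessTracker is a hypothetical class, time is passed in explicitly instead of being read from a clock, and expiry is checked on demand rather than by a background thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified liveliness tracker; not Hadoop's implementation.
final class LivelinessTracker {
  private final Map<String, Long> lastPing = new ConcurrentHashMap<>();
  private final long expiryMs;

  LivelinessTracker(long expiryMs) {
    this.expiryMs = expiryMs;
  }

  void register(String nodeId, long nowMs) {
    lastPing.put(nodeId, nowMs);
  }

  void receivedPing(String nodeId, long nowMs) {
    // Only refresh nodes that are still registered.
    lastPing.computeIfPresent(nodeId, (id, old) -> nowMs);
  }

  void unregister(String nodeId) {
    lastPing.remove(nodeId);
  }

  // Returns the nodes whose last ping is older than the expiry interval.
  List<String> expired(long nowMs) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Long> e : lastPing.entrySet()) {
      if (nowMs - e.getValue() > expiryMs) {
        result.add(e.getKey());
      }
    }
    return result;
  }
}
```

Unregistering on shutdown, as unRegisterNodeManager() does, keeps a node that left cleanly from later being reported as expired.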

Original article: https://outofmemory.cn/zaji/5656014.html