2021SC@SDUSC
- 一、ResourceTrackerService简介
- 二、ResourceTrackerService基本属性
- 三、ResourceTrackerService基本方法
- 3.1 serviceInit()方法
- 3.2 serviceStart()方法
- 3.3 registerNodeManager()方法
- 3.3 nodeHeartbeat()方法
- 3.4 unRegisterNodeManager()方法
NodeManager是执行在单个节点上的代理,负责管理Hadoop集群中的单个计算节点。主要有与ResourceManager保持通信,监控管理Container的生命周期和使用情况,管理日志和不同应用程序用到的附属服务等功能。
ResourceTrackerService负责处理来自NodeManager的请求,包括注册、心跳两种请求。注册是在NodeManager启动时发生的行为,请求包含节点id、可用的资源上限等信息。而心跳是周期性行为,包含各个Container的运行状态,运行的Application列表、节点健康状况等信息。ResourceTrackerService可为NodeManager返回待释放的Container列表、 Application列表等信息用来应答请求。
包括RM上下文信息、NodeList管理、NM监控等基本信息,还有心跳间隔以及相关的NM版本信息。
//org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService.Java //RM上下文 private final RMContext rmContext; //NodesListManager private final NodesListManager nodesListManager; //NM监控 private final NMLivelinessMonitor nmLivelinessMonitor; //安全 private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; //读锁 private final ReadLock readLock; //写锁 private final WriteLock writeLock; //下一次心跳间隔 private long nextHeartBeatInterval; //心跳间隔的相关参数,最大、最小值等 private boolean heartBeatIntervalScalingEnable; private long heartBeatIntervalMin; private long heartBeatIntervalMax; private float heartBeatIntervalSpeedupFactor; private float heartBeatIntervalSlowdownFactor; //rpc服务 private Server server; //绑定的地址 private InetSocketAddress resourceTrackerAddress; //最小的NM版本 private String minimumNodeManagerVersion; //最小分配内存 private int minAllocMb; //最小分配core private int minAllocVcores; private DecommissioningNodesWatcher decommissioningWatcher; private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; private DynamicResourceConfiguration drConf; //timeline相关参数 private final AtomicLong timelineCollectorVersion = new AtomicLong(0); private boolean checkIpHostnameInRegistration; private boolean timelineServiceV2Enabled;
构造方法
public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager) { super(ResourceTrackerService.class.getName()); //RM上下文 this.rmContext = rmContext; //NodesListManager this.nodesListManager = nodesListManager; //NM监控 this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; //锁的读写 *** 作 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext); }三、ResourceTrackerService基本方法 3.1 serviceInit()方法
主要进行初始化,并分配最小内存和最小CPU
protected void serviceInit(Configuration conf) throws Exception { //地址 resourceTrackerAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); //初始化 *** 作 RackResolver.init(conf); //检查ip checkIpHostnameInRegistration = conf.getBoolean( YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY, YarnConfiguration.DEFAULT_RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY); //最小分配内存 minAllocMb = conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); //最小分配CPU minAllocVcores = conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); //最小NM版本号 minimumNodeManagerVersion = conf.get( YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION); timelineServiceV2Enabled = YarnConfiguration. timelineServiceV2Enabled(conf); if (YarnConfiguration.areNodeLabelsEnabled(conf)) { isDistributedNodeLabelsConf = YarnConfiguration.isDistributedNodeLabelConfiguration(conf); isDelegatedCentralizedNodeLabelsConf = YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf); } //更新心跳信息 updateHeartBeatConfiguration(conf); //加载动态资源信息 loadDynamicResourceConfiguration(conf); decommissioningWatcher.init(conf); super.serviceInit(conf); }3.2 serviceStart()方法
主要启动RPC服务,端口号为0.0.0.0 :8031
@Override protected void serviceStart() throws Exception { super.serviceStart(); //如果启用了安全性,ResourceTrackerServer将通过Kerberos对NodeManager进行身份验证,因此没有secretManager Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); this.server = rpc.getServer( ResourceTracker.class, this, resourceTrackerAddress, conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); //启用服务授权 if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { InputStream inputStream = this.rmContext.getConfigurationProvider() .getConfigurationInputStream(conf, YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); if (inputStream != null) { conf.addResource(inputStream); } refreshServiceAcls(conf, RMPolicyProvider.getInstance()); } this.server.start(); //更新连接信息 conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, server.getListenerAddress()); }3.3 registerNodeManager()方法
注册NM方法,负责获取NM上汇报的各种资源信息(NodeId, host,port,memory,资源)等,并向RM进行注册、恢复。
@SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { //nodeId,端口号,node状态等信息,cm端口、http端口 NodeId nodeId = request.getNodeId(); String host = nodeId.getHost(); int cmPort = nodeId.getPort(); int httpPort = request.getHttpPort(); //获取资源容量 Resource capability = request.getResource(); //获取NM版本 String nodeManagerVersion = request.getNMVersion(); //获取物理资源 Resource physicalResource = request.getPhysicalResource(); NodeStatus nodeStatus = request.getNodeStatus(); //更新NM版本信息 RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); if (!minimumNodeManagerVersion.equals("NONE")) { if (minimumNodeManagerVersion.equals("EqualToRM")) { minimumNodeManagerVersion = YarnVersionInfo.getVersion(); } if ((nodeManagerVersion == null) || (VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) { String message = "Disallowed NodeManager Version " + nodeManagerVersion + ", is less than the minimum version " + minimumNodeManagerVersion + " sending SHUTDOWN signal to " + "NodeManager."; LOG.info(message); response.setDiagnosticsMessage(message); response.setNodeAction(NodeAction.SHUTDOWN); return response; } } //验证node是否有效 if (checkIpHostnameInRegistration) { InetSocketAddress nmAddress = NetUtils.createSocketAddrForHost(host, cmPort); InetAddress inetAddress = Server.getRemoteIp(); if (inetAddress != null && nmAddress.isUnresolved()) { //拒绝注册未解析的NM,以防止RM在分配时陷入困境 final String message = "hostname cannot be resolved (ip=" + inetAddress.getHostAddress() + ", hostname=" + host + ")"; LOG.warn("Unresolved nodemanager registration: " + message); response.setDiagnosticsMessage(message); response.setNodeAction(NodeAction.SHUTDOWN); return response; } } //检查是否是一个有效的指针 if (!this.nodesListManager.isValidNode(host) && !isNodeInDecommissioning(nodeId)) { String message = "Disallowed NodeManager from " + host + ", Sending SHUTDOWN signal to the NodeManager."; LOG.info(message); response.setDiagnosticsMessage(message); response.setNodeAction(NodeAction.SHUTDOWN); return response; } //检查节点的容量是否从 dynamic-resources.xml 加载 String nid = nodeId.toString(); Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid); if (dynamicLoadCapability != null) { LOG.debug("Resource for node: {} is adjusted from: {} to: {} due to" + " settings in dynamic-resources.xml.", nid, capability, dynamicLoadCapability); capability = dynamicLoadCapability; //与新资源同步回退 response.setResource(capability); } //检测该node是否有最小资源限制 , 发送SHUTDOWN 指令 if (capability.getMemorySize() < minAllocMb || capability.getVirtualCores() < minAllocVcores) { String message = "NodeManager from " + host + " doesn't satisfy minimum allocations, Sending SHUTDOWN" + " signal to the NodeManager. Node capabilities are " + capability + "; minimums are " + minAllocMb + "mb and " + minAllocVcores + " vcores"; LOG.info(message); response.setDiagnosticsMessage(message); response.setNodeAction(NodeAction.SHUTDOWN); return response; } response.setContainerTokenMasterKey(containerTokenSecretManager .getCurrentKey()); response.setNMTokenMasterKey(nmTokenSecretManager .getCurrentKey()); //构建RMNode对象 RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, resolve(host), capability, nodeManagerVersion, physicalResource); //构建旧的node对象 RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { //新构建RMnode RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), request.getRunningApplications(), nodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug("Found the number of previous cached log aggregation " + "status from nodemanager:" + nodeId + " is :" + request.getLogAggregationReportsForApps().size()); } startEvent.setLogAggregationReportsForApps(request .getLogAggregationReportsForApps()); } this.rmContext.getDispatcher().getEventHandler().handle( startEvent); } else { //移除注册 LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); if (CollectionUtils.isEmpty(request.getRunningApplications()) && rmNode.getState() != NodeState.DECOMMISSIonING && rmNode.getHttpPort() != oldNode.getHttpPort()) { // Reconnected node differs, so replace old node and start new node switch (rmNode.getState()) { case RUNNING: ClusterMetrics.getMetrics().decrNumActiveNodes(); break; case UNHEALTHY: ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); break; default: LOG.debug("Unexpected Rmnode state"); } //重新连接 this.rmContext.getDispatcher().getEventHandler() .handle(new NodeRemovedSchedulerEvent(rmNode)); this.rmContext.getRMNodes().put(nodeId, rmNode); this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeStartedEvent(nodeId, null, null, nodeStatus)); } else { //重新设置heartbeat ID oldNode.resetLastNodeHeartBeatResponse(); this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeReconnectEvent(nodeId, rmNode, request.getRunningApplications(), request.getNMContainerStatuses())); } } // 在每个节点管理器寄存器上,我们将清除任何正在运行的应用程序的NMToken密钥(如果存在) this.nmTokenSecretManager.removeNodeKey(nodeId); this.nmLivelinessMonitor.register(nodeId); //处理接收到的容器状态,这应该在插入新RMNode后处理 if (!rmContext.isWorkPreservingRecoveryEnabled()) { if (!request.getNMContainerStatuses().isEmpty()) { LOG.info("received container statuses on node manager register :" + request.getNMContainerStatuses()); for (NMContainerStatus status : request.getNMContainerStatuses()) { handleNMContainerStatus(status, nodeId); } } } //将节点标签更新到RM的NodeLabelManager Set3.3 nodeHeartbeat()方法nodeLabels = NodeLabelsUtils.convertToStringSet( request.getNodeLabels()); if (isDistributedNodeLabelsConf && nodeLabels != null) { try { updateNodeLabelsFromNMReport(nodeLabels, nodeId); response.setAreNodeLabelsAcceptedByRM(true); } catch (IOException ex) { //确保在响应中捕获异常 response.setDiagnosticsMessage(ex.getMessage()); response.setAreNodeLabelsAcceptedByRM(false); } } else if (isDelegatedCentralizedNodeLabelsConf) { this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId); } // Update node's attributes to RM's NodeAttributesManager. if (request.getNodeAttributes() != null) { try { // update node attributes if necessary then update heartbeat response updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes()); response.setAreNodeAttributesAcceptedByRM(true); } catch (IOException ex) { //确保捕获并发送错误消息作为响应 String errorMsg = response.getDiagnosticsMessage() == null ? ex.getMessage() : response.getDiagnosticsMessage() + "n" + ex.getMessage(); response.setDiagnosticsMessage(errorMsg); response.setAreNodeAttributesAcceptedByRM(false); } } StringBuilder message = new StringBuilder(); message.append("NodeManager from node ").append(host).append("(cmPort: ") .append(cmPort).append(" httpPort: "); message.append(httpPort).append(") ") .append("registered with capability: ").append(capability); message.append(", assigned nodeId ").append(nodeId); if (response.getAreNodeLabelsAcceptedByRM()) { message.append(", node labels { ").append( StringUtils.join(",", nodeLabels) + " } "); } if (response.getAreNodeAttributesAcceptedByRM()) { message.append(", node attributes { ") .append(request.getNodeAttributes() + " } "); } LOG.info(message.toString()); response.setNodeAction(NodeAction.NORMAL); response.setRMIdentifier(ResourceManager.getClusterTimeStamp()); response.setRMVersion(YarnVersionInfo.getVersion()); return response; }
NodeManager 节点上的 NodeStatusUpdaterImpl进程, 会通过线程StatusUpdaterRunnable 中的run() 方法 . 定时向ResourceTrackerService发送心跳数据。
node心跳顺序:
1.检查node是否有效
2.检查是否注册,并更新心跳信息
3.检查是否为新的心跳,并且不是重复的
4.发送心跳状态给RMNode
5.若启用了分布式节点标签配置,则更新节点标签
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { //node状态 NodeStatus remoteNodeStatus = request.getNodeStatus(); NodeId nodeId = remoteNodeStatus.getNodeId(); // 1. 检查node是否有效, 检查是否退役中 if (!this.nodesListManager.isValidNode(nodeId.getHost()) && !isNodeInDecommissioning(nodeId)) { String message = "Disallowed NodeManager nodeId: " + nodeId + " hostname: " + nodeId.getHost(); LOG.info(message); return YarnServerBuilderUtils.newNodeHeartbeatResponse( NodeAction.SHUTDOWN, message); } // 2. 检查node是否被注册 RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); if (rmNode == null) { //node不存在 String message = "Node not found resyncing " + remoteNodeStatus.getNodeId(); LOG.info(message); return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC, message); } //发送心跳数据给监控 this.nmLivelinessMonitor.receivedPing(nodeId); this.decommissioningWatcher.update(rmNode, remoteNodeStatus); // 3. 检查心跳是否为新的,而不是重复的 NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); if (getNextResponseId( remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse .getResponseId()) { LOG.info("Received duplicate heartbeat from node " + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); return lastNodeHeartbeatResponse; } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse .getResponseId()) { String message = "Too far behind rm response id:" + lastNodeHeartbeatResponse.getResponseId() + " nm response id:" + remoteNodeStatus.getResponseId(); LOG.info(message); // 直接给RMNode发送重启命令 this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); return YarnServerBuilderUtils.newNodeHeartbeatResponse(NodeAction.RESYNC, message); } // 检查rmNode是否在退役中或者已退役 if (rmNode.getState() == NodeState.DECOMMISSIonING && decommissioningWatcher.checkReadyToBeDecommissioned( rmNode.getNodeID())) { String message = "DECOMMISSIonING " + nodeId + " is ready to be decommissioned"; LOG.info(message); this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); this.nmLivelinessMonitor.unregister(nodeId); return YarnServerBuilderUtils.newNodeHeartbeatResponse( NodeAction.SHUTDOWN, message); } if (timelineServiceV2Enabled) { // 检查并且更新集合的请求信息 updateAppCollectorsMap(request); } // 构建心跳响应 long newInterval = nextHeartBeatInterval; if (heartBeatIntervalScalingEnable) { newInterval = rmNode.calculateHeartBeatInterval( nextHeartBeatInterval, heartBeatIntervalMin, heartBeatIntervalMax, heartBeatIntervalSpeedupFactor, heartBeatIntervalSlowdownFactor); } NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse( getNextResponseId(lastNodeHeartbeatResponse.getResponseId()), NodeAction.NORMAL, null, null, null, null, newInterval); rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); populateTokenSequenceNo(request, nodeHeartBeatResponse); if (timelineServiceV2Enabled) { // 返回node需要的映射 setAppCollectorsMapToResponse(rmNode.getRunningApps(), nodeHeartBeatResponse); } // 4. 发送响应给RMNode,并保存最后一次请求 RMNodeStatusEvent nodeStatusEvent = new RMNodeStatusEvent(nodeId, remoteNodeStatus); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { nodeStatusEvent.setLogAggregationReportsForApps(request .getLogAggregationReportsForApps()); } this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent); // 5. 更新node的标签信息 if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) { try { updateNodeLabelsFromNMReport( NodeLabelsUtils.convertToStringSet(request.getNodeLabels()), nodeId); nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true); } catch (IOException ex) { //确保捕获并发送错误消息作为响应 nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage()); nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false); } } // 6. 检查节点的容量是否是从dynamic-resources.xml 加载, 如果是的话,发送更新资源信息 String nid = nodeId.toString(); Resource capability = loadNodeResourceFromDRConfiguration(nid); // 如果不为空,则与新资源同步回退 if (capability != null) { nodeHeartBeatResponse.setResource(capability); } // Check if we got an event (AdminService) that updated the resources if (rmNode.isUpdatedCapability()) { nodeHeartBeatResponse.setResource(rmNode.getTotalCapability()); rmNode.resetUpdatedCapability(); } // 7. 发送Container的数量限制给node, 如果超出node中队列限制,则进行截取 *** 作 if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) { nodeHeartBeatResponse.setContainerQueuingLimit( this.rmContext.getNodeManagerQueueLimitCalculator() .createContainerQueuingLimit()); } // 8. 获取node 属性, 并进行更新 *** 作 if (request.getNodeAttributes() != null) { try { // 如果必要的话,更新node属性和心跳响应 updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes()); nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(true); } catch (IOException ex) { //确保捕获并发送错误消息作为响应 String errorMsg = nodeHeartBeatResponse.getDiagnosticsMessage() == null ? ex.getMessage() : nodeHeartBeatResponse.getDiagnosticsMessage() + "n" + ex .getMessage(); nodeHeartBeatResponse.setDiagnosticsMessage(errorMsg); nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(false); } } return nodeHeartBeatResponse; }3.4 unRegisterNodeManager()方法
根据请求信息中的nodeId,解除注册
public UnRegisterNodeManagerResponse unRegisterNodeManager( UnRegisterNodeManagerRequest request) throws YarnException, IOException { UnRegisterNodeManagerResponse response = recordFactory .newRecordInstance(UnRegisterNodeManagerResponse.class); //获取nodeId NodeId nodeId = request.getNodeId(); //定义RMNode RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); if (rmNode == null) { LOG.info("Node not found, ignoring the unregister from node id : " + nodeId); return response; } LOG.info("Node with node id : " + nodeId + " has shutdown, hence unregistering the node."); this.nmLivelinessMonitor.unregister(nodeId); this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeEvent(nodeId, RMNodeEventType.SHUTDOWN)); return response; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)