This article is based on hadoop-3.3.0.
Contents
1 Overview
2 Registration Process
2.1 Before Registration
2.2 Registration
2.3 Sending Heartbeats
1 Overview
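After a DataNode (DN) finishes starting up (see the earlier source-reading post), it must register with the NameNode (NN) before it can provide data storage services. Once registered, it sends periodic heartbeats so that the NN knows the DN is still alive.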
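Registration is triggered from startDataNode, which is called in the DataNode constructor; startDataNode calls refreshNamenodes:

```java
blockPoolManager.refreshNamenodes(getConf());
```

2 Registration Process
2.1 Before Registration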
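Start with refreshNamenodes. It obtains the current nameservice (NS) RPC addresses and NN lifeline RPC addresses from the configuration, then calls doRefreshNamenodes to do the rest of the work: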
```java
void refreshNamenodes(Configuration conf)
    throws IOException {
  LOG.info("Refresh request received for nameservices: " +
      conf.get(DFSConfigKeys.DFS_NAMESERVICES));

  Map<String, Map<String, InetSocketAddress>> newAddressMap = null;
  Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = null;

  try {
    newAddressMap =
        DFSUtil.getNNServiceRpcAddressesForCluster(conf);
    newLifelineAddressMap =
        DFSUtil.getNNLifelineRpcAddressesForCluster(conf);
  } catch (IOException ioe) {
    LOG.warn("Unable to get NameNode addresses.");
  }

  if (newAddressMap == null || newAddressMap.isEmpty()) {
    throw new IOException("No services to connect, missing NameNode " +
        "address.");
  }

  synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}
```
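doRefreshNamenodes consists of five main steps:
1. For each configured nameservice, determine whether it is an update to the set of NNs of an existing NS or an entirely new nameservice.
2. Any nameservice we currently serve that is no longer configured is marked for removal.
3. Start the new nameservices.
Steps 1 to 3 run inside a synchronized (this) block.
4. Shut down the stale nameservices. This runs outside the synchronized block, because stopping them calls back into remove() from another thread; it can also take a while.
5. Refresh the nameservices whose NN list has changed.
Step 3 is where the DN's registration and heartbeats are set in motion, through BPOfferService (covered next).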
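Steps 1 and 2 boil down to a three-way set classification. The sketch below is hypothetical: the class name is invented, and it uses plain java.util loops where the real code uses Guava's `Sets.difference`. It checks the same invariant as the `assert` in doRefreshNamenodes.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of steps 1 and 2 of doRefreshNamenodes.
class NameserviceDiff {
  final Set<String> toAdd = new LinkedHashSet<>();
  final Set<String> toRefresh = new LinkedHashSet<>();
  final Set<String> toRemove = new LinkedHashSet<>();

  // current:    nameservice IDs already served (like bpByNameserviceId.keySet())
  // configured: nameservice IDs in the freshly loaded config (like addrMap.keySet())
  static NameserviceDiff compute(Set<String> current, Set<String> configured) {
    NameserviceDiff d = new NameserviceDiff();
    for (String ns : configured) {
      if (current.contains(ns)) {
        d.toRefresh.add(ns); // known NS: its NN list may have changed
      } else {
        d.toAdd.add(ns);     // new NS: needs a new BPOfferService
      }
    }
    for (String ns : current) {
      if (!configured.contains(ns)) {
        d.toRemove.add(ns);  // no longer configured: its BPOfferService is stopped
      }
    }
    // Same invariant as the assert in doRefreshNamenodes.
    assert d.toAdd.size() + d.toRefresh.size() == configured.size();
    return d;
  }

  public static void main(String[] args) {
    NameserviceDiff d = compute(Set.of("ns1", "ns2"), Set.of("ns2", "ns3"));
    // prints: add=[ns3] refresh=[ns2] remove=[ns1]
    System.out.println("add=" + d.toAdd + " refresh=" + d.toRefresh
        + " remove=" + d.toRemove);
  }
}
```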
```java
private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
  assert Thread.holdsLock(refreshNamenodesLock);

  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;

  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        toAdd.add(nameserviceId);
      }
    }

    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));

    assert toRefresh.size() + toAdd.size() ==
        addrMap.size() :
          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
          " toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
          " toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toAdd));

      for (String nsToAdd : toAdd) {
        Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
        Map<String, InetSocketAddress> nnIdToLifelineAddr =
            lifelineAddrMap.get(nsToAdd);
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        ArrayList<String> nnIds =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        ArrayList<InetSocketAddress> lifelineAddrs =
            Lists.newArrayListWithCapacity(nnIdToAddr.size());
        for (String nnId : nnIdToAddr.keySet()) {
          addrs.add(nnIdToAddr.get(nnId));
          nnIds.add(nnId);
          lifelineAddrs.add(nnIdToLifelineAddr != null ?
              nnIdToLifelineAddr.get(nnId) : null);
        }
        // Build a BPOfferService from these parameters. createBPOS is a member
        // of BlockPoolManager; the BlockPoolManager itself is created in the
        // DataNode constructor, which passes in the DataNode (dn).
        BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    startAll();
  }

  // Step 4. Shut down old nameservices. This happens outside
  // of the synchronized(this) lock since they need to call
  // back to .remove() from another thread
  if (!toRemove.isEmpty()) {
    LOG.info("Stopping BPOfferServices for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRemove));

    for (String nsToRemove : toRemove) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
      bpos.stop();
      bpos.join();
      // they will call remove on their own
    }
  }

  // Step 5. Update nameservices whose NN list has changed
  if (!toRefresh.isEmpty()) {
    LOG.info("Refreshing list of NNs for nameservices: " +
        Joiner.on(",").useForNull("<default>").join(toRefresh));

    for (String nsToRefresh : toRefresh) {
      BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
      Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
      Map<String, InetSocketAddress> nnIdToLifelineAddr =
          lifelineAddrMap.get(nsToRefresh);
      ArrayList<InetSocketAddress> addrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      ArrayList<InetSocketAddress> lifelineAddrs =
          Lists.newArrayListWithCapacity(nnIdToAddr.size());
      ArrayList<String> nnIds = Lists.newArrayListWithCapacity(
          nnIdToAddr.size());
      for (String nnId : nnIdToAddr.keySet()) {
        addrs.add(nnIdToAddr.get(nnId));
        lifelineAddrs.add(nnIdToLifelineAddr != null ?
            nnIdToLifelineAddr.get(nnId) : null);
        nnIds.add(nnId);
      }
      try {
        UserGroupInformation.getLoginUser()
            .doAs(new PrivilegedExceptionAction
```
startAll() calls start() on each BPOfferService, and BPOfferService#start in turn calls start() on its BPServiceActors:
```java
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction
```
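The start chain described above (BPOfferService#start calling start() on each actor, and each actor running itself on a new thread) can be sketched with plain `java.lang.Thread`. This is a hypothetical miniature: `ActorSketch`, `OfferServiceSketch` and the `nn1:8020`/`nn2:8020` addresses are invented for illustration, and the real `run()` does far more than record an event.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for BPServiceActor: it implements Runnable and hands
// itself to a new Thread, so starting that thread executes run().
class ActorSketch implements Runnable {
  private final String nnAddr;
  private final List<String> events;
  private Thread thread;

  ActorSketch(String nnAddr, List<String> events) {
    this.nnAddr = nnAddr;
    this.events = events;
  }

  synchronized void start() {
    if (thread != null && thread.isAlive()) {
      return; // already running
    }
    thread = new Thread(this, "actor-" + nnAddr); // `this` is the Runnable
    thread.setDaemon(true);
    thread.start();
  }

  void join() throws InterruptedException {
    Thread t;
    synchronized (this) {
      t = thread;
    }
    if (t != null) {
      t.join();
    }
  }

  @Override
  public void run() {
    // The real run() registers with the NN and then loops in offerService().
    events.add("registered with " + nnAddr);
  }
}

// Hypothetical stand-in for BPOfferService: one actor per NameNode of a nameservice.
class OfferServiceSketch {
  private final List<ActorSketch> actors;

  OfferServiceSketch(List<ActorSketch> actors) {
    this.actors = actors;
  }

  void start() {
    for (ActorSketch a : actors) {
      a.start();
    }
  }

  void join() throws InterruptedException {
    for (ActorSketch a : actors) {
      a.join();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> events = new CopyOnWriteArrayList<>();
    OfferServiceSketch bpos = new OfferServiceSketch(List.of(
        new ActorSketch("nn1:8020", events), new ActorSketch("nn2:8020", events)));
    bpos.start();
    bpos.join();
    System.out.println(events.size() + " actors ran"); // 2 actors ran
  }
}
```

In an HA nameservice this gives one thread per NN, which is why a DN keeps talking to both the active and the standby NameNode.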
BPServiceActor#start starts a thread with this (the actor itself) as its Runnable, so the thread executes BPServiceActor#run():
run() covers two flows:
- DN registration: connectToNNAndHandshake
- Sending heartbeats: offerService
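The two-phase control flow of run() (retry the handshake until it succeeds or retrying is pointless, then loop in offerService) can be summarized in a hypothetical sketch. Assumptions: the real `shouldRetryInit()` check, which depends on the other actors of the BPOfferService, is replaced by a simple `maxAttempts` limit, the 5000 ms back-off is shortened, and the handshake and service loop are passed in as lambdas.

```java
import java.io.IOException;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the two-phase structure of BPServiceActor#run().
class RunLoopSketch {
  enum RunningState { CONNECTING, INIT_FAILED, RUNNING, FAILED, EXITED }

  interface Handshake {
    void connect() throws IOException; // stands in for connectToNNAndHandshake()
  }

  RunningState state = RunningState.CONNECTING;
  int attempts = 0;

  // maxAttempts replaces the real shouldRetryInit() check (an assumption).
  void run(Handshake handshake, int maxAttempts, BooleanSupplier shouldRun,
           Runnable offerService) {
    // Phase 1: retry the handshake until it succeeds or we give up.
    while (true) {
      try {
        attempts++;
        handshake.connect();
        break;
      } catch (IOException e) {
        state = RunningState.INIT_FAILED;
        if (attempts >= maxAttempts) {
          state = RunningState.FAILED;
          return;
        }
        sleepQuietly(10); // the real code sleeps 5000 ms here
      }
    }
    // Phase 2: registered, so keep offering service (heartbeats etc.).
    state = RunningState.RUNNING;
    while (shouldRun.getAsBoolean()) {
      offerService.run();
    }
    state = RunningState.EXITED;
  }

  private static void sleepQuietly(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    RunLoopSketch actor = new RunLoopSketch();
    int[] failuresLeft = {2};
    int[] loops = {0};
    actor.run(() -> {
          if (failuresLeft[0]-- > 0) {
            throw new IOException("NN not reachable yet");
          }
        }, 5, () -> loops[0] < 3, () -> loops[0]++);
    // prints: EXITED after 3 attempts, 3 loops
    System.out.println(actor.state + " after " + actor.attempts + " attempts, "
        + loops[0] + " loops");
  }
}
```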
```java
@Override
public void run() {
  LOG.info(this + " starting to offer service");

  try {
    while (true) {
      // init stuff
      try {
        // setup storage
        // connect this actor to its NameNode and register the DataNode
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenode's of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }

    runningState = RunningState.RUNNING;
    if (initialRegistrationComplete != null) {
      initialRegistrationComplete.countDown();
    }

    while (shouldRun()) {
      try {
        offerService();
      } catch (Exception ex) {
        LOG.error("Exception in BPOfferService for " + this, ex);
        sleepAndLogInterrupts(5000, "offering service");
      }
    }
    runningState = RunningState.EXITED;
  } catch (Throwable ex) {
    LOG.warn("Unexpected exception in block pool " + this, ex);
    runningState = RunningState.FAILED;
  } finally {
    LOG.warn("Ending block pool service for: " + this);
    cleanUp();
  }
}

// Register the DN with the NN
private void connectToNNAndHandshake() throws IOException {
  // get NN proxy
  // This creates a DatanodeProtocolClientSideTranslatorPB object, which
  // forwards client-side requests to the RPC server
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace
  // info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(this, nsInfo);

  this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

  // Second phase of the handshake with the NN.
  register(nsInfo);
}

// Sending heartbeats (offerService) is omitted here; see section 2.3
```

2.2 Registration
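The previous section covered the steps leading up to registration; now we finally reach the DN registration itself.
BPServiceActor#register performs the registration: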
```java
void register(NamespaceInfo nsInfo) throws IOException {
  // The handshake() phase loaded the block pool storage
  // off disk - so update the bpRegistration object from that info
  // Create the DN's registration for this block pool
  DatanodeRegistration newBpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
      // Registers the DN via DatanodeProtocolClientSideTranslatorPB#registerDatanode
      newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
      newBpRegistration.setNamespaceInfo(nsInfo);
      bpRegistration = newBpRegistration;
      break;
    } catch(EOFException e) {  // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
    } catch(SocketTimeoutException e) {  // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
    } catch(RemoteException e) {
      LOG.warn("RemoteException in register", e);
      throw e;
    } catch(IOException e) {
      LOG.warn("Problem connecting to server: " + nnAddr);
    }
    // Try again in a second
    sleepAndLogInterrupts(1000, "connecting to server");
  }

  if (bpRegistration == null) {
    throw new IOException("DN shut down before block pool registered");
  }

  LOG.info(this + " successfully registered with NN");
  bpos.registrationSucceeded(this, bpRegistration);

  // reset lease id whenever registered to NN.
  // ask for a new lease id at the next heartbeat.
  fullBlockReportLeaseId = 0;

  // random short delay - helps scatter the BR from all DNs
  scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs, true);
}
```
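The retry policy in register() distinguishes transient connection problems (retried after a one-second pause) from errors reported by the NameNode itself (rethrown at once). A hypothetical sketch of that policy follows; `RemoteError` stands in for Hadoop's `RemoteException`, and the `maxTries` limit is an assumption added so the sketch terminates (the real loop keeps going while `shouldRun()`).

```java
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;

// Hypothetical sketch of the retry policy in BPServiceActor#register().
class RegisterRetrySketch {
  // Stand-in for Hadoop's RemoteException: an error raised by the server itself.
  static class RemoteError extends IOException {
    RemoteError(String msg) {
      super(msg);
    }
  }

  interface Rpc<T> {
    T call() throws IOException;
  }

  // maxTries is an assumption; the real loop keeps going while shouldRun().
  static <T> T callWithRetry(Rpc<T> rpc, int maxTries, long retryMs)
      throws IOException {
    for (int attempt = 1; ; attempt++) {
      try {
        return rpc.call();
      } catch (RemoteError e) {
        throw e; // the NN rejected the request: retrying will not help
      } catch (EOFException | SocketTimeoutException e) {
        // NN just restarted or is busy: retry
      } catch (IOException e) {
        // other connection problems: also retried
      }
      if (attempt >= maxTries) {
        throw new IOException("gave up after " + attempt + " attempts");
      }
      try {
        Thread.sleep(retryMs); // the real code waits 1000 ms
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IOException("interrupted while retrying", ie);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    int[] calls = {0};
    String result = callWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new SocketTimeoutException("NN busy");
      }
      return "registered";
    }, 5, 10);
    System.out.println(result + " after " + calls[0] + " calls"); // registered after 3 calls
  }
}
```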
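The call itself goes through DatanodeProtocolClientSideTranslatorPB#registerDatanode. Because the RPC engine here is Protobuf-based, Hadoop's own PBHelper.convert is used in both directions: the DatanodeRegistration is converted into its protobuf form for the request, and the registration carried in the response is converted back into a DatanodeRegistration: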
```java
// DatanodeProtocolClientSideTranslatorPB#registerDatanode
@Override
public DatanodeRegistration registerDatanode(DatanodeRegistration registration
    ) throws IOException {
  // Wrap the registration info and send it to the RPC server (NameNodeRpcServer)
  RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
      .newBuilder().setRegistration(PBHelper.convert(registration));
  RegisterDatanodeResponseProto resp;
  try {
    resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
  return PBHelper.convert(resp.getRegistration());
}
```
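The translator pattern can be illustrated without any protobuf dependency. In this hypothetical sketch `RegistrationMsg` stands in for a generated message class and `Convert` plays the role of `PBHelper.convert`; all class names, and the idea of a fake server that overwrites the IP, are invented for illustration. Callers of the translator only ever see the domain type.

```java
import java.util.Objects;

// Hypothetical domain object, loosely modelled on DatanodeRegistration.
final class Registration {
  final String datanodeUuid;
  final String ipAddr;
  final int xferPort;

  Registration(String datanodeUuid, String ipAddr, int xferPort) {
    this.datanodeUuid = datanodeUuid;
    this.ipAddr = ipAddr;
    this.xferPort = xferPort;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Registration)) {
      return false;
    }
    Registration r = (Registration) o;
    return datanodeUuid.equals(r.datanodeUuid) && ipAddr.equals(r.ipAddr)
        && xferPort == r.xferPort;
  }

  @Override
  public int hashCode() {
    return Objects.hash(datanodeUuid, ipAddr, xferPort);
  }
}

// Wire-format stand-in (in Hadoop this would be a generated protobuf class).
final class RegistrationMsg {
  final String uuid;
  final String ip;
  final int port;

  RegistrationMsg(String uuid, String ip, int port) {
    this.uuid = uuid;
    this.ip = ip;
    this.port = port;
  }
}

// Plays the role of PBHelper.convert in both directions.
final class Convert {
  static RegistrationMsg toWire(Registration r) {
    return new RegistrationMsg(r.datanodeUuid, r.ipAddr, r.xferPort);
  }

  static Registration fromWire(RegistrationMsg m) {
    return new Registration(m.uuid, m.ip, m.port);
  }
}

// Client-side translator: converts, calls the "RPC", converts back.
class ClientTranslatorSketch {
  interface RpcProxy {
    RegistrationMsg registerDatanode(RegistrationMsg request);
  }

  private final RpcProxy rpcProxy;

  ClientTranslatorSketch(RpcProxy rpcProxy) {
    this.rpcProxy = rpcProxy;
  }

  Registration registerDatanode(Registration registration) {
    RegistrationMsg response = rpcProxy.registerDatanode(Convert.toWire(registration));
    return Convert.fromWire(response);
  }
}
```

For example, a fake server `req -> new RegistrationMsg(req.uuid, "10.0.0.5", req.port)` mimics the NN replacing the IP with the one seen on the connection (the example values are arbitrary), and the caller receives an updated `Registration` back.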
On the RPC server side, the request is received by NameNodeRpcServer:
```java
// NameNodeRpcServer#registerDatanode
@Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  namesystem.registerDatanode(nodeReg);
  return nodeReg;
}
```
The actual work is done by FSNamesystem#registerDatanode:
```java
// FSNamesystem#registerDatanode
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    blockManager.registerDatanode(nodeReg);
  } finally {
    writeUnlock("registerDatanode");
  }
}

// which then calls BlockManager#registerDatanode
public void registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  assert namesystem.hasWriteLock();
  // DatanodeManager#registerDatanode performs the final registration step
  datanodeManager.registerDatanode(nodeReg);
  // After registration, re-check the BlockManager's safe mode state
  bmSafeMode.checkSafeMode();
}
```
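The lock discipline here (registration mutates shared state under the namesystem write lock and always unlocks in `finally`; heartbeat handling, shown later, takes the read lock) can be mimicked with `java.util.concurrent.locks.ReentrantReadWriteLock`. This is a hypothetical miniature, not FSNamesystem's actual lock implementation; the map of UUIDs to addresses and the null check are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical miniature of the FSNamesystem locking pattern.
class NamesystemLockSketch {
  private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock();
  private final Map<String, String> datanodes = new HashMap<>();

  // Writers (like registerDatanode) take the exclusive write lock.
  void registerDatanode(String uuid, String xferAddr) {
    fsLock.writeLock().lock();
    try {
      if (uuid == null) {
        throw new IllegalArgumentException("missing DatanodeUuid");
      }
      datanodes.put(uuid, xferAddr);
    } finally {
      fsLock.writeLock().unlock(); // released even if registration fails
    }
  }

  // Readers (like the heartbeat path) share the read lock.
  boolean isRegistered(String uuid) {
    fsLock.readLock().lock();
    try {
      return datanodes.containsKey(uuid);
    } finally {
      fsLock.readLock().unlock();
    }
  }

  boolean isWriteLocked() {
    return fsLock.isWriteLocked();
  }
}
```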
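Finally, DatanodeManager#registerDatanode distinguishes three situations:
- A node is already registered at the same transfer address (IP and xfer port) but under a different DatanodeUuid, i.e. it previously served different storage; that old node is removed.
- The node re-registers (its DatanodeUuid is already known); this mainly updates its information.
- A brand-new node that has never registered before.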
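How the code below picks among these cases depends on two lookups: `nodeS` (by DatanodeUuid) and `nodeN` (by IP and transfer port). The hypothetical sketch models descriptors as String IDs and compares them with `equals`, whereas the real code compares `DatanodeDescriptor` objects by reference; the action strings are invented labels. Note that the first case is not exclusive: after removing the stale node, the method continues into one of the other two.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the case selection in DatanodeManager#registerDatanode.
// Descriptors are String IDs; null means "no descriptor found".
class RegisterDecisionSketch {
  // byUuid:     descriptor found by the node's DatanodeUuid (nodeS)
  // byXferAddr: descriptor found by the node's ip:xferPort  (nodeN)
  static List<String> decide(String byUuid, String byXferAddr) {
    List<String> actions = new ArrayList<>();
    // Case 1: the address belongs to a different (stale) descriptor.
    if (byXferAddr != null && !byXferAddr.equals(byUuid)) {
      actions.add("remove stale " + byXferAddr);
      byXferAddr = null;
    }
    // Case 2: the UUID is already known, so update it in place.
    if (byUuid != null) {
      actions.add(byUuid.equals(byXferAddr)
          ? "restarted: update " + byUuid
          : "replaced: update " + byUuid);
      return actions;
    }
    // Case 3: never seen before.
    actions.add("add new node");
    return actions;
  }

  public static void main(String[] args) {
    System.out.println(decide(null, null)); // [add new node]
    System.out.println(decide("A", "A"));   // [restarted: update A]
    System.out.println(decide("A", "B"));   // [remove stale B, replaced: update A]
  }
}
```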
```java
public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
  InetAddress dnAddress = Server.getRemoteIp();
  if (dnAddress != null) {
    // Mostly called inside an RPC, update ip and peer hostname
    String hostname = dnAddress.getHostName();
    String ip = dnAddress.getHostAddress();
    if (checkIpHostnameInRegistration && !isNameResolved(dnAddress)) {
      // Reject registration of unresolved datanode to prevent performance
      // impact of repetitive DNS lookups later.
      final String message = "hostname cannot be resolved (ip="
          + ip + ", hostname=" + hostname + ")";
      LOG.warn("Unresolved datanode registration: " + message);
      throw new DisallowedDatanodeException(nodeReg, message);
    }
    // update node registration with the ip and hostname from rpc request
    nodeReg.setIpAddr(ip);
    nodeReg.setPeerHostName(hostname);
  }

  try {
    nodeReg.setExportedKeys(blockManager.getBlockKeys());

    // Checks if the node is not on the hosts list. If it is not, then
    // it will be disallowed from registering.
    if (!hostConfigManager.isIncluded(nodeReg)) {
      throw new DisallowedDatanodeException(nodeReg);
    }

    NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
        + nodeReg + " storage " + nodeReg.getDatanodeUuid());

    DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
    DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
        nodeReg.getIpAddr(), nodeReg.getXferPort());

    // A node already exists at this address but served different storage:
    // remove it
    if (nodeN != null && nodeN != nodeS) {
      NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
      // nodeN previously served a different data storage,
      // which is not served by anybody anymore.
      removeDatanode(nodeN);
      // physically remove node from datanodeMap
      wipeDatanode(nodeN);
      nodeN = null;
    }

    // Re-registration of a known node
    if (nodeS != null) {
      if (nodeN == nodeS) {
        // The same datanode has been just restarted to serve the same data
        // storage. We do not need to remove old data blocks, the delta will
        // be calculated on the next block report from the datanode
        if(NameNode.stateChangeLog.isDebugEnabled()) {
          NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
              + "node restarted.");
        }
      } else {
        // nodeS is found
        NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
            + " is replaced by " + nodeReg + " with the same storageID "
            + nodeReg.getDatanodeUuid());
      }

      boolean success = false;
      // Update the DN descriptor
      try {
        // update cluster map
        getNetworkTopology().remove(nodeS);
        if(shouldCountVersion(nodeS)) {
          decrementVersionCount(nodeS.getSoftwareVersion());
        }
        nodeS.updateRegInfo(nodeReg);

        nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
        nodeS.setDisallowed(false); // Node is in the include list

        // resolve network location
        if(this.rejectUnresolvedTopologyDN) {
          nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
          nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
        } else {
          nodeS.setNetworkLocation(
              resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
          nodeS.setDependentHostNames(
              getNetworkDependenciesWithDefault(nodeS));
        }
        getNetworkTopology().add(nodeS);
        resolveUpgradeDomain(nodeS);

        // also treat the registration message as a heartbeat
        heartbeatManager.register(nodeS);
        incrementVersionCount(nodeS.getSoftwareVersion());
        startAdminOperationIfNecessary(nodeS);
        success = true;
      } finally {
        if (!success) {
          removeDatanode(nodeS);
          wipeDatanode(nodeS);
          countSoftwareVersions();
        }
      }
      return;
    }

    DatanodeDescriptor nodeDescr
        = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
    boolean success = false;
    try {
      // Next, handle a brand-new node
      // resolve network location
      // (so the node can be placed in the cluster's network topology)
      if(this.rejectUnresolvedTopologyDN) {
        nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
        nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
      } else {
        nodeDescr.setNetworkLocation(
            resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
        nodeDescr.setDependentHostNames(
            getNetworkDependenciesWithDefault(nodeDescr));
      }
      nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
      resolveUpgradeDomain(nodeDescr);

      // register new datanode
      // addDatanode adds the DN to datanodeMap and the network topology
      addDatanode(nodeDescr);
      blockManager.getBlockReportLeaseManager().register(nodeDescr);
      // also treat the registration message as a heartbeat
      // no need to update its timestamp
      // because its is done when the descriptor is created
      // Add the new DN to HeartbeatManager's datanode list so that its
      // heartbeats are tracked from now on
      heartbeatManager.addDatanode(nodeDescr);
      heartbeatManager.updateDnStat(nodeDescr);
      incrementVersionCount(nodeReg.getSoftwareVersion());
      startAdminOperationIfNecessary(nodeDescr);
      success = true;
    } finally {
      if (!success) {
        removeDatanode(nodeDescr);
        wipeDatanode(nodeDescr);
        countSoftwareVersions();
      }
    }
  } catch (InvalidTopologyException e) {
    // If the network location is invalid, clear the cached mappings
    // so that we have a chance to re-add this DataNode with the
    // correct network location later.
    List<String> invalidNodeNames = new ArrayList<>(3);
    // clear cache for nodes in IP or Hostname
    invalidNodeNames.add(nodeReg.getIpAddr());
    invalidNodeNames.add(nodeReg.getHostName());
    invalidNodeNames.add(nodeReg.getPeerHostName());
    dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
    throw e;
  }
}
```
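New nodes end up in addDatanode (shown next), whose first line keeps `host2DatanodeMap` consistent with `datanodeMap` by relying on `Map.put` returning the value previously stored under the key (or null). The hypothetical sketch below reproduces only that trick; `byUuid` and `byAddr` are invented names playing the roles of the two real maps, and the addresses are example values.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the Map.put trick in DatanodeManager#addDatanode.
class DualIndexSketch {
  static final class Node {
    final String uuid;
    final String xferAddr;

    Node(String uuid, String xferAddr) {
      this.uuid = uuid;
      this.xferAddr = xferAddr;
    }
  }

  private final Map<String, Node> byUuid = new HashMap<>();      // like datanodeMap
  private final Map<String, Set<Node>> byAddr = new HashMap<>(); // like host2DatanodeMap

  synchronized void add(Node node) {
    // put() returns the descriptor previously stored for this UUID, if any;
    // drop that old descriptor from the secondary index first.
    Node previous = byUuid.put(node.uuid, node);
    if (previous != null) {
      Set<Node> atOldAddr = byAddr.get(previous.xferAddr);
      if (atOldAddr != null) {
        atOldAddr.remove(previous);
        if (atOldAddr.isEmpty()) {
          byAddr.remove(previous.xferAddr);
        }
      }
    }
    byAddr.computeIfAbsent(node.xferAddr, k -> new HashSet<>()).add(node);
  }

  synchronized int nodesAt(String xferAddr) {
    Set<Node> nodes = byAddr.get(xferAddr);
    return nodes == null ? 0 : nodes.size();
  }

  synchronized int size() {
    return byUuid.size();
  }
}
```

Without the removal step, re-adding `uuid-1` at a new address would leave a stale entry for the old address in the secondary index.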
The addDatanode method:
```java
void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  // (datanodeMap.put returns the previous descriptor for this UUID, or null;
  // that old descriptor is removed before the new node is added below)
  synchronized(this) {
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }

  networktopology.add(node); // may throw InvalidTopologyException
  host2DatanodeMap.add(node);
  // Check whether the cluster now spans multiple racks
  checkIfClusterIsNowMultiRack(node);
  resolveUpgradeDomain(node);

  if (LOG.isDebugEnabled()) {
    LOG.debug(getClass().getSimpleName() + ".addDatanode: "
        + "node " + node + " is added to datanodeMap.");
  }
}
```

2.3 Sending Heartbeats
As the run method in section 2.1 shows, heartbeats are sent mainly by BPServiceActor#offerService:
```java
private void offerService() throws Exception {
  LOG.info("For namenode " + nnAddr + " using"
      + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msecs"
      + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msecs"
      + " Initial delay: " + dnConf.initialBlockReportDelayMs + "msecs"
      + "; heartBeatInterval=" + dnConf.heartBeatInterval
      + (lifelineSender != null ?
          "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));

  //
  // Now loop for a long time....
  //
  while (shouldRun()) {
    try {
      DataNodeFaultInjector.get().startOfferService();
      final long startTime = scheduler.monotonicNow();

      //
      // Every so often, send heartbeat or block-report
      //
      // A heartbeat is due when nextHeartbeatTime - startTime <= 0.
      // nextHeartbeatTime is initialized by the Scheduler and, each time a
      // heartbeat is sent, set to now + heartbeatIntervalMs (3 s by default).
      final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
      HeartbeatResponse resp = null;
      if (sendHeartbeat) {
        //
        // All heartbeat messages include following info:
        // -- Datanode name
        // -- data transfer port
        // -- Total capacity
        // -- Bytes remaining
        //
        boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
            scheduler.isBlockReportDue(startTime);
        if (!dn.areHeartbeatsDisabledForTests()) {
          // Send the heartbeat
          resp = sendHeartBeat(requestBlockReportLease);
          assert resp != null;
          if (resp.getFullBlockReportLeaseId() != 0) {
            if (fullBlockReportLeaseId != 0) {
              LOG.warn(nnAddr + " sent back a full block report lease " +
                  "ID of 0x" +
                  Long.toHexString(resp.getFullBlockReportLeaseId()) +
                  ", but we already have a lease ID of 0x" +
                  Long.toHexString(fullBlockReportLeaseId) + ". " +
                  "Overwriting old lease ID.");
            }
            fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
          }
          dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime,
              getRpcMetricSuffix());

          // If the state of this NN has changed (eg STANDBY->ACTIVE)
          // then let the BPOfferService update itself.
          //
          // important that this happens before processCommand below,
          // since the first heartbeat to a new active might have commands
          // that we should actually process.
          bpos.updateActorStatesFromHeartbeat(
              this, resp.getNameNodeHaState());
          state = resp.getNameNodeHaState().getState();

          if (state == HAServiceState.ACTIVE) {
            handleRollingUpgradeStatus(resp);
          }
          commandProcessingThread.enqueue(resp.getCommands());
        }
      }
      if (!dn.areIBRDisabledForTests() &&
          (ibrManager.sendImmediately() || sendHeartbeat)) {
        ibrManager.sendIBRs(bpNamenode, bpRegistration,
            bpos.getBlockPoolId(), getRpcMetricSuffix());
      }

      List<DatanodeCommand> cmds = null;
      boolean forceFullBr =
          scheduler.forceFullBlockReport.getAndSet(false);
      if (forceFullBr) {
        LOG.info("Forcing a full block report to " + nnAddr);
      }
      if ((fullBlockReportLeaseId != 0) || forceFullBr) {
        cmds = blockReport(fullBlockReportLeaseId);
        fullBlockReportLeaseId = 0;
      }
      commandProcessingThread.enqueue(cmds);

      if (!dn.areCacheReportsDisabledForTests()) {
        DatanodeCommand cmd = cacheReport();
        commandProcessingThread.enqueue(cmd);
      }

      if (sendHeartbeat) {
        dn.getMetrics().addHeartbeatTotal(
            scheduler.monotonicNow() - startTime, getRpcMetricSuffix());
      }

      // There is no work to do; sleep until heartbeat timer elapses,
      // or work arrives, and then iterate again.
      ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
    } catch(RemoteException re) {
      String reClass = re.getClassName();
      if (UnregisteredNodeException.class.getName().equals(reClass) ||
          DisallowedDatanodeException.class.getName().equals(reClass) ||
          IncorrectVersionException.class.getName().equals(reClass)) {
        LOG.warn(this + " is shutting down", re);
        shouldServiceRun = false;
        return;
      }
      LOG.warn("RemoteException in offerService", re);
      sleepAfterException();
    } catch (IOException e) {
      LOG.warn("IOException in offerService", e);
      sleepAfterException();
    } finally {
      DataNodeFaultInjector.get().endOfferService();
    }
    processQueueMessages();
  } // while (shouldRun())
} // offerService
```
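The heartbeat timing used above (a heartbeat is due when `nextHeartbeatTime - now <= 0`, and each heartbeat pushes `nextHeartbeatTime` one interval ahead) can be sketched as follows. This is a hypothetical miniature, not the real `Scheduler` class: times are passed in explicitly instead of being read from a monotonic clock, the first heartbeat is assumed to be due immediately, and the wait time is clamped at zero. The subtract-then-compare form matters because it stays correct even when the clock value overflows, which a plain `now >= nextHeartbeatTime` would not.

```java
// Hypothetical miniature of the heartbeat timing in BPServiceActor's Scheduler.
class HeartbeatSchedulerSketch {
  private final long heartbeatIntervalMs;
  private long nextHeartbeatTime;

  HeartbeatSchedulerSketch(long heartbeatIntervalMs, long now) {
    this.heartbeatIntervalMs = heartbeatIntervalMs;
    this.nextHeartbeatTime = now; // assumption: first heartbeat is due at once
  }

  // Overflow-safe comparison of two clock readings.
  boolean isHeartbeatDue(long now) {
    return nextHeartbeatTime - now <= 0;
  }

  // Called when a heartbeat is sent.
  long scheduleNextHeartbeat(long now) {
    nextHeartbeatTime = now + heartbeatIntervalMs; // overflow here is harmless
    return nextHeartbeatTime;
  }

  // How long the actor may wait before the next heartbeat (never negative).
  long getHeartbeatWaitTime(long now) {
    return Math.max(0L, nextHeartbeatTime - now);
  }

  public static void main(String[] args) {
    HeartbeatSchedulerSketch s = new HeartbeatSchedulerSketch(3000, 0);
    System.out.println(s.isHeartbeatDue(0));          // true
    s.scheduleNextHeartbeat(0);
    System.out.println(s.isHeartbeatDue(1000));       // false
    System.out.println(s.getHeartbeatWaitTime(1000)); // 2000
    System.out.println(s.isHeartbeatDue(3000));       // true
  }
}
```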
Next, BPServiceActor#sendHeartBeat:
```java
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
    throws IOException {
  scheduler.scheduleNextHeartbeat();
  StorageReport[] reports =
      dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Sending heartbeat with " + reports.length +
        " storage reports from service actor: " + this);
  }

  final long now = monotonicNow();
  scheduler.updateLastHeartbeatTime(now);
  VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
      .getVolumeFailureSummary();
  int numFailedVolumes = volumeFailureSummary != null ?
      volumeFailureSummary.getFailedStorageLocations().length : 0;
  final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
  final SlowPeerReports slowPeers =
      outliersReportDue && dn.getPeerMetrics() != null ?
          SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
          SlowPeerReports.EMPTY_REPORT;
  // Collect per-disk latency outliers (read IO, write IO and metadata IO)
  final SlowDiskReports slowDisks =
      outliersReportDue && dn.getDiskMetrics() != null ?
          SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
          SlowDiskReports.EMPTY_REPORT;

  // Send the heartbeat. bpNamenode is a DatanodeProtocolClientSideTranslatorPB
  // instance; the call ends up in NameNodeRpcServer#sendHeartbeat
  HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
      reports,
      dn.getFSDataset().getCacheCapacity(),
      dn.getFSDataset().getCacheUsed(),
      dn.getXmitsInProgress(),
      dn.getXceiverCount(),
      numFailedVolumes,
      volumeFailureSummary,
      requestBlockReportLease,
      slowPeers,
      slowDisks);

  if (outliersReportDue) {
    // If the report was due and successfully sent, schedule the next one.
    scheduler.scheduleNextOutlierReport();
  }

  return response;
}
```
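sendHeartBeat forwards the requestBlockReportLease flag computed in offerService. The lease bookkeeping around it (ask for a lease only when none is held and a block report is due, store the ID the NN returns, send a full block report while holding a lease or when forced, then reset the ID to 0) can be sketched as a small state holder. The class and method names are hypothetical; only the rules come from the offerService code above.

```java
// Hypothetical sketch of the full-block-report lease bookkeeping that
// offerService() performs around each heartbeat.
class BlockReportLeaseSketch {
  long fullBlockReportLeaseId = 0; // 0 means "no lease held"
  int fullReportsSent = 0;

  // Ask for a lease only if we hold none and a block report is due.
  boolean shouldRequestLease(boolean blockReportDue) {
    return fullBlockReportLeaseId == 0 && blockReportDue;
  }

  // Apply the lease ID from the heartbeat response (0 = nothing granted).
  void onHeartbeatResponse(long leaseIdFromNn) {
    if (leaseIdFromNn != 0) {
      fullBlockReportLeaseId = leaseIdFromNn; // overwrites an older lease
    }
  }

  // Send a full block report when holding a lease (or when forced), then
  // reset the lease ID to 0 as offerService() does.
  boolean maybeSendFullBlockReport(boolean forceFullBr) {
    if (fullBlockReportLeaseId != 0 || forceFullBr) {
      fullReportsSent++; // stands in for blockReport(fullBlockReportLeaseId)
      fullBlockReportLeaseId = 0;
      return true;
    }
    return false;
  }
}
```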
bpNamenode.sendHeartbeat then sends the heartbeat, and NameNodeRpcServer receives it:
```java
// NameNodeRpcServer#sendHeartbeat
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  // Delegates to FSNamesystem#handleHeartbeat
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
      slowPeers, slowDisks);
}
```
FSNamesystem#handleHeartbeat processes the heartbeat, mainly by calling DatanodeManager#handleHeartbeat:
```java
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  readLock();
  try {
    //get datanode commands
    final int maxTransfer = blockManager.getMaxReplicationStreams()
        - xmitsInProgress;
    // DatanodeManager#handleHeartbeat processes the heartbeat and returns
    // the commands to send back to the DN
    DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
        nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
        xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
        slowPeers, slowDisks);
    long blockReportLeaseId = 0;
    if (requestFullBlockReportLease) {
      blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
    }

    //create ha status
    final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
        haContext.getState().getServiceState(),
        getFSImage().getCorrectLastAppliedOrWrittenTxId());

    // Return the heartbeat response
    return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
        blockReportLeaseId);
  } finally {
    readUnlock("handleHeartbeat");
  }
}
```
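DatanodeManager#handleHeartbeat processes a heartbeat as follows:
- Look up the DataNode. If it is not allowed to connect (for example, it is in the exclude list), throw an exception right away.
- If it has not registered, reply with a register command.
- Update the DataNode's information, i.e. the statistics in its DatanodeDescriptor such as used and remaining space.
- If the NN is in safe mode, return no commands.
- Get the block recovery command (BlockRecoveryCommand); if there is one, return it on its own.
- Build commands for pending block work:
  - take up to numReplicationTasks replicated blocks
  - drop blocks that have already been deleted
  - generate the replication (transfer) command
  - take up to numECTasks erasure-coded blocks
  - generate the erasure-coding reconstruction command
- Generate the block deletion (invalidate) command.
- Generate cache commands.
- Generate the balancer bandwidth command.
- Return all the commands.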
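The early-return order of these steps matters: a disallowed node is rejected before anything else, an unknown node is only told to register, safe mode hands out no work, and a recovery command is returned alone. A hypothetical sketch of that ordering follows. Commands are plain strings, `NodeState` is an invented, simplified view of what the NN knows about a node, and `IllegalStateException` stands in for `DisallowedDatanodeException`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the early-return order in DatanodeManager#handleHeartbeat.
class HeartbeatReplySketch {
  static final class NodeState {
    boolean registered = true;
    boolean disallowed = false;
    boolean pendingRecovery = false;
    int replicationTasks = 0;
    int invalidations = 0;
  }

  static List<String> reply(NodeState node, boolean inSafeMode) {
    if (node != null && node.disallowed) {
      // the real code throws DisallowedDatanodeException
      throw new IllegalStateException("datanode is disallowed");
    }
    if (node == null || !node.registered) {
      return List.of("REGISTER");
    }
    // (the real code updates the node's heartbeat statistics here)
    if (inSafeMode) {
      return List.of();          // no work is handed out in safe mode
    }
    if (node.pendingRecovery) {
      return List.of("RECOVER"); // a recovery command is returned on its own
    }
    List<String> cmds = new ArrayList<>();
    if (node.replicationTasks > 0) {
      cmds.add("TRANSFER x" + node.replicationTasks);
    }
    if (node.invalidations > 0) {
      cmds.add("INVALIDATE x" + node.invalidations);
    }
    return cmds;
  }
}
```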
```java
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount,
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  final DatanodeDescriptor nodeinfo;
  try {
    // Look up the datanode first
    nodeinfo = getDatanode(nodeReg);
  } catch (UnregisteredNodeException e) {
    return new DatanodeCommand[]{RegisterCommand.REGISTER};
  }

  // Check if this datanode should actually be shutdown instead.
  // 1. If the node is not allowed to connect, throw right away
  if (nodeinfo != null && nodeinfo.isDisallowed()) {
    setDatanodeDead(nodeinfo);
    throw new DisallowedDatanodeException(nodeinfo);
  }

  // 2. If the node has not registered, reply with a register command
  if (nodeinfo == null || !nodeinfo.isRegistered()) {
    return new DatanodeCommand[]{RegisterCommand.REGISTER};
  }
  // 3. Update the DN's information
  heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
      cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);

  // If we are in safemode, do not send back any recovery / replication
  // requests. Don't even drain the existing queue of work.
  // 4. Check for safe mode
  if (namesystem.isInSafeMode()) {
    return new DatanodeCommand[0];
  }

  // block recovery command
  // 5. Get the BlockRecoveryCommand
  final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
      nodeinfo);
  if (brCommand != null) {
    return new DatanodeCommand[]{brCommand};
  }

  final List<DatanodeCommand> cmds = new ArrayList<>();
  // Allocate _approximately_ maxTransfers pending tasks to DataNode.
  // NN chooses pending tasks based on the ratio between the lengths of
  // replication and erasure-coded block queues.
  int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
  int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
  int totalBlocks = totalReplicateBlocks + totalECBlocks;
  // 6. Build commands for pending block work
  if (totalBlocks > 0) {
    int numReplicationTasks = (int) Math.ceil(
        (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
    int numECTasks = (int) Math.ceil(
        (double) (totalECBlocks * maxTransfers) / totalBlocks);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Pending replication tasks: " + numReplicationTasks
          + " erasure-coded tasks: " + numECTasks);
    }
    // check pending replication tasks
    // take up to numReplicationTasks blocks
    List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
        numReplicationTasks);
    if (pendingList != null && !pendingList.isEmpty()) {
      // If the block is deleted, the block size will become
      // BlockCommand.NO_ACK (LONG.MAX_VALUE) . This kind of block we don't
      // need
      // to send for replication or reconstruction
      Iterator<BlockTargetPair> iterator = pendingList.iterator();
      while (iterator.hasNext()) {
        BlockTargetPair cmd = iterator.next();
        // Skip blocks that have already been deleted
        if (cmd.block != null
            && cmd.block.getNumBytes() == BlockCommand.NO_ACK) {
          // block deleted
          DatanodeStorageInfo.decrementBlocksScheduled(cmd.targets);
          iterator.remove();
        }
      }
      // Add the replication (transfer) command
      if (!pendingList.isEmpty()) {
        cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
            pendingList));
      }
    }
    // check pending erasure coding tasks
    // take up to numECTasks erasure-coded blocks
    List<BlockECReconstructionInfo> pendingECList = nodeinfo
        .getErasureCodeCommand(numECTasks);
    if (pendingECList != null && !pendingECList.isEmpty()) {
      cmds.add(new BlockECReconstructionCommand(
          DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
    }
  }

  // check block invalidation
  // 7. Add the block deletion (invalidate) command
  Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
  if (blks != null) {
    cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
        blks));
  }
  // cache commands
  // 8. Add cache commands
  addCacheCommands(blockPoolId, nodeinfo, cmds);
  // key update command
  // (updates the block access keys)
  blockManager.addKeyUpdateCommand(cmds, nodeinfo);

  // check for balancer bandwidth update
  // 9. Add the balancer bandwidth command
  if (nodeinfo.getBalancerBandwidth() > 0) {
    cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
    // set back to 0 to indicate that datanode has been sent the new value
    nodeinfo.setBalancerBandwidth(0);
  }

  if (slowPeerTracker != null) {
    final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
    if (!slowPeersMap.isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
            slowPeersMap);
      }
      for (String slowNodeId : slowPeersMap.keySet()) {
        slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
      }
    }
  }

  if (slowDiskTracker != null) {
    if (!slowDisks.getSlowDisks().isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
            slowDisks.getSlowDisks());
      }
      slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
    }
    slowDiskTracker.checkAndUpdateReportIfNecessary();
  }

  // 10. Return all commands
  if (!cmds.isEmpty()) {
    return cmds.toArray(new DatanodeCommand[cmds.size()]);
  }

  return new DatanodeCommand[0];
}
```
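The "_approximately_ maxTransfers" comment is worth a worked example. maxTransfers comes from FSNamesystem#handleHeartbeat as `getMaxReplicationStreams() - xmitsInProgress`, and each queue gets a share proportional to its length, rounded up with `Math.ceil`. The hypothetical helper below copies just that arithmetic; `TaskSplitSketch` and `split` are invented names, and the inputs in `main` are example values.

```java
// Hypothetical helper reproducing the proportional split in
// DatanodeManager#handleHeartbeat. Returns {numReplicationTasks, numECTasks}.
class TaskSplitSketch {
  static int[] split(int totalReplicateBlocks, int totalECBlocks, int maxTransfers) {
    int totalBlocks = totalReplicateBlocks + totalECBlocks;
    if (totalBlocks <= 0) {
      return new int[] {0, 0}; // no pending work
    }
    int numReplicationTasks = (int) Math.ceil(
        (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
    int numECTasks = (int) Math.ceil(
        (double) (totalECBlocks * maxTransfers) / totalBlocks);
    return new int[] {numReplicationTasks, numECTasks};
  }

  public static void main(String[] args) {
    int[] a = split(30, 10, 4); // 120/40 = 3.0 and 40/40 = 1.0
    System.out.println(a[0] + " replication + " + a[1] + " EC"); // 3 replication + 1 EC
    int[] b = split(1, 1, 3);   // 1.5 rounds up to 2 for both queues
    System.out.println(b[0] + " replication + " + b[1] + " EC"); // 2 replication + 2 EC
  }
}
```

The second call shows why the limit is only approximate: because both shares are rounded up, the DN can receive 4 tasks even though maxTransfers is 3.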