2021-12-03 DataNode Registration and Heartbeat

This article is based on hadoop-3.3.0.

Contents

1 Overview

2 Registration Process

2.1 Pre-registration Steps

2.2 Registration Process

2.3 Heartbeat Sending


1 Overview

After a DataNode (dn) finishes starting up (covered in the startup source-code reading), it must register with the NameNode (nn) before it can provide data storage services. Once registered, it sends heartbeats periodically so the NameNode can confirm the DataNode is still alive.

The registration flow is triggered from the startDataNode method, called in the DataNode constructor, which invokes refreshNamenodes on the BlockPoolManager:

blockPoolManager.refreshNamenodes(getConf());
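
For context, the addresses that refreshNamenodes resolves come from the standard HDFS nameservice settings. Below is a minimal sketch, not taken from the article, of how such a configuration could be built programmatically; the nameservice id ns1 and the host names/ports are made up for illustration, only the configuration keys are real:

Configuration conf = new HdfsConfiguration();
// dfs.nameservices: the nameservices this DataNode serves (hypothetical id ns1)
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1");
// the NameNodes of ns1 and their RPC addresses (hypothetical hosts/ports)
conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.ns1.nn1", "nn1.example.com:8020");
conf.set("dfs.namenode.rpc-address.ns1.nn2", "nn2.example.com:8020");
// optional lifeline RPC addresses, read by getNNLifelineRpcAddressesForCluster
conf.set("dfs.namenode.lifeline.rpc-address.ns1.nn1", "nn1.example.com:8050");
conf.set("dfs.namenode.lifeline.rpc-address.ns1.nn2", "nn2.example.com:8050");
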
2 Registration Process

2.1 Pre-registration Steps

Start with the refreshNamenodes method: it resolves the new nameservices (NS) and the NameNode lifeline RPC addresses, then calls doRefreshNamenodes to perform the remaining work.

void refreshNamenodes(Configuration conf)
    throws IOException {
    LOG.info("Refresh request received for nameservices: " +
             conf.get(DFSConfigKeys.DFS_NAMESERVICES));

    Map<String, Map<String, InetSocketAddress>> newAddressMap = null;
    Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = null;

    try {
        newAddressMap =
            DFSUtil.getNNServiceRpcAddressesForCluster(conf);
        newLifelineAddressMap =
            DFSUtil.getNNLifelineRpcAddressesForCluster(conf);
    } catch (IOException ioe) {
        LOG.warn("Unable to get NameNode addresses.");
    }

    if (newAddressMap == null || newAddressMap.isEmpty()) {
        throw new IOException("No services to connect, missing NameNode " +
                              "address.");
    }

    synchronized (refreshNamenodesLock) {
        doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
    }
}

doRefreshNamenodes consists of five main steps:

1. For each incoming nameservice, decide whether it is an update of the NN set for an existing nameservice or an entirely new nameservice

2. Any nameservices we currently have but that are no longer present need to be removed

3. Start the new nameservices

The three steps above run inside a synchronized block

4. Shut down the stale nameservices; this happens outside the synchronized block because it can trigger removal and other work on another thread and may take a while

5. Refresh the nameservices whose NN list has changed

In step 3 the BPOfferService is created, which carries out the DataNode registration and heartbeat (next section)

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);

    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;

    synchronized (this) {
        // Step 1. For each of the new nameservices, figure out whether
        // it's an update of the set of NNs for an existing NS,
        // or an entirely new nameservice.
        for (String nameserviceId : addrMap.keySet()) {
            if (bpByNameserviceId.containsKey(nameserviceId)) {
                toRefresh.add(nameserviceId);
            } else {
                toAdd.add(nameserviceId);
            }
        }

        // Step 2. Any nameservices we currently have but are no longer present
        // need to be removed.
        toRemove = Sets.newHashSet(Sets.difference(
            bpByNameserviceId.keySet(), addrMap.keySet()));

        assert toRefresh.size() + toAdd.size() ==
            addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("").join(toAdd) +
            "  toRemove: " + Joiner.on(",").useForNull("").join(toRemove) +
            "  toRefresh: " + Joiner.on(",").useForNull("").join(toRefresh);


        // Step 3. Start new nameservices
        if (!toAdd.isEmpty()) {
            LOG.info("Starting BPOfferServices for nameservices: " +
                     Joiner.on(",").useForNull("").join(toAdd));

            for (String nsToAdd : toAdd) {
                Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToAdd);
                Map<String, InetSocketAddress> nnIdToLifelineAddr =
                    lifelineAddrMap.get(nsToAdd);
                ArrayList<InetSocketAddress> addrs =
                    Lists.newArrayListWithCapacity(nnIdToAddr.size());
                ArrayList<String> nnIds =
                    Lists.newArrayListWithCapacity(nnIdToAddr.size());
                ArrayList<InetSocketAddress> lifelineAddrs =
                    Lists.newArrayListWithCapacity(nnIdToAddr.size());
                for (String nnId : nnIdToAddr.keySet()) {
                    addrs.add(nnIdToAddr.get(nnId));
                    nnIds.add(nnId);
                    lifelineAddrs.add(nnIdToLifelineAddr != null ?
                                      nnIdToLifelineAddr.get(nnId) : null);
                }
                // Initialize a BPOfferService from the given parameters; this is a member
                // method of BlockPoolManager, and the BlockPoolManager itself is created in
                // the DataNode constructor (which passes the DataNode in)
                BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
                                                 lifelineAddrs);
                bpByNameserviceId.put(nsToAdd, bpos);
                offerServices.add(bpos);
            }
        }
        startAll();
    }

    // Step 4. Shut down old nameservices. This happens outside
    // of the synchronized(this) lock since they need to call
    // back to .remove() from another thread
    if (!toRemove.isEmpty()) {
        LOG.info("Stopping BPOfferServices for nameservices: " +
                 Joiner.on(",").useForNull("").join(toRemove));

        for (String nsToRemove : toRemove) {
            BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
            bpos.stop();
            bpos.join();
            // they will call remove on their own
        }
    }

    // Step 5. Update nameservices whose NN list has changed
    if (!toRefresh.isEmpty()) {
        LOG.info("Refreshing list of NNs for nameservices: " +
                 Joiner.on(",").useForNull("").join(toRefresh));

        for (String nsToRefresh : toRefresh) {
            BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
            Map<String, InetSocketAddress> nnIdToAddr = addrMap.get(nsToRefresh);
            Map<String, InetSocketAddress> nnIdToLifelineAddr =
                lifelineAddrMap.get(nsToRefresh);
            ArrayList<InetSocketAddress> addrs =
                Lists.newArrayListWithCapacity(nnIdToAddr.size());
            ArrayList<InetSocketAddress> lifelineAddrs =
                Lists.newArrayListWithCapacity(nnIdToAddr.size());
            ArrayList<String> nnIds = Lists.newArrayListWithCapacity(
                nnIdToAddr.size());
            for (String nnId : nnIdToAddr.keySet()) {
                addrs.add(nnIdToAddr.get(nnId));
                lifelineAddrs.add(nnIdToLifelineAddr != null ?
                                  nnIdToLifelineAddr.get(nnId) : null);
                nnIds.add(nnId);
            }
            try {
                UserGroupInformation.getLoginUser()
                    .doAs(new PrivilegedExceptionAction<Object>() {
                        @Override
                        public Object run() throws Exception {
                            bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
                            return null;
                        }
                    });
            } catch (InterruptedException ex) {
                IOException ioe = new IOException();
                ioe.initCause(ex.getCause());
                throw ioe;
            }
        }
    }
} 

startAll is invoked here; it calls the start method of each BPOfferService, and BPOfferService#start in turn calls start on each of its BPServiceActor instances:

synchronized void startAll() throws IOException {
    try {
        UserGroupInformation.getLoginUser().doAs(
            new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
                    for (BPOfferService bpos : offerServices) {
                        // start each BPOfferService in turn
                        bpos.start();
                    }
                    return null;
                }
            });
    } catch (InterruptedException ex) {
        IOException ioe = new IOException();
        ioe.initCause(ex.getCause());
        throw ioe;
    }
}

// BPOfferService#start
void start() {
    for (BPServiceActor actor : bpServices) {
        actor.start();
    }
}


// BPServiceActor#start
//This must be called only by BPOfferService
void start() {
    if ((bpThread != null) && (bpThread.isAlive())) {
        //Thread is started already
        return;
    }
    bpThread = new Thread(this);
    bpThread.setDaemon(true); // needed for JUnit testing
    bpThread.start();

    if (lifelineSender != null) {
        lifelineSender.start();
    }
} 

As BPServiceActor#start shows, the actor passes itself (this) to the new thread, so the thread executes BPServiceActor#run():

run() covers two flows:

  1. DataNode registration: connectToNNAndHandshake
  2. Heartbeat sending: offerService
@Override
public void run() {
    LOG.info(this + " starting to offer service");

    try {
        while (true) {
            // init stuff
            try {
                // setup storage
                // connect this block pool to the NameNode and register the DataNode
                connectToNNAndHandshake();
                break;
            } catch (IOException ioe) {
                // Initial handshake, storage recovery or registration failed
                runningState = RunningState.INIT_FAILED;
                if (shouldRetryInit()) {
                    // Retry until all namenode's of BPOS failed initialization
                    LOG.error("Initialization failed for " + this + " "
                              + ioe.getLocalizedMessage());
                    sleepAndLogInterrupts(5000, "initializing");
                } else {
                    runningState = RunningState.FAILED;
                    LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
                    return;
                }
            }
        }

        runningState = RunningState.RUNNING;
        if (initialRegistrationComplete != null) {
            initialRegistrationComplete.countDown();
        }

        while (shouldRun()) {
            try {
                offerService();
            } catch (Exception ex) {
                LOG.error("Exception in BPOfferService for " + this, ex);
                sleepAndLogInterrupts(5000, "offering service");
            }
        }
        runningState = RunningState.EXITED;
    } catch (Throwable ex) {
        LOG.warn("Unexpected exception in block pool " + this, ex);
        runningState = RunningState.FAILED;
    } finally {
        LOG.warn("Ending block pool service for: " + this);
        cleanUp();
    }
}

// register the DataNode with the NameNode
private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    // this creates a DatanodeProtocolClientSideTranslatorPB object,
    // which translates client-side requests into calls on the RPC server side
    bpNamenode = dn.connectToNN(nnAddr);

    // First phase of the handshake with NN - get the namespace
    // info.
    NamespaceInfo nsInfo = retrieveNamespaceInfo();

    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    bpos.verifyAndSetNamespaceInfo(this, nsInfo);

    
    this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

    // Second phase of the handshake with the NN.
    // second phase of the handshake: register the DataNode
    register(nsInfo);
}

// heartbeat sending
// omitted here, see section 2.3

2.2 Registration Process

The previous section walked through the steps leading up to registration; here we finally reach the DataNode registration itself.

The register method completes the registration of the DataNode with the NameNode:

void register(NamespaceInfo nsInfo) throws IOException {
    // The handshake() phase loaded the block pool storage
    // off disk - so update the bpRegistration object from that info
    // create the DataNode registration info for this block pool
    DatanodeRegistration newBpRegistration = bpos.createRegistration();

    LOG.info(this + " beginning handshake with NN");

    while (shouldRun()) {
        try {
            // Use returned registration from namenode with updated fields
            // call DatanodeProtocolClientSideTranslatorPB#registerDatanode to register the DataNode
            newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
            newBpRegistration.setNamespaceInfo(nsInfo);
            bpRegistration = newBpRegistration;
            break;
        } catch(EOFException e) {  // namenode might have just restarted
            LOG.info("Problem connecting to server: " + nnAddr + " :"
                     + e.getLocalizedMessage());
        } catch(SocketTimeoutException e) {  // namenode is busy
            LOG.info("Problem connecting to server: " + nnAddr);
        } catch(RemoteException e) {
            LOG.warn("RemoteException in register", e);
            throw e;
        } catch(IOException e) {
            LOG.warn("Problem connecting to server: " + nnAddr);
        }
        // Try again in a second
        sleepAndLogInterrupts(1000, "connecting to server");
    }

    if (bpRegistration == null) {
        throw new IOException("DN shut down before block pool registered");
    }

    LOG.info(this + " successfully registered with NN");
    bpos.registrationSucceeded(this, bpRegistration);

    // reset lease id whenever registered to NN.
    // ask for a new lease id at the next heartbeat.
    fullBlockReportLeaseId = 0;

    // random short delay - helps scatter the BR from all DNs
    scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs, true);
}

This method completes the DataNode registration. Because the RPC engine uses Protobuf, the client-side translator wraps the registration into a protobuf request, and once the call returns it uses Hadoop's own PBHelper.convert to turn the registration carried in the response back into a DatanodeRegistration:

// DatanodeProtocolClientSideTranslatorPB#registerDatanode
@Override
public DatanodeRegistration registerDatanode(DatanodeRegistration registration
      ) throws IOException {
    // wrap the registration info and send it to the RPC server (NameNodeRpcServer)
    RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
        .newBuilder().setRegistration(PBHelper.convert(registration));
    RegisterDatanodeResponseProto resp;
    try {
      resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
    } catch (ServiceException se) {
      throw ProtobufHelper.getRemoteException(se);
    }
    return PBHelper.convert(resp.getRegistration());
  }

The RPC server side, here NameNodeRpcServer, then receives the proxied request:

// NameNodeRpcServer#registerDatanode
@Override // DatanodeProtocol
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
    checkNNStartup();
    verifySoftwareVersion(nodeReg);
    namesystem.registerDatanode(nodeReg);
    return nodeReg;
}

The actual work is done by FSNamesystem#registerDatanode:

void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
    writeLock();
    try {
        blockManager.registerDatanode(nodeReg);
    } finally {
        writeUnlock("registerDatanode");
    }
}

// which then calls BlockManager#registerDatanode
public void registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
    assert namesystem.hasWriteLock();
    // call DatanodeManager#registerDatanode to perform the actual registration
    datanodeManager.registerDatanode(nodeReg);
    // after registration, re-check the BlockManager safe mode
    bmSafeMode.checkSafeMode();
}

Finally, look at DatanodeManager#registerDatanode, which completes registration differently in three cases:

  1. The node's transfer address already exists but now serves a new data storage, so the existing entry is removed
  2. Re-registration of a known node, which mainly updates its information
  3. Registration of a brand-new node that has never registered before
public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
    InetAddress dnAddress = Server.getRemoteIp();
    if (dnAddress != null) {
        // Mostly called inside an RPC, update ip and peer hostname
        String hostname = dnAddress.getHostName();
        String ip = dnAddress.getHostAddress();
        if (checkIpHostnameInRegistration && !isNameResolved(dnAddress)) {
            // Reject registration of unresolved datanode to prevent performance
            // impact of repetitive DNS lookups later.
            final String message = "hostname cannot be resolved (ip="
                + ip + ", hostname=" + hostname + ")";
            LOG.warn("Unresolved datanode registration: " + message);
            throw new DisallowedDatanodeException(nodeReg, message);
        }
        // update node registration with the ip and hostname from rpc request
        nodeReg.setIpAddr(ip);
        nodeReg.setPeerHostName(hostname);
    }

    try {
        nodeReg.setExportedKeys(blockManager.getBlockKeys());

        // Checks if the node is not on the hosts list.  If it is not, then
        // it will be disallowed from registering. 
        if (!hostConfigManager.isIncluded(nodeReg)) {
            throw new DisallowedDatanodeException(nodeReg);
        }

        NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
                                     + nodeReg + " storage " + nodeReg.getDatanodeUuid());

        DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
        DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
            nodeReg.getIpAddr(), nodeReg.getXferPort());

        // case 1: this address already exists but served a different data storage, so remove the old entry
        if (nodeN != null && nodeN != nodeS) {
            NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
            // nodeN previously served a different data storage, 
            // which is not served by anybody anymore.
            removeDatanode(nodeN);
            // physically remove node from datanodeMap
            wipeDatanode(nodeN);
            nodeN = null;
        }

        // case 2: re-registration of a known node
        if (nodeS != null) {
            if (nodeN == nodeS) {
                // The same datanode has been just restarted to serve the same data 
                // storage. We do not need to remove old data blocks, the delta will
                // be calculated on the next block report from the datanode
                if(NameNode.stateChangeLog.isDebugEnabled()) {
                    NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
                                                  + "node restarted.");
                }
            } else {
                // nodeS is found
                        
                NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
                                             + " is replaced by " + nodeReg + " with the same storageID "
                                             + nodeReg.getDatanodeUuid());
            }

            boolean success = false;
            // update the DataNode descriptor info
            try {
                // update cluster map
                getNetworkTopology().remove(nodeS);
                if(shouldCountVersion(nodeS)) {
                    decrementVersionCount(nodeS.getSoftwareVersion());
                }
                nodeS.updateRegInfo(nodeReg);

                nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
                nodeS.setDisallowed(false); // Node is in the include list

                // resolve network location
                if(this.rejectUnresolvedTopologyDN) {
                    nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
                    nodeS.setDependentHostNames(getNetworkDependencies(nodeS));
                } else {
                    nodeS.setNetworkLocation(
                        resolveNetworkLocationWithFallBackToDefaultLocation(nodeS));
                    nodeS.setDependentHostNames(
                        getNetworkDependenciesWithDefault(nodeS));
                }
                getNetworkTopology().add(nodeS);
                resolveUpgradeDomain(nodeS);

                // also treat the registration message as a heartbeat
                heartbeatManager.register(nodeS);
                incrementVersionCount(nodeS.getSoftwareVersion());
                startAdminOperationIfNecessary(nodeS);
                success = true;
            } finally {
                if (!success) {
                    removeDatanode(nodeS);
                    wipeDatanode(nodeS);
                    countSoftwareVersions();
                }
            }
            return;
        }

        DatanodeDescriptor nodeDescr 
            = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
        boolean success = false;
        try {
            // case 3: handle a completely new node registering
            // resolve network location
            // resolve the network location and add the node to the cluster topology
            if(this.rejectUnresolvedTopologyDN) {
                nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
                nodeDescr.setDependentHostNames(getNetworkDependencies(nodeDescr));
            } else {
                nodeDescr.setNetworkLocation(
                    resolveNetworkLocationWithFallBackToDefaultLocation(nodeDescr));
                nodeDescr.setDependentHostNames(
                    getNetworkDependenciesWithDefault(nodeDescr));
            }
            nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
            resolveUpgradeDomain(nodeDescr);

            // register new datanode
            // addDatanode adds the DataNode to the network topology and the datanode maps
            addDatanode(nodeDescr);
            blockManager.getBlockReportLeaseManager().register(nodeDescr);
            // also treat the registration message as a heartbeat
            // no need to update its timestamp
            // because its is done when the descriptor is created
            // add the newly registered DataNode to the heartbeatManager's datanode list so its heartbeats can be tracked later
            heartbeatManager.addDatanode(nodeDescr);
            heartbeatManager.updateDnStat(nodeDescr);
            incrementVersionCount(nodeReg.getSoftwareVersion());
            startAdminOperationIfNecessary(nodeDescr);
            success = true;
        } finally {
            if (!success) {
                removeDatanode(nodeDescr);
                wipeDatanode(nodeDescr);
                countSoftwareVersions();
            }
        }
    } catch (InvalidTopologyException e) {
        // If the network location is invalid, clear the cached mappings
        // so that we have a chance to re-add this DataNode with the
        // correct network location later.
        List<String> invalidNodeNames = new ArrayList<>(3);
        // clear cache for nodes in IP or Hostname
        invalidNodeNames.add(nodeReg.getIpAddr());
        invalidNodeNames.add(nodeReg.getHostName());
        invalidNodeNames.add(nodeReg.getPeerHostName());
        dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
        throw e;
    }
}

The addDatanode method:

void addDatanode(final DatanodeDescriptor node) {
    // To keep host2DatanodeMap consistent with datanodeMap,
    // remove  from host2DatanodeMap the datanodeDescriptor removed
    // from datanodeMap before adding node to host2DatanodeMap.
    // remove the replaced descriptor first so the later add to host2DatanodeMap does not conflict
    synchronized(this) {
        host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
    }

    networktopology.add(node); // may throw InvalidTopologyException
    host2DatanodeMap.add(node);
    // check whether the cluster is now multi-rack
    checkIfClusterIsNowMultiRack(node);
    resolveUpgradeDomain(node);

    if (LOG.isDebugEnabled()) {
        LOG.debug(getClass().getSimpleName() + ".addDatanode: "
                  + "node " + node + " is added to datanodeMap.");
    }
}
2.3 Heartbeat Sending

Following the run method from section 2.1, heartbeat sending is driven mainly by BPServiceActor#offerService:

private void offerService() throws Exception {
    LOG.info("For namenode " + nnAddr + " using"
             + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msecs"
             + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msecs"
             + " Initial delay: " + dnConf.initialBlockReportDelayMs + "msecs"
             + "; heartBeatInterval=" + dnConf.heartBeatInterval
             + (lifelineSender != null ?
                "; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));

    //
    // Now loop for a long time....
    //
    while (shouldRun()) {
        try {
            DataNodeFaultInjector.get().startOfferService();
            final long startTime = scheduler.monotonicNow();

            //
            // Every so often, send heartbeat or block-report
            // decide whether a heartbeat is due: the Scheduler tracks nextHeartbeatTime
            // (previous value + heartbeatIntervalMs, 3s by default), and a heartbeat
            // is sent when nextHeartbeatTime - startTime <= 0
            final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
            HeartbeatResponse resp = null;
            if (sendHeartbeat) {
                //
                // All heartbeat messages include following info:
                // -- Datanode name
                // -- data transfer port
                // -- Total capacity
                // -- Bytes remaining
                //
                boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                    scheduler.isBlockReportDue(startTime);
                if (!dn.areHeartbeatsDisabledForTests()) {
                    // send the heartbeat
                    resp = sendHeartBeat(requestBlockReportLease);
                    assert resp != null;
                    if (resp.getFullBlockReportLeaseId() != 0) {
                        if (fullBlockReportLeaseId != 0) {
                            LOG.warn(nnAddr + " sent back a full block report lease " +
                                     "ID of 0x" +
                                     Long.toHexString(resp.getFullBlockReportLeaseId()) +
                                     ", but we already have a lease ID of 0x" +
                                     Long.toHexString(fullBlockReportLeaseId) + ". " +
                                     "Overwriting old lease ID.");
                        }
                        fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
                    }
                    dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime,
                                                 getRpcMetricSuffix());

                    // If the state of this NN has changed (eg STANDBY->ACTIVE)
                    // then let the BPOfferService update itself.
                    //
                    // important that this happens before processCommand below,
                    // since the first heartbeat to a new active might have commands
                    // that we should actually process.
                    bpos.updateActorStatesFromHeartbeat(
                        this, resp.getNameNodeHaState());
                    state = resp.getNameNodeHaState().getState();

                    if (state == HAServiceState.ACTIVE) {
                        handleRollingUpgradeStatus(resp);
                    }
                    commandProcessingThread.enqueue(resp.getCommands());
                }
            }
            if (!dn.areIBRDisabledForTests() &&
                (ibrManager.sendImmediately()|| sendHeartbeat)) {
                ibrManager.sendIBRs(bpNamenode, bpRegistration,
                                    bpos.getBlockPoolId(), getRpcMetricSuffix());
            }

            List<DatanodeCommand> cmds = null;
            boolean forceFullBr =
                scheduler.forceFullBlockReport.getAndSet(false);
            if (forceFullBr) {
                LOG.info("Forcing a full block report to " + nnAddr);
            }
            if ((fullBlockReportLeaseId != 0) || forceFullBr) {
                cmds = blockReport(fullBlockReportLeaseId);
                fullBlockReportLeaseId = 0;
            }
            commandProcessingThread.enqueue(cmds);

            if (!dn.areCacheReportsDisabledForTests()) {
                DatanodeCommand cmd = cacheReport();
                commandProcessingThread.enqueue(cmd);
            }

            if (sendHeartbeat) {
                dn.getMetrics().addHeartbeatTotal(
                    scheduler.monotonicNow() - startTime, getRpcMetricSuffix());
            }

            // There is no work to do;  sleep until hearbeat timer elapses, 
            // or work arrives, and then iterate again.
            ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
        } catch(RemoteException re) {
            String reClass = re.getClassName();
            if (UnregisteredNodeException.class.getName().equals(reClass) ||
                DisallowedDatanodeException.class.getName().equals(reClass) ||
                IncorrectVersionException.class.getName().equals(reClass)) {
                LOG.warn(this + " is shutting down", re);
                shouldServiceRun = false;
                return;
            }
            LOG.warn("RemoteException in offerService", re);
            sleepAfterException();
        } catch (IOException e) {
            LOG.warn("IOException in offerService", e);
            sleepAfterException();
        } finally {
            DataNodeFaultInjector.get().endOfferService();
        }
        processQueueMessages();
    } // while (shouldRun())
} // offerService
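
How often the loop above heartbeats and block-reports is controlled by configuration. As a rough sketch, not part of the code above and using the stock default values purely for illustration, the relevant knobs can be set programmatically like this:

Configuration conf = new HdfsConfiguration();
// dfs.heartbeat.interval: heartbeat period in seconds (default 3)
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3);
// dfs.blockreport.intervalMsec: full block report period (default 6 hours)
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 21600000L);
// dfs.cachereport.intervalMsec: cache report period (default 10 seconds)
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 10000L);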

Step into BPServiceActor#sendHeartBeat:

HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
    throws IOException {
    scheduler.scheduleNextHeartbeat();
    StorageReport[] reports =
        dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
    if (LOG.isDebugEnabled()) {
        LOG.debug("Sending heartbeat with " + reports.length +
                  " storage reports from service actor: " + this);
    }

    final long now = monotonicNow();
    scheduler.updateLastHeartbeatTime(now);
    VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
        .getVolumeFailureSummary();
    int numFailedVolumes = volumeFailureSummary != null ?
        volumeFailureSummary.getFailedStorageLocations().length : 0;
    final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
    final SlowPeerReports slowPeers =
        outliersReportDue && dn.getPeerMetrics() != null ?
        SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
    SlowPeerReports.EMPTY_REPORT;
    // collect per-disk latency outliers, covering read, write and metadata I/O
    final SlowDiskReports slowDisks =
        outliersReportDue && dn.getDiskMetrics() != null ?
        SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
    SlowDiskReports.EMPTY_REPORT;

    // send the heartbeat; bpNamenode is a DatanodeProtocolClientSideTranslatorPB instance,
    // so this ultimately invokes NameNodeRpcServer#sendHeartbeat
    HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
                                             reports,
                                             dn.getFSDataset().getCacheCapacity(),
                                             dn.getFSDataset().getCacheUsed(),
                                             dn.getXmitsInProgress(),
                                             dn.getXceiverCount(),
                                             numFailedVolumes,
                                             volumeFailureSummary,
                                             requestBlockReportLease,
                                             slowPeers,
                                             slowDisks);

    if (outliersReportDue) {
        // If the report was due and successfully sent, schedule the next one.
        scheduler.scheduleNextOutlierReport();
    }

    return response;
}

bpNamenode.sendHeartbeat then sends the heartbeat over RPC, and NameNodeRpcServer receives it:

// NameNodeRpcServer#sendHeartbeat
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
                           StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
                           int xmitsInProgress, int xceiverCount,
                           int failedVolumes, VolumeFailureSummary volumeFailureSummary,
                           boolean requestFullBlockReportLease,
                           @Nonnull SlowPeerReports slowPeers,
                           @Nonnull SlowDiskReports slowDisks) throws IOException {
    checkNNStartup();
    verifyRequest(nodeReg);
    // delegate to FSNamesystem#handleHeartbeat
    return namesystem.handleHeartbeat(nodeReg, report,
                                      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
                                      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
                                      slowPeers, slowDisks);
}

FSNamesystem#handleHeartbeat processes the heartbeat, mainly by delegating to DatanodeManager#handleHeartbeat:

HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
                           StorageReport[] reports, long cacheCapacity, long cacheUsed,
                           int xceiverCount, int xmitsInProgress, int failedVolumes,
                           VolumeFailureSummary volumeFailureSummary,
                           boolean requestFullBlockReportLease,
                           @Nonnull SlowPeerReports slowPeers,
                           @Nonnull SlowDiskReports slowDisks) throws IOException {
    readLock();
    try {
        //get datanode commands
        final int maxTransfer = blockManager.getMaxReplicationStreams()
            - xmitsInProgress;
        // call DatanodeManager#handleHeartbeat to process the heartbeat; it returns the commands to piggyback on the response
        DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
            nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
            xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
            slowPeers, slowDisks);
        long blockReportLeaseId = 0;
        if (requestFullBlockReportLease) {
            blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
        }

        //create ha status
        final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
            haContext.getState().getServiceState(),
            getFSImage().getCorrectLastAppliedOrWrittenTxId());

        // build and return the heartbeat response
        return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
                                     blockReportLeaseId);
    } finally {
        readUnlock("handleHeartbeat");
    }
}

Inside DatanodeManager#handleHeartbeat the heartbeat is processed as follows:

  1. Look up the DataNode and check whether it is allowed to connect (it may, for example, be in the exclude list); if not, throw an exception.
  2. Check whether the node has registered; if not, return a register command immediately.
  3. Update the DataNode's information, mainly the DatanodeDescriptor fields such as used and remaining space.
  4. Check whether the NameNode is in safe mode.
  5. Get the block recovery command (BlockRecoveryCommand).
  6. Build block-processing commands:
    1. Take numReplicationTasks regular blocks for further processing:
      1. Skip blocks that have already been deleted.
      2. Generate the replication command.
    2. Take numECTasks erasure-coded blocks for reconstruction.
  7. Generate the block invalidation (delete) command.
  8. Generate cache-related commands.
  9. Generate the balancer bandwidth command.
  10. Return all commands.
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
                                 StorageReport[] reports, final String blockPoolId,
                                 long cacheCapacity, long cacheUsed, int xceiverCount, 
                                 int maxTransfers, int failedVolumes,
                                 VolumeFailureSummary volumeFailureSummary,
                                 @Nonnull SlowPeerReports slowPeers,
                                 @Nonnull SlowDiskReports slowDisks) throws IOException {
    final DatanodeDescriptor nodeinfo;
    try {
        // look up the DataNode descriptor
        nodeinfo = getDatanode(nodeReg);
    } catch (UnregisteredNodeException e) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
    }

    // Check if this datanode should actually be shutdown instead.
    // 1. check whether the node is allowed to connect; if not, throw
    if (nodeinfo != null && nodeinfo.isDisallowed()) {
        setDatanodeDead(nodeinfo);
        throw new DisallowedDatanodeException(nodeinfo);
    }

    // 2. if the node has never registered, return a register command
    if (nodeinfo == null || !nodeinfo.isRegistered()) {
        return new DatanodeCommand[]{RegisterCommand.REGISTER};
    }
    // 3. update the DataNode statistics
    heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
                                     cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);

    // If we are in safemode, do not send back any recovery / replication
    // requests. Don't even drain the existing queue of work.
    // 4. check whether the NameNode is in safe mode
    if (namesystem.isInSafeMode()) {
        return new DatanodeCommand[0];
    }

    // block recovery command
    // 5. get the BlockRecoveryCommand
    final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
                                                                   nodeinfo);
    if (brCommand != null) {
        return new DatanodeCommand[]{brCommand};
    }

    final List<DatanodeCommand> cmds = new ArrayList<>();
    // Allocate _approximately_ maxTransfers pending tasks to DataNode.
    // NN chooses pending tasks based on the ratio between the lengths of
    // replication and erasure-coded block queues.
    int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
    int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
    int totalBlocks = totalReplicateBlocks + totalECBlocks;
    // 6. build block-processing commands
    if (totalBlocks > 0) {
        int numReplicationTasks = (int) Math.ceil(
            (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
        int numECTasks = (int) Math.ceil(
            (double) (totalECBlocks * maxTransfers) / totalBlocks);
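        // Worked example with made-up numbers (not from the source): if
        // totalReplicateBlocks = 6, totalECBlocks = 2 and maxTransfers = 4, then
        // totalBlocks = 8, numReplicationTasks = ceil(6 * 4 / 8) = 3 and
        // numECTasks = ceil(2 * 4 / 8) = 1, i.e. the pending-task budget is split
        // in proportion to the lengths of the two queues.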
        if (LOG.isDebugEnabled()) {
            LOG.debug("Pending replication tasks: " + numReplicationTasks
                      + " erasure-coded tasks: " + numECTasks);
        }
        // check pending replication tasks
        // take numReplicationTasks blocks for replication
        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
            numReplicationTasks);
        if (pendingList != null && !pendingList.isEmpty()) {
            // If the block is deleted, the block size will become
            // BlockCommand.NO_ACK (LONG.MAX_VALUE) . This kind of block we don't
            // need
            // to send for replication or reconstruction
            Iterator<BlockTargetPair> iterator = pendingList.iterator();
            while (iterator.hasNext()) {
                BlockTargetPair cmd = iterator.next();
                // skip blocks that have already been deleted
                if (cmd.block != null
                    && cmd.block.getNumBytes() == BlockCommand.NO_ACK) {
                    // block deleted
                    DatanodeStorageInfo.decrementBlocksScheduled(cmd.targets);
                    iterator.remove();
                }
            }
            // add the replication (transfer) command
            if (!pendingList.isEmpty()) {
                cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
                                          pendingList));
            }
        }
        // check pending erasure coding tasks
        // take numECTasks erasure-coded blocks for reconstruction
        List<BlockECReconstructionInfo> pendingECList = nodeinfo
            .getErasureCodeCommand(numECTasks);
        if (pendingECList != null && !pendingECList.isEmpty()) {
            cmds.add(new BlockECReconstructionCommand(
                DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
        }
    }

    // check block invalidation
    // 7. add the block invalidation (delete) command
    Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
    if (blks != null) {
        cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
                                  blks));
    }
    // cache commands
    // 8. add cache-related commands
    addCacheCommands(blockPoolId, nodeinfo, cmds);
    // key update command
    // update block access keys
    blockManager.addKeyUpdateCommand(cmds, nodeinfo);

    // check for balancer bandwidth update
    // 9. add the balancer bandwidth command
    if (nodeinfo.getBalancerBandwidth() > 0) {
        cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
        // set back to 0 to indicate that datanode has been sent the new value
        nodeinfo.setBalancerBandwidth(0);
    }

    if (slowPeerTracker != null) {
        final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
        if (!slowPeersMap.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
                          slowPeersMap);
            }
            for (String slowNodeId : slowPeersMap.keySet()) {
                slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
            }
        }
    }

    if (slowDiskTracker != null) {
        if (!slowDisks.getSlowDisks().isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
                          slowDisks.getSlowDisks());
            }
            slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
        }
        slowDiskTracker.checkAndUpdateReportIfNecessary();
    }

    // 10. return all commands
    if (!cmds.isEmpty()) {
        return cmds.toArray(new DatanodeCommand[cmds.size()]);
    }

    return new DatanodeCommand[0];
}
