1.1、需求
模拟RPC的客户端、服务端、通信协议三者如何工作的
1.2、代码编写:
(1)在HDFSClient项目基础上创建包名com.song.rpc
(2)创建RPC协议
package com.song.rpc;

public interface RPCProtocol {
    long versionID = 666;

    void mkdirs(String path);
}
(3)创建RPC服务端
package com.song.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

import java.io.IOException;

public class NNServer implements RPCProtocol {

    @Override
    public void mkdirs(String path) {
        System.out.println("服务端,创建路径" + path);
    }

    public static void main(String[] args) throws IOException {
        Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(8888)
                .setProtocol(RPCProtocol.class)
                .setInstance(new NNServer())
                .build();

        System.out.println("服务器开始工作");
        server.start();
    }
}
(4)创建RPC客户端
package com.song.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;
import java.net.InetSocketAddress;

public class HDFSClient {

    public static void main(String[] args) throws IOException {
        RPCProtocol client = RPC.getProxy(
                RPCProtocol.class,
                RPCProtocol.versionID,
                new InetSocketAddress("localhost", 8888),
                new Configuration());

        System.out.println("我是客户端");
        client.mkdirs("/input");
    }
}

1.3、测试
(1)启动服务端
观察控制台打印:服务器开始工作
在控制台Terminal窗口输入jps,查看到NNServer服务
(2)启动客户端
观察客户端控制台打印:我是客户端
观察服务端控制台打印:服务端,创建路径/input
RPC的客户端调用通信协议方法,方法的执行在服务端;通信协议就是接口规范。
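补充一个收尾细节(示意写法,非课程原代码):客户端代理用完后,可以在HDFSClient的main方法末尾用Hadoop提供的RPC.stopProxy关闭代理,释放与服务端的连接。

        // client即上文RPC.getProxy返回的代理对象
        RPC.stopProxy(client); // 关闭代理,释放底层连接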
2、NameNode启动源码解析
NameNode工作机制
源码启动流程
0)在pom.xml中增加如下依赖
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
1)ctrl + n 全局查找namenode,进入NameNode.java
NameNode官方说明
NameNode serves as both directory namespace manager and "inode table" for the Hadoop DFS. There is a single NameNode running in any DFS deployment. (Well, except when there is a second backup/failover NameNode, or when using federated NameNodes.)
The NameNode controls two critical tables:
1) filename->blocksequence (namespace)
2) block->machinelist ("inodes")
The first table is stored on disk and is very precious. The second table is rebuilt every time the NameNode comes up.
'NameNode' refers to both this class as well as the 'NameNode server'. The 'FSNamesystem' class actually performs most of the filesystem management. The majority of the 'NameNode' class itself is concerned with exposing the IPC interface and the HTTP server to the outside world, plus some configuration management.
NameNode implements the ClientProtocol interface, which allows clients to ask for DFS services. ClientProtocol is not designed for direct use by authors of DFS client code. End-users should instead use the FileSystem class.
NameNode also implements the DatanodeProtocol interface, used by DataNodes that actually store DFS data blocks. These methods are invoked repeatedly and automatically by all the DataNodes in a DFS deployment.
NameNode also implements the NamenodeProtocol interface, used by secondary namenodes or rebalancing processes to get partial NameNode state, for example partial blocksMap etc.
2)ctrl + f,查找main方法
NameNode.java
public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      // 创建NameNode
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
}

点击createNameNode
public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
    ... ...
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage(System.err);
      return null;
    }
    setStartupOption(conf, startOpt);

    boolean aborted = false;
    switch (startOpt) {
    case FORMAT:
      aborted = format(conf, startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid javac warning
    case GENCLUSTERID:
      ... ...
    default:
      DefaultMetricsSystem.initialize("NameNode");
      // 创建NameNode对象
      return new NameNode(conf);
    }
}

点击NameNode
public NameNode(Configuration conf) throws IOException {
    this(conf, NamenodeRole.NAMENODE);
}

protected NameNode(Configuration conf, NamenodeRole role)
      throws IOException {
    ... ...
    try {
      initializeGenericKeys(conf, nsId, namenodeId);
      initialize(getConf());
      ... ...
    } catch (IOException e) {
      this.stopAtException(e);
      throw e;
    } catch (HadoopIllegalArgumentException e) {
      this.stopAtException(e);
      throw e;
    }
    this.started.set(true);
}

点击initialize
protected void initialize(Configuration conf) throws IOException {
    ... ...
    if (NamenodeRole.NAMENODE == role) {
      // 启动HTTP服务端(9870)
      startHttpServer(conf);
    }

    // 加载镜像文件和编辑日志到内存
    loadNamesystem(conf);
    startAliasMapServerIfNecessary(conf);

    // 创建NN的RPC服务端
    rpcServer = createRpcServer(conf);

    initReconfigurableBackoffKey();

    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using
      // the RPC server's bind address.
      clientNamenodeAddress = NetUtils.getHostPortString(getNameNodeAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    }
    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }

    // NN启动资源检查
    startCommonServices(conf);
    startMetricsLogger(conf);
}

1.1、 启动9870端口服务
1)点击startHttpServer
NameNode.java
private void startHttpServer(final Configuration conf) throws IOException {
    httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
    httpServer.start();
    httpServer.setStartupProgress(startupProgress);
}

protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
    InetSocketAddress bindAddress = getHttpServerAddress(conf);
    ... ...
    return bindAddress;
}

protected InetSocketAddress getHttpServerAddress(Configuration conf) {
    return getHttpAddress(conf);
}

public static InetSocketAddress getHttpAddress(Configuration conf) {
    return NetUtils.createSocketAddr(
        conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}

public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT =
    "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;

public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT =
    HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;

int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
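可以用一小段验证代码确认这个默认值(示意写法:假设classpath中已有hadoop-hdfs依赖,且未在配置文件中覆盖dfs.namenode.http-address;类名HttpAddrCheck为演示用):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class HttpAddrCheck {
    public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // 未显式配置时,回落到默认值0.0.0.0:9870
        String addr = conf.getTrimmed(
                DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
                DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT);
        System.out.println(addr); // 0.0.0.0:9870
    }
}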
2)点击startHttpServer方法中的httpServer.start();
NameNodeHttpServer.java
void start() throws IOException {
    ... ...
    // Hadoop自己封装了HttpServer,形成自己的HttpServer2
    HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
        httpAddr, httpsAddr, "hdfs",
        DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
        DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
    ... ...
    httpServer = builder.build();
    ... ...
    httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
    httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
    setupServlets(httpServer, conf);
    httpServer.start();
    ... ...
}

点击setupServlets
private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
    httpServer.addInternalServlet("startupProgress",
        StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
    httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true);
    httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
        ImageServlet.class, true);
}

1.2、 加载镜像文件和编辑日志
1)点击loadNamesystem
NameNode.java
protected void loadNamesystem(Configuration conf) throws IOException {
    this.namesystem = FSNamesystem.loadFromDisk(conf);
}

static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
    checkConfiguration(conf);
    FSImage fsImage = new FSImage(conf,
        FSNamesystem.getNamespaceDirs(conf),
        FSNamesystem.getNamespaceEditsDirs(conf));
    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
    StartupOption startOpt = NameNode.getStartupOption(conf);
    if (startOpt == StartupOption.RECOVER) {
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }
    long loadStart = monotonicNow();
    try {
      namesystem.loadFSImage(startOpt);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception loading fsimage", ioe);
      fsImage.close();
      throw ioe;
    }
    long timetakenToLoadFSImage = monotonicNow() - loadStart;
    LOG.info("Finished loading FSImage in " + timetakenToLoadFSImage + " msecs");
    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
    if (nnMetrics != null) {
      nnMetrics.setFsImageLoadTime((int) timetakenToLoadFSImage);
    }
    namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
    return namesystem;
}
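进入下一节之前,可以用一段示意代码直观看到fsimage和edits所在的目录(类名MetaDirsCheck为演示用;getNamespaceDirs/getNamespaceEditsDirs即上面loadFromDisk中用到的两个静态方法):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

public class MetaDirsCheck {
    public static void main(String[] args) throws IOException {
        Configuration conf = new HdfsConfiguration();
        // fsimage所在目录(dfs.namenode.name.dir,默认位于hadoop.tmp.dir下)
        for (URI dir : FSNamesystem.getNamespaceDirs(conf)) {
            System.out.println("name dir: " + dir);
        }
        // edits所在目录(dfs.namenode.edits.dir,默认与name dir相同)
        for (URI dir : FSNamesystem.getNamespaceEditsDirs(conf)) {
            System.out.println("edits dir: " + dir);
        }
    }
}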
1.3、 初始化NN的RPC服务端
1)点击createRpcServer
NameNode.java
protected NameNodeRpcServer createRpcServer(Configuration conf)
      throws IOException {
    return new NameNodeRpcServer(conf, this);
}

NameNodeRpcServer.java
public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
    ... ...
    serviceRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(serviceRpcAddr.getPort())
        .setNumHandlers(serviceHandlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();
    ... ...
}
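对照第1节的NNServer示例可以看出,NN真正的RPC服务端用的是同一套RPC.Builder API,只是协议、实例和端口换成了生产值(以下对照为示意,非源码):

    // 演示NNServer                        // NameNodeRpcServer
    // .setProtocol(RPCProtocol.class)     -> .setProtocol(ClientNamenodeProtocolPB.class)
    // .setInstance(new NNServer())        -> .setInstance(clientNNPbService)
    // .setPort(8888)                      -> .setPort(serviceRpcAddr.getPort())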
1.4、 NN启动资源检查
1)点击startCommonServices
NameNode.java
private void startCommonServices(Configuration conf) throws IOException {
    namesystem.startCommonServices(conf, haContext);
    registerNNSMXBean();

    if (NamenodeRole.NAMENODE != role) {
      startHttpServer(conf);
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }

    rpcServer.start();
    try {
      plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, ServicePlugin.class);
    } catch (RuntimeException e) {
      String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
      LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
          pluginsValue, e);
      throw e;
    }
    ... ...
}
2)点击startCommonServices
FSNamesystem.java
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
    this.registerMBean(); // register the MBean for the FSNamesystemState
    writeLock();
    this.haContext = haContext;
    try {
      nnResourceChecker = new NameNodeResourceChecker(conf);

      // 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
      checkAvailableResources();

      assert !blockManager.isPopulatingReplQueues();
      StartupProgress prog = NameNode.getStartupProgress();
      prog.beginPhase(Phase.SAFEMODE);
      long completeBlocksTotal = getCompleteBlocksTotal();
      // 安全模式
      prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
          completeBlocksTotal);
      // 启动块服务
      blockManager.activate(conf, completeBlocksTotal);
    } finally {
      writeUnlock("startCommonServices");
    }

    registerMXBean();
    DefaultMetricsSystem.instance().register(this);
    if (inodeAttributeProvider != null) {
      inodeAttributeProvider.start();
      dir.setINodeAttributeProvider(inodeAttributeProvider);
    }
    snapshotManager.registerMXBean();
    InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
    this.nameNodeHostName = (serviceAddress != null) ?
        serviceAddress.getHostName() : "";
}

点击NameNodeResourceChecker
NameNodeResourceChecker.java
public NameNodeResourceChecker(Configuration conf) throws IOException {
    this.conf = conf;
    volumes = new HashMap<String, CheckedVolume>();

    // dfs.namenode.resource.du.reserved默认值 1024 * 1024 * 100 => 100m
    duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
        DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);

    Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
        .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));

    Collection<URI> localEditDirs = Collections2.filter(
        FSNamesystem.getNamespaceEditsDirs(conf),
        new Predicate<URI>() {
          @Override
          public boolean apply(URI input) {
            if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
              return true;
            }
            return false;
          }
        });

    // 对所有路径进行资源检查
    for (URI editsDirToCheck : localEditDirs) {
      addDirToCheck(editsDirToCheck,
          FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
              editsDirToCheck));
    }

    // All extra checked volumes are marked "required"
    for (URI extraDirToCheck : extraCheckedVolumes) {
      addDirToCheck(extraDirToCheck, true);
    }

    minimumRedundantVolumes = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
        DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}

点击checkAvailableResources
FSNamesystem.java
void checkAvailableResources() {
    long resourceCheckTime = monotonicNow();
    Preconditions.checkState(nnResourceChecker != null,
        "nnResourceChecker not initialized");

    // 判断资源是否足够,不够返回false
    hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();

    resourceCheckTime = monotonicNow() - resourceCheckTime;
    NameNode.getNameNodeMetrics().addResourceCheckTime(resourceCheckTime);
}

NameNodeResourceChecker.java
public boolean hasAvailableDiskSpace() {
    return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
        minimumRedundantVolumes);
}

NameNodeResourcePolicy.java
static boolean areResourcesAvailable(
    Collection<? extends CheckableNameNodeResource> resources,
    int minimumRedundantResources) {

    // TODO: workaround:
    // - during startup, if there are no edits dirs on disk, then there is
    // a call to areResourcesAvailable() with no dirs at all, which was
    // previously causing the NN to enter safemode
    if (resources.isEmpty()) {
      return true;
    }

    int requiredResourceCount = 0;
    int redundantResourceCount = 0;
    int disabledRedundantResourceCount = 0;

    // 判断资源是否充足
    for (CheckableNameNodeResource resource : resources) {
      if (!resource.isRequired()) {
        redundantResourceCount++;
        if (!resource.isResourceAvailable()) {
          disabledRedundantResourceCount++;
        }
      } else {
        requiredResourceCount++;
        if (!resource.isResourceAvailable()) {
          // Short circuit - a required resource is not available. 不充足返回false
          return false;
        }
      }
    }

    if (redundantResourceCount == 0) {
      // If there are no redundant resources, return true if there are any
      // required resources available.
      return requiredResourceCount > 0;
    } else {
      return redundantResourceCount - disabledRedundantResourceCount >=
          minimumRedundantResources;
    }
}

interface CheckableNameNodeResource {
    public boolean isResourceAvailable();
    public boolean isRequired();
}

ctrl + h,查找实现类CheckedVolume
NameNodeResourceChecker.java
public boolean isResourceAvailable() {
    // 获取当前目录的可用空间大小
    long availableSpace = df.getAvailable();

    if (LOG.isDebugEnabled()) {
      LOG.debug("Space available on volume '" + volume + "' is " + availableSpace);
    }

    // 如果当前可用空间小于100m,返回false
    if (availableSpace < duReserved) {
      LOG.warn("Space available on volume '" + volume + "' is "
          + availableSpace + ", which is below the configured reserved amount "
          + duReserved);
      return false;
    } else {
      return true;
    }
}
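下面用一段独立的示意代码(非Hadoop源码)模拟CheckedVolume.isResourceAvailable的判断思路:用File.getUsableSpace()取目录可用空间,小于预留值(默认100m)即视为资源不足:

import java.io.File;

public class DiskCheckSketch {
    // 对应dfs.namenode.resource.du.reserved默认值:1024 * 1024 * 100 = 100m
    private static final long DU_RESERVED = 1024L * 1024 * 100;

    // 模拟isResourceAvailable:可用空间低于预留值即返回false
    public static boolean isResourceAvailable(File volume) {
        long availableSpace = volume.getUsableSpace();
        return availableSpace >= DU_RESERVED;
    }

    public static void main(String[] args) {
        System.out.println(isResourceAvailable(new File("/tmp")));
    }
}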
1.5、 NN对心跳超时判断
ctrl + n 搜索NameNode,ctrl + f 搜索startCommonServices
- 点击namesystem.startCommonServices(conf, haContext);
- 点击blockManager.activate(conf, completeBlocksTotal);
- 点击datanodeManager.activate(conf);
DatanodeManager.java
void activate(final Configuration conf) {
    datanodeAdminManager.activate(conf);
    heartbeatManager.activate();
}

HeartbeatManager.java
void activate() {
    // 启动的线程,搜索run方法
    heartbeatThread.start();
}

public void run() {
    while(namesystem.isRunning()) {
      restartHeartbeatStopWatch();
      try {
        final long now = Time.monotonicNow();
        if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
          // 心跳检查
          heartbeatCheck();
          lastHeartbeatCheck = now;
        }
        if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
          synchronized(HeartbeatManager.this) {
            for(DatanodeDescriptor d : datanodes) {
              d.setNeedKeyUpdate(true);
            }
          }
          lastBlockKeyUpdate = now;
        }
      } catch (Exception e) {
        LOG.error("Exception while checking heartbeat", e);
      }
      try {
        Thread.sleep(5000); // 5 seconds
      } catch (InterruptedException ignored) {
      }
      // avoid declaring nodes dead for another cycle if a GC pause lasts
      // longer than the node recheck interval
      if (shouldAbortHeartbeatCheck(-5000)) {
        LOG.warn("Skipping next heartbeat scan due to excessive pause");
        lastHeartbeatCheck = Time.monotonicNow();
      }
    }
}

void heartbeatCheck() {
    final DatanodeManager dm = blockManager.getDatanodeManager();
    boolean allAlive = false;
    while (!allAlive) {
      // locate the first dead node.
      DatanodeDescriptor dead = null;
      // locate the first failed storage that isn't on a dead node.
      DatanodeStorageInfo failedStorage = null;
      // check the number of stale nodes
      int numOfStaleNodes = 0;
      int numOfStaleStorages = 0;
      synchronized(this) {
        for (DatanodeDescriptor d : datanodes) {
          // check if an excessive GC pause has occurred
          if (shouldAbortHeartbeatCheck(0)) {
            return;
          }
          // 判断DN节点是否挂掉
          if (dead == null && dm.isDatanodeDead(d)) {
            stats.incrExpiredHeartbeats();
            dead = d;
          }
          if (d.isStale(dm.getStaleInterval())) {
            numOfStaleNodes++;
          }
          DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
          for(DatanodeStorageInfo storageInfo : storageInfos) {
            if (storageInfo.areBlockContentsStale()) {
              numOfStaleStorages++;
            }
            if (failedStorage == null &&
                storageInfo.areBlocksOnFailedStorage() &&
                d != dead) {
              failedStorage = storageInfo;
            }
          }
        }
        // Set the number of stale nodes in the DatanodeManager
        dm.setNumStaleNodes(numOfStaleNodes);
        dm.setNumStaleStorages(numOfStaleStorages);
      }
      ... ...
    }
}

DatanodeManager.java
boolean isDatanodeDead(DatanodeDescriptor node) {
    return (node.getLastUpdateMonotonic() <
        (monotonicNow() - heartbeatExpireInterval));
}

private long heartbeatExpireInterval;
// 10分钟 + 30秒
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
    + 10 * 1000 * heartbeatIntervalSeconds;

private volatile int heartbeatRecheckInterval;
heartbeatRecheckInterval = conf.getInt(
    DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
    DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes

private volatile long heartbeatIntervalSeconds;
heartbeatIntervalSeconds = conf.getTimeDuration(
    DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
    DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);

public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
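按上面的公式代入默认值,可以算出"10分钟 + 30秒"这个超时时间(示意演算,类名HeartbeatExpireCalc为演示用):

public class HeartbeatExpireCalc {
    public static void main(String[] args) {
        long heartbeatRecheckInterval = 5 * 60 * 1000; // 默认5分钟,单位ms
        long heartbeatIntervalSeconds = 3;             // 默认3秒
        long heartbeatExpireInterval =
                2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;
        // 2 * 300000 + 10 * 1000 * 3 = 630000ms,即10分钟 + 30秒
        System.out.println(heartbeatExpireInterval + " ms");
    }
}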
1.6、 安全模式
FSNamesystem.java
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
    this.registerMBean(); // register the MBean for the FSNamesystemState
    writeLock();
    this.haContext = haContext;
    try {
      nnResourceChecker = new NameNodeResourceChecker(conf);
      // 检查是否有足够的磁盘存储元数据(fsimage(默认100m) editLog(默认100m))
      checkAvailableResources();
      assert !blockManager.isPopulatingReplQueues();
      StartupProgress prog = NameNode.getStartupProgress();
      // 开始进入安全模式
      prog.beginPhase(Phase.SAFEMODE);
      // 获取所有可以正常使用的block
      long completeBlocksTotal = getCompleteBlocksTotal();
      prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
          completeBlocksTotal);
      // 启动块服务
      blockManager.activate(conf, completeBlocksTotal);
    } finally {
      writeUnlock("startCommonServices");
    }
    registerMXBean();
    DefaultMetricsSystem.instance().register(this);
    if (inodeAttributeProvider != null) {
      inodeAttributeProvider.start();
      dir.setINodeAttributeProvider(inodeAttributeProvider);
    }
    snapshotManager.registerMXBean();
    InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
    this.nameNodeHostName = (serviceAddress != null) ?
        serviceAddress.getHostName() : "";
}

点击getCompleteBlocksTotal
public long getCompleteBlocksTotal() {
    // Calculate number of blocks under construction
    long numUCBlocks = 0;
    readLock();
    try {
      // 获取正在构建的block
      numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
      // 所有的块 - 正在构建的block = 可以正常使用的block
      return getBlocksTotal() - numUCBlocks;
    } finally {
      readUnlock("getCompleteBlocksTotal");
    }
}

点击activate
BlockManager.java
public void activate(Configuration conf, long blockTotal) {
    pendingReconstruction.start();
    datanodeManager.activate(conf);
    this.redundancyThread.setName("RedundancyMonitor");
    this.redundancyThread.start();
    storageInfoDefragmenterThread.setName("StorageInfoMonitor");
    storageInfoDefragmenterThread.start();
    this.blockReportThread.start();
    mxBeanName = MBeans.register("NameNode", "BlockStats", this);
    bmSafeMode.activate(blockTotal);
}

点击activate
BlockManagerSafeMode.java
void activate(long total) {
    assert namesystem.hasWriteLock();
    assert status == BMSafeModeStatus.OFF;

    startTime = monotonicNow();
    // 计算是否满足块个数的阈值
    setBlockTotal(total);

    // 判断DataNode节点和块信息是否达到退出安全模式标准
    if (areThresholdsMet()) {
      boolean exitResult = leaveSafeMode(false);
      Preconditions.checkState(exitResult, "Failed to leave safe mode.");
    } else {
      // enter safe mode
      status = BMSafeModeStatus.PENDING_THRESHOLD;
      initializeReplQueuesIfNecessary();
      reportStatus("STATE* Safe mode ON.", true);
      lastStatusReport = monotonicNow();
    }
}

点击setBlockTotal
void setBlockTotal(long total) {
    assert namesystem.hasWriteLock();
    synchronized (this) {
      this.blockTotal = total;
      // 计算阈值,例如:1000个正常的块 * 0.999 = 999
      this.blockThreshold = (long) (total * threshold);
    }
    this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}

this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
    DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
public static final float DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;

点击areThresholdsMet
private boolean areThresholdsMet() {
    assert namesystem.hasWriteLock();
    // Calculating the number of live datanodes is time-consuming
    // in large clusters. Skip it when datanodeThreshold is zero.
    int datanodeNum = 0;
    if (datanodeThreshold > 0) {
      datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes();
    }
    synchronized (this) {
      // 已上报的正常块数 >= 块阈值,且存活DataNode数 >= 最小DataNode阈值
      return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
    }
}
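把setBlockTotal和areThresholdsMet的逻辑代入具体数字(示意演算,假设datanodeThreshold为默认值0,类名SafeModeThresholdCalc为演示用):

public class SafeModeThresholdCalc {
    public static void main(String[] args) {
        long blockTotal = 1000;     // 可正常使用的块总数
        float threshold = 0.999f;   // dfs.namenode.safemode.threshold-pct默认值
        long blockThreshold = (long) (blockTotal * threshold); // 999

        long blockSafe = 999;       // DataNode已汇报的正常块数
        int datanodeNum = 3;        // 存活DataNode数
        int datanodeThreshold = 0;  // dfs.namenode.safemode.min.datanodes默认0

        // 对应areThresholdsMet:两个条件都满足即可退出安全模式
        boolean met = blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
        System.out.println(met); // true
    }
}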