在文档(8)中分析了namenode的构造方法,其中的initialize方法会加载 namenode的元数据并启动一些服务。其中加载元数据是通过调用FSNamesystem 类的loadFromDisk方法来实现的。在这个方法中会创建FSImage对象, FSNamesystem对象,并使用FSNamesystem对象加载元数据。
在文档(8)中详细分析了FSImage的创建,这里继续分析 FSNamesystem对象的创建。其构造方法如下:
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache) throws IOException { provider = DFSUtil.createKeyProviderCryptoExtension(conf); if (provider == null) { LOG.info("No KeyProvider found."); } else { LOG.info("Found KeyProvider: " + provider.toString()); } if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) { LOG.info("Enabling async auditlog"); enableAsyncAuditLog(); } fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics); cond = fsLock.newWriteLockCondition(); cpLock = new ReentrantLock(); this.fsImage = fsImage; try { resourceRecheckInterval = conf.getLong( DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY, DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT); this.blockManager = new BlockManager(this, conf); this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics(); this.blockIdManager = new BlockIdManager(blockManager); this.fsOwner = UserGroupInformation.getCurrentUser(); this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY, DFS_PERMISSIONS_ENABLED_DEFAULT); LOG.info("fsOwner = " + fsOwner); LOG.info("supergroup = " + supergroup); LOG.info("isPermissionEnabled = " + isPermissionEnabled); // block allocation has to be persisted in HA using a shared edits directory // so that the standby has up-to-date namespace information nameserviceId = DFSUtil.getNamenodeNameServiceId(conf); this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId); // Sanity check the HA-related config. 
if (nameserviceId != null) { LOG.info("Determined nameservice ID: " + nameserviceId); } LOG.info("HA Enabled: " + haEnabled); if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) { LOG.warn("Configured NNs:n" + DFSUtil.nnAddressesAsString(conf)); throw new IOException("Invalid configuration: a shared edits dir " + "must not be specified if HA is not enabled."); } // Get the checksum type from config String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT); DataChecksum.Type checksumType; try { checksumType = DataChecksum.Type.valueOf(checksumTypeStr); } catch (IllegalArgumentException iae) { throw new IOException("Invalid checksum type in " + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr); } this.serverDefaults = new FsServerDefaults( conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT), conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT), conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT), (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT), conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT), conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT), conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT), checksumType); this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, DFS_NAMENODE_MAX_OBJECTS_DEFAULT); this.minBlockSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT); this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT); this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT); this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT); LOG.info("Append Enabled: " + supportAppends); this.dtpReplaceDatanodeonFailure = ReplaceDatanodeOnFailure.get(conf); 
this.standbyShouldCheckpoint = conf.getBoolean( DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT); // # edit autoroll threshold is a multiple of the checkpoint threshold this.editLogRollerThreshold = (long) (conf.getFloat( DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD, DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT) * conf.getLong( DFS_NAMENODE_CHECKPOINT_TXNS_KEY, DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT)); this.editLogRollerInterval = conf.getInt( DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS, DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT); this.lazyPersistFileScrubIntervalSec = conf.getInt( DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT); if (this.lazyPersistFileScrubIntervalSec == 0) { throw new IllegalArgumentException( DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC + " must be non-zero."); } // For testing purposes, allow the DT secret manager to be started regardless // of whether security is enabled. alwaysUseDelegationTokensForTests = conf.getBoolean( DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT); this.dtSecretManager = createDelegationTokenSecretManager(conf); this.dir = new FSDirectory(this, conf); this.snapshotManager = new SnapshotManager(dir); this.cacheManager = new CacheManager(this, conf, blockManager); this.safeMode = new SafeModeInfo(conf); this.topConf = new TopConf(conf); this.auditLoggers = initAuditLoggers(conf); this.isDefaultAuditLogger = auditLoggers.size() == 1 && auditLoggers.get(0) instanceof DefaultAuditLogger; this.retryCache = ignoreRetryCache ? 
null : initRetryCache(conf); Class extends INodeAttributeProvider> klass = conf.getClass( DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY, null, INodeAttributeProvider.class); if (klass != null) { inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf); LOG.info("Using INode attribute provider: " + klass.getName()); } } catch(IOException e) { LOG.error(getClass().getSimpleName() + " initialization failed.", e); close(); throw e; } catch (RuntimeException re) { LOG.error(getClass().getSimpleName() + " initialization failed.", re); close(); throw re; } }
这个方法很长,主要是对参数进行赋值。其中较为重要的是BlockManager对象的创建,它负责管理集群的块信息,还有就是FSDirectory对象的创建,它负责管理namenode的目录结构。
接着继续分析FSNamesystem对象加载元数据的方法,即loadFSImage 方法。其内容如下:
private void loadFSImage(StartupOption startOpt) throws IOException { final FSImage fsImage = getFSImage(); // format before starting up if requested if (startOpt == StartupOption.FORMAT) { fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id startOpt = StartupOption.REGULAR; } boolean success = false; writeLock(); try { // We shouldn't be calling saveNamespace if we've come up in standby state. metaRecoveryContext recovery = startOpt.createRecoveryContext(); final boolean staleImage = fsImage.recoverTransitionRead(startOpt, this, recovery); if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) || RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) { rollingUpgradeInfo = null; } final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade(); LOG.info("Need to save fs image? " + needToSave + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled + ", isRollingUpgrade=" + isRollingUpgrade() + ")"); if (needToSave) { fsImage.saveNamespace(this); } else { updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(), startOpt); // No need to save, so mark the phase done. StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.SAVING_CHECKPOINT); prog.endPhase(Phase.SAVING_CHECKPOINT); } // This will start a new log segment and write to the seen_txid file, so // we shouldn't do it when coming up in standby state if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE) || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) { fsImage.openEditLogForWrite(); } success = true; } finally { if (!success) { fsImage.close(); } writeUnlock("loadFSImage"); } imageLoadComplete(); }
这个方法的重点在于对fsImage的recoverTransitionRead方法的调用,这里的fsImage是通过getFSImage 方法获得的。而这个getFSImage方法很简单,返回的就是其自身的fsImage 属性,而这个fsImage属性是在这个FSNamesystem对象创建的时候传入的 (可参考上述构造方法)。而传入的fsImage是文档(8)中分析创建的对象。
这里调用的recoverTransitionRead方法内容如下:
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target, metaRecoveryContext recovery) throws IOException { assert startOpt != StartupOption.FORMAT : "NameNode formatting should be performed before reading the image"; CollectionimageDirs = storage.getImageDirectories(); Collection editsDirs = editLog.getEditURIs(); // none of the data dirs exist if((imageDirs.size() == 0 || editsDirs.size() == 0) && startOpt != StartupOption.import) throw new IOException( "All specified directories are not accessible or do not exist."); // 1. For each data directory calculate its state and // check whether all is consistent before transitioning. Map dataDirStates = new HashMap (); boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates); if (LOG.isTraceEnabled()) { LOG.trace("Data dir states:n " + Joiner.on("n ").withKeyValueSeparator(": ") .join(dataDirStates)); } if (!isFormatted && startOpt != StartupOption.ROLLBACK && startOpt != StartupOption.import) { throw new IOException("NameNode is not formatted."); } int layoutVersion = storage.getLayoutVersion(); if (startOpt == StartupOption.metaDATAVERSION) { System.out.println("HDFS Image Version: " + layoutVersion); System.out.println("Software format version: " + HdfsConstants.NAMENODE_LAYOUT_VERSION); return false; } if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) { NNStorage.checkVersionUpgradable(storage.getLayoutVersion()); } if (startOpt != StartupOption.UPGRADE && startOpt != StartupOption.UPGRADEonLY && !RollingUpgradeStartupOption.STARTED.matches(startOpt) && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) { throw new IOException( "nFile system image contains an old layout version " + storage.getLayoutVersion() + ".nAn upgrade to version " + HdfsConstants.NAMENODE_LAYOUT_VERSION + " is required.n" + "Please restart NameNode with the "" + RollingUpgradeStartupOption.STARTED.getOptionString() + "" option if a 
rolling upgrade is already started;" + " or restart NameNode with the "" + StartupOption.UPGRADE.getName() + "" option to start" + " a new upgrade."); } storage.processStartupOptionsForUpgrade(startOpt, layoutVersion); // 2. Format unformatted dirs. for (Iterator it = storage.dirIterator(); it.hasNext();) { StorageDirectory sd = it.next(); StorageState curState = dataDirStates.get(sd); switch(curState) { case NON_EXISTENT: throw new IOException(StorageState.NON_EXISTENT + " state cannot be here"); case NOT_FORMATTED: // Create a dir structure, but not the VERSION file. The presence of // VERSION is checked in the inspector's needToSave() method and // saveNamespace is triggered if it is absent. This will bring // the storage state uptodate along with a new VERSION file. // If HA is enabled, NNs start up as standby so saveNamespace is not // triggered. LOG.info("Storage directory " + sd.getRoot() + " is not formatted."); LOG.info("Formatting ..."); sd.clearDirectory(); // create empty currrent dir // For non-HA, no further action is needed here, as saveNamespace will // take care of the rest. if (!target.isHaEnabled()) { continue; } // If HA is enabled, save the dirs to create a version file later when // a checkpoint image is saved. if (newDirs == null) { newDirs = new HashSet (); } newDirs.add(sd); break; default: break; } } // 3. Do transitions switch(startOpt) { case UPGRADE: case UPGRADEONLY: doUpgrade(target); return false; // upgrade saved image already case import: doimportCheckpoint(target); return false; // import checkpoint saved image already case ROLLBACK: throw new AssertionError("Rollback is now a standalone command, " + "NameNode should not be starting with this option."); case REGULAR: default: // just load the image } return loadFSImage(target, startOpt, recovery); }
这段代码的重点就在最后一行,这里调用了FSImage类的 loadFSImage方法来加载数据。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)