Hadoop源码分析(9)

Hadoop源码分析(9),第1张

Hadoop源码分析(9) namenode加载元数据

 在文档(8)中分析了namenode的构造方法,其中的initialize方法会加载 namenode的元数据并启动一些服务。其中加载原数据是通过调用FSNamesystem 类的loadFromDisk方法来实现的。在这个方法中会创建FSImage对象, FSNamesystem对象,并使用FSNamesystem对象加载元数据。

 在文档(8)中详细分析了FSImage的创建,这里继续分析 FSNamesystem对象的创建。其构造方法如下:

 FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
      throws IOException {
    provider = DFSUtil.createKeyProviderCryptoExtension(conf);
    if (provider == null) {
      LOG.info("No KeyProvider found.");
    } else {
      LOG.info("Found KeyProvider: " + provider.toString());
    }
    if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
                        DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
      LOG.info("Enabling async auditlog");
      enableAsyncAuditLog();
    }
    fsLock = new FSNamesystemLock(conf, detailedLockHoldTimeMetrics);
    cond = fsLock.newWriteLockCondition();
    cpLock = new ReentrantLock();

    this.fsImage = fsImage;
    try {
      resourceRecheckInterval = conf.getLong(
          DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
          DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);

      this.blockManager = new BlockManager(this, conf);
      this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
      this.blockIdManager = new BlockIdManager(blockManager);

      this.fsOwner = UserGroupInformation.getCurrentUser();
      this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, 
                                 DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
      this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
                                                 DFS_PERMISSIONS_ENABLED_DEFAULT);
      LOG.info("fsOwner             = " + fsOwner);
      LOG.info("supergroup          = " + supergroup);
      LOG.info("isPermissionEnabled = " + isPermissionEnabled);

      // block allocation has to be persisted in HA using a shared edits directory
      // so that the standby has up-to-date namespace information
      nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
      this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);  

      // Sanity check the HA-related config.
      if (nameserviceId != null) {
        LOG.info("Determined nameservice ID: " + nameserviceId);
      }
      LOG.info("HA Enabled: " + haEnabled);
      if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
        LOG.warn("Configured NNs:n" + DFSUtil.nnAddressesAsString(conf));
        throw new IOException("Invalid configuration: a shared edits dir " +
            "must not be specified if HA is not enabled.");
      }

      // Get the checksum type from config
      String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY, DFS_CHECKSUM_TYPE_DEFAULT);
      DataChecksum.Type checksumType;
      try {
         checksumType = DataChecksum.Type.valueOf(checksumTypeStr);
      } catch (IllegalArgumentException iae) {
         throw new IOException("Invalid checksum type in "
            + DFS_CHECKSUM_TYPE_KEY + ": " + checksumTypeStr);
      }

      this.serverDefaults = new FsServerDefaults(
          conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
          conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
          conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
          (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
          conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
          conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, DFS_ENCRYPT_DATA_TRANSFER_DEFAULT),
          conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
          checksumType);

      this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY, 
                                       DFS_NAMENODE_MAX_OBJECTS_DEFAULT);

      this.minBlockSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY,
          DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
      this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY,
          DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT);
      this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
          DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT);
      this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
      LOG.info("Append Enabled: " + supportAppends);

      this.dtpReplaceDatanodeonFailure = ReplaceDatanodeOnFailure.get(conf);

      this.standbyShouldCheckpoint = conf.getBoolean(
          DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
      // # edit autoroll threshold is a multiple of the checkpoint threshold 
      this.editLogRollerThreshold = (long)
          (conf.getFloat(
              DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD,
              DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT) *
          conf.getLong(
              DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
              DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT));
      this.editLogRollerInterval = conf.getInt(
          DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS,
          DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);

      this.lazyPersistFileScrubIntervalSec = conf.getInt(
          DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
          DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);

      if (this.lazyPersistFileScrubIntervalSec == 0) {
        throw new IllegalArgumentException(
            DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC + " must be non-zero.");
      }

      // For testing purposes, allow the DT secret manager to be started regardless
      // of whether security is enabled.
      alwaysUseDelegationTokensForTests = conf.getBoolean(
          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);

      this.dtSecretManager = createDelegationTokenSecretManager(conf);
      this.dir = new FSDirectory(this, conf);
      this.snapshotManager = new SnapshotManager(dir);
      this.cacheManager = new CacheManager(this, conf, blockManager);
      this.safeMode = new SafeModeInfo(conf);
      this.topConf = new TopConf(conf);
      this.auditLoggers = initAuditLoggers(conf);
      this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
        auditLoggers.get(0) instanceof DefaultAuditLogger;
      this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
      Class klass = conf.getClass(
          DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY,
          null, INodeAttributeProvider.class);
      if (klass != null) {
        inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
        LOG.info("Using INode attribute provider: " + klass.getName());
      }
    } catch(IOException e) {
      LOG.error(getClass().getSimpleName() + " initialization failed.", e);
      close();
      throw e;
    } catch (RuntimeException re) {
      LOG.error(getClass().getSimpleName() + " initialization failed.", re);
      close();
      throw re;
    }
  }

  这个方法很长,主要是对参数进行赋值。其中较为重要的是第24行 的BlockManager,它负责管理集群的块信息,还有就是第117行创建的 FSDirectory对象,它负责管理namenode的目录结构。

 接着继续分析FSNamesystem对象加载元数据的方法,即loadFSImage 方法。其内容如下:

 private void loadFSImage(StartupOption startOpt) throws IOException {
    final FSImage fsImage = getFSImage();

    // format before starting up if requested
    if (startOpt == StartupOption.FORMAT) {

      fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id

      startOpt = StartupOption.REGULAR;
    }
    boolean success = false;
    writeLock();
    try {
      // We shouldn't be calling saveNamespace if we've come up in standby state.
      metaRecoveryContext recovery = startOpt.createRecoveryContext();
      final boolean staleImage
          = fsImage.recoverTransitionRead(startOpt, this, recovery);
      if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||
          RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
        rollingUpgradeInfo = null;
      }
      final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade(); 
      LOG.info("Need to save fs image? " + needToSave
          + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
          + ", isRollingUpgrade=" + isRollingUpgrade() + ")");
      if (needToSave) {
        fsImage.saveNamespace(this);
      } else {
        updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),
            startOpt);
        // No need to save, so mark the phase done.
        StartupProgress prog = NameNode.getStartupProgress();
        prog.beginPhase(Phase.SAVING_CHECKPOINT);
        prog.endPhase(Phase.SAVING_CHECKPOINT);
      }
      // This will start a new log segment and write to the seen_txid file, so
      // we shouldn't do it when coming up in standby state
      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
          || (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
        fsImage.openEditLogForWrite();
      }
      success = true;
    } finally {
      if (!success) {
        fsImage.close();
      }
      writeUnlock("loadFSImage");
    }
    imageLoadComplete();
  }

 这个方法的重点在第17行,这里调用了fsImage的 recoverTransitionRead方法,这里的fsImage是通过getFSImage 方法获得的。而这个getFSImage方法很简单返回的就是其自身的fsImage 属性,而这个fsImage属性是在这个FSNamesystem对象创建的时候传入的 (可参考上述构造方法)。而传入fsImage是文档(8)中分析创建的对象。

 这里调用的recoverTransitionRead方法内容如下:

boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
      metaRecoveryContext recovery)
      throws IOException {
    assert startOpt != StartupOption.FORMAT : 
      "NameNode formatting should be performed before reading the image";

    Collection imageDirs = storage.getImageDirectories();
    Collection editsDirs = editLog.getEditURIs();

    // none of the data dirs exist
    if((imageDirs.size() == 0 || editsDirs.size() == 0) 
                             && startOpt != StartupOption.import)  
      throw new IOException(
          "All specified directories are not accessible or do not exist.");

    // 1. For each data directory calculate its state and 
    // check whether all is consistent before transitioning.
    Map dataDirStates = 
             new HashMap();
    boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates);

    if (LOG.isTraceEnabled()) {
      LOG.trace("Data dir states:n  " +
        Joiner.on("n  ").withKeyValueSeparator(": ")
        .join(dataDirStates));
    }

    if (!isFormatted && startOpt != StartupOption.ROLLBACK 
                     && startOpt != StartupOption.import) {
      throw new IOException("NameNode is not formatted.");      
    }


    int layoutVersion = storage.getLayoutVersion();
    if (startOpt == StartupOption.metaDATAVERSION) {
      System.out.println("HDFS Image Version: " + layoutVersion);
      System.out.println("Software format version: " +
        HdfsConstants.NAMENODE_LAYOUT_VERSION);
      return false;
    }

    if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
      NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
    }
    if (startOpt != StartupOption.UPGRADE
        && startOpt != StartupOption.UPGRADEonLY
        && !RollingUpgradeStartupOption.STARTED.matches(startOpt)
        && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
        && layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
      throw new IOException(
          "nFile system image contains an old layout version " 
          + storage.getLayoutVersion() + ".nAn upgrade to version "
          + HdfsConstants.NAMENODE_LAYOUT_VERSION + " is required.n"
          + "Please restart NameNode with the ""
          + RollingUpgradeStartupOption.STARTED.getOptionString()
          + "" option if a rolling upgrade is already started;"
          + " or restart NameNode with the ""
          + StartupOption.UPGRADE.getName() + "" option to start"
          + " a new upgrade.");
    }

    storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);

    // 2. Format unformatted dirs.
    for (Iterator it = storage.dirIterator(); it.hasNext();) {
      StorageDirectory sd = it.next();
      StorageState curState = dataDirStates.get(sd);
      switch(curState) {
      case NON_EXISTENT:
        throw new IOException(StorageState.NON_EXISTENT + 
                              " state cannot be here");
      case NOT_FORMATTED:
        // Create a dir structure, but not the VERSION file. The presence of
        // VERSION is checked in the inspector's needToSave() method and
        // saveNamespace is triggered if it is absent. This will bring
        // the storage state uptodate along with a new VERSION file.
        // If HA is enabled, NNs start up as standby so saveNamespace is not
        // triggered.
        LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
        LOG.info("Formatting ...");
        sd.clearDirectory(); // create empty currrent dir
        // For non-HA, no further action is needed here, as saveNamespace will
        // take care of the rest.
        if (!target.isHaEnabled()) {
          continue;
        }
        // If HA is enabled, save the dirs to create a version file later when
        // a checkpoint image is saved.
        if (newDirs == null) {
          newDirs = new HashSet();
        }
        newDirs.add(sd);
        break;
      default:
        break;
      }
    }

    // 3. Do transitions
    switch(startOpt) {
    case UPGRADE:
    case UPGRADEONLY:
      doUpgrade(target);
      return false; // upgrade saved image already
    case import:
      doimportCheckpoint(target);
      return false; // import checkpoint saved image already
    case ROLLBACK:
      throw new AssertionError("Rollback is now a standalone command, "
          + "NameNode should not be starting with this option.");
    case REGULAR:
    default:
      // just load the image
    }

    return loadFSImage(target, startOpt, recovery);
  }

 这段代码的重点就在最后一行,这里调用了FSImage类的 loadFSImage方法来加载数据。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5179293.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-18
下一篇 2022-11-18

发表评论

登录后才能评论

评论列表(0条)

保存