2021-12-05 datanode存储相关(一):storage管理

2021-12-05 datanode存储相关(一):storage管理,第1张

2021-12-05 datanode存储相关(一):storage管理

基于源码hadoop-3.3.0

1 概述

众所周知,dn主要是用来存储hadoop集群中的具体的数据的。但实际上,Datanode还是需要保存一部分Datanode自身的元数据的, 这些元数据是通过Datanode磁盘存储上的一些文件目录来保存的。

Datanode可以定义多个存储目录保存数据块,Datanode的多个存储目录存储的数据块并不相同,并且不同的存储目录可以是异构的, 这样的设计可以提高数据块IO的吞吐率[比如多块磁盘]。

1.1 实际存储

下面看一个实际中的存储:

我们简单了解一下:

  • BP-607470660-10.200.50.133-1568802623014:BP表示blockpool,所以这个目录是一个块池目录,块池目录保存了一个块池在当前存储目录下存储的所有数据块, 在Federation部署方式中, Datanode的一个存储目录会包含多个以“BP”开头的块池目录。 BP后面会紧跟一个唯一的随机块池ID, 在这个示例中就是607470660。 接下来的IP地址10.200.50.133是当前块池对应的Namenode的IP地址。 最后一个部分是这个块池的创建时间。
  • VERSION文件:在current中存在一个VERSION文件,截图未显示,内容如下

块池目录的VERSION文件同样包含了文件系统布局版本(layoutVersion) 、 HDFS集群ID(clusterld) 以及创建时间(cTime) 等集群信息。 除此之外, 块池目录的VERSION文件还包含了以下信息。

    • storageType:存储类型,这里是DATA_NODE
  • current/BP-607470660-10.200.50.133-1568802623014/current目录:
    • finalized/rbw:finalized和rbw目录都是用于存储数据块的, 包括数据块文件以及对应的校验和文件。 rbw(replica being written, 正在写入副本) 目录保存了正在由HDFS客户端写入当前Datanode的数据块。 finalized目录包含了已经完成写入 *** 作的数据块, 由于这样的数据块可能非常多, 所以finalized目录会以特定的目录结构存储这些数据块。每个数据块对应 2 个文件,blk 文件存放数据,另外一个以 meta 结尾的存放校验和等元数据
    • VERSION:这里会记录namespaceID,cTime,blockpoolId,layoutVersion
  • current/BP-607470660-10.200.50.133-1568802623014/scanner.cursor:DataNode 会定期的对每个 blk 文件做校验,这个文件是用来记录校验到哪个位置的
  • in_use.lock:被dn线程持有的锁文件,用于防止多个Datanode线程启动并且并发修改这个存储目录
  • lazyPersist: HDFS 2.X中引入了一个新的特性, 用于支持将临时数据写入内存, 然后通过懒持久化(lazyPersist) 方式写入磁盘。 如果用户开启了这个特性,lazyPersist目录就用于将内存中的临时数据懒持久化到磁盘。
1.2 功能划分

Datanode最重要的功能就是管理磁盘上存储的HDFS数据块(block)。

Datanode将这个管理功能切分为两个部分:

① 管理与组织磁盘存储目录(由dfs.data.dir指定) , 如current、previous、 detach、 tmp等, 这个功能由DataStorage类实现;

②管理与组织数据块及其元数据文件, 这个功能主要由FsDatasetImpl(对应每一个存储目录)相关类实现。

本文主要介绍第一部分:磁盘存储目录的管理。

2 磁盘存储目录管理

在dn中负责管理磁盘存储的主要继承结构如下:

因此我们分别了解一下这几个概念。

2.1 Storage

根据注释,此类用于存储storage file信息,本地的storage 信息存储在VERSION文件中,主要包括node的类型,存储布局版本,namespaceId,fs state create time。

本地存储可以驻留在多个目录中。每个目录都应包含与其他目录相同的VERSION 文件。在启动Hadoop服务器(名称节点和数据节点)期间读取它们的本地存储来自他们的信息。

服务器在运行时为每个存储目录持有一个锁,以便其他节点无法启动共享相同的存储。当服务器停止(正常或异常)时释放锁。

Storage是一个抽象类, 为Datanode、 Namenode提供抽象的存储服务。 Storage类管理着当前节点上( 可以是Datanode或者Namenode) 所有的存储目录, 每个存储目录都由一个StorageDirectory对象管理,StorageDirectory类定义了存储目录上的通用 *** 作。 由于HDFS 2.X版本引入了Federation机制 , Datanode会为多个块池保存数据块, HDFS定义了BlockPoolSliceStorage类来管理Datanode上的一个块池, 这个块池分布在Datanode配置的所有存储目录中。Storage用一个线性表字段storageDirs存储它管理的所有StorageDirectory, 并通过Dirlterator迭代器进行遍历。

private final List storageDirs = new CopyOnWriteArrayList<>();
2.1.1 Storage.StorageState

StorageState定义了存储空间可能出现的所有状态。 在升级、 回滚、 升级提交、 检查点等 *** 作中, 节点(Datanode或者Namenode) 的存储空间可能出现各种异常, 例如误 *** 作、 断电、 宕机等情况, 这个时候存储空间就可能处于某种中间状态, 引入中间状态, 有利于HDFS从错误中恢复过来。 存储状态的确定, 是在StorageDirectory.analyzeStorage()方法中进行的.

  public enum StorageState {
    NON_EXISTENT, // 存储不存在
    NOT_FORMATTED, // 存储未格式化
    COMPLETE_UPGRADE, // 完成升级
    RECOVER_UPGRADE,  // 恢复升级
    COMPLETE_FINALIZE, // 完成升级提交
    COMPLETE_ROLLBACK, // 完成回滚 *** 作
    RECOVER_ROLLBACK,  // 恢复回滚
    COMPLETE_CHECKPOINT, // 完成检查点 *** 作
    RECOVER_CHECKPOINT,  // 恢复检查点 *** 作
    NORMAL; // 正常状态
  }
2.1.2 Storage.StorageDirectory

我们知道Datanode和Namenode都可以定义多个存储目录来存储数据,StorageDirectory是Storage的内部类, 定义了管理存储目录的通用方法。

重点字段:

// 存储目录的根, 就是java.io.File文件。
    final File root;              // root directory

    // 指示当前目录是否是共享的。
    // 例如在HA部署中, 不同的Namenode之间共享存储目录,
    // 或者在Federation部署中不同的块池之间共享存储目录。
    final boolean isShared;

    // 当前存储目录的类型。
    final StorageDirType dirType; // storage dir type

    // 独占锁, java.nio.FileLock类型,
    // 用来支持Datanode或者Namenode线程独占存储目录的锁 *** 作。
    FileLock lock;                // storage lock

    // 权限信息
    private final FsPermission permission;

    // 存储目录的标识符。
    private String storageUuid = null;      // Storage directory identifier.

    // 位置信息
    private final StorageLocation location;

StorageDirectory的 *** 作主要包含:

2.1.2.1 获取文件夹

主要是获取存储目录中的各个文件和目录的方法:

包括:

  • current和current Version文件
  • previous和previous Version文件
  • previous tmp目录
  • removed tmp目录
  • finlized tmp目录
  • lastCheckpoint tmp目录
  • previous.checkpoint文件
public File getCurrentDir() {
    if (root == null) {
        return null;
    }
    return new File(root, STORAGE_DIR_CURRENT);
}


public File getVersionFile() {
    if (root == null) {
        return null;
    }
    return new File(new File(root, STORAGE_DIR_CURRENT), STORAGE_FILE_VERSION);
}


public File getPreviousVersionFile() {
    if (root == null) {
        return null;
    }
    return new File(new File(root, STORAGE_DIR_PREVIOUS), STORAGE_FILE_VERSION);
}


public File getPreviousDir() {
    if (root == null) {
        return null;
    }
    return new File(root, STORAGE_DIR_PREVIOUS);
}


public File getPreviousTmp() {
    if (root == null) {
        return null;
    }
    return new File(root, STORAGE_TMP_PREVIOUS);
}


public File getRemovedTmp() {
    if (root == null) {
        return null;
    }
    return new File(root, STORAGE_TMP_REMOVED);
}


public File getFinalizedTmp() {
    if (root == null) {
        return null;
    }
    return new File(root, STORAGE_TMP_FINALIZED);
}


public File getLastCheckpointTmp() {
    if (root == null) {
        return null;
    }
    return new File(root, STORAGE_TMP_LAST_CKPT);
}


public File getPreviousCheckpoint() {
    if (root == null) {
        return null;
    }
    return new File(root, STORAGE_PREVIOUS_CKPT);
}

2.1.2.2 加锁解锁 *** 作

Datanode磁盘存储结构中,存储目录下会有一个in_use.lock文件,这个文件用于对当前存储目录加锁, 以保证Datanode进程对存储目录的独占使用。 当Datanode进程退出执行时, in_use.lock文件会被删除。 StorageDirectory提供了tryLock()与unlock()两个锁方法, 分别实现了对存储目录加锁以及解锁的功能。

StorageDirectory中真正进行加锁 *** 作的是tryLock()方法。tryLock()方法会首先构造锁文件, 然后调用file.getChannel.lock()方法尝试获得存储目录的独占锁, 如果已经有进程占有锁文件, 那么file.getChannel.lock()就会返回一个null的引用, 表明有另一个节点运行在当前的存储目录上, tryLock()方法会抛出异常并退出执行。 如果加锁成功, tryLock()方法会在锁文件中写入虚拟机信息。

加锁成功后, tryLock()方法会调用deleteonExit()方法, 在Java虚拟机运行结束时删除in_use.lock文件。

@SuppressWarnings("resource")
FileLock tryLock() throws IOException {
    boolean deletionHookAdded = false;
    // 构造in_use.lock
    File lockF = new File(root, STORAGE_FILE_LOCK);
    
    // 如果in_use.lock创建失败
    if (!lockF.exists()) {
        lockF.deleteonExit();
        deletionHookAdded = true;
    }
    RandomAccessFile file = new RandomAccessFile(lockF, "rws");
    String jvmName = ManagementFactory.getRuntimeMXBean().getName();
    FileLock res = null;
    try {
        // 给文件加锁
        res = file.getChannel().tryLock();
        // 已有程序获取了锁
        if (null == res) {
            LOG.error("Unable to acquire file lock on path {}", lockF);
            throw new OverlappingFileLockException();
        }
        // 加锁成功,往文件中添加虚拟机信息
        file.write(jvmName.getBytes(Charsets.UTF_8));
        LOG.info("Lock on {} acquired by nodename {}", lockF, jvmName);
    } catch(OverlappingFileLockException oe) {
        // Cannot read from the locked file on Windows.
        // 已有程序获取了锁,关闭锁文件
        String lockingJvmName = Path.WINDOWS ? "" : (" " + file.readLine());
        LOG.error("It appears that another node {} has already locked the "
                  + "storage directory: {}", lockingJvmName, root, oe);
        file.close();
        return null;
    } catch(IOException e) {
        // 读取锁文件失败,关闭锁
        LOG.error("Failed to acquire lock on {}. If this storage directory is"
                  + " mounted via NFS, ensure that the appropriate nfs lock services"
                  + " are running.", lockF, e);
        file.close();
        throw e;
    }
    // 在虚拟机任务运行成功后,删除锁文件
    if (!deletionHookAdded) {
        // If the file existed prior to our startup, we didn't
        // call deleteonExit above. But since we successfully locked
        // the dir, we can take care of cleaning it up.
        lockF.deleteonExit();
    }
    return res;
}


public void unlock() throws IOException {
    if (this.lock == null)
        return;
    this.lock.release();
    lock.channel().close();
    lock = null;
}

2.1.2.3 状态恢复 *** 作

Datanode在执行升级、 回滚、 提交 *** 作的过程中会出现各种异常, 例如误 *** 作、 断电、 宕机等情况。

StorageDirectory提供了doRecover()和analyzeStorage()两个方法, Datanode会首先调用analyzeStorage()方法分析当前节点的存储状态, 然后根据分析所得的存储状态调用doRecover()方法执行恢复 *** 作。

  • analyzeStorage
public StorageState analyzeStorage(StartupOption startOpt, Storage storage,
                                   boolean checkCurrentIsEmpty)
    throws IOException {

    if (location != null &&
        location.getStorageType() == StorageType.PROVIDED) {
        // currently we assume that PROVIDED storages are always NORMAL
        // 如果是外部存储   目前,我们假设提供的存储始终为“正常”
        return StorageState.NORMAL;
    }

    assert root != null : "root is null";
    boolean hadMkdirs = false;
    String rootPath = root.getCanonicalPath();
    // NON_EXISTENT: 以非FORMAT选项启动时,目录不存在;
    // 或者目录不可写、路径为文件时, 存储目录状态都为NOT_EXISTENT状态。
    try { 
        // check that storage exists
        // 判断存储是否存在
        if (!root.exists()) {
            // storage directory does not exist
            // 如果启动方式不是format和hotswap,则标识存储不存在
            if (startOpt != StartupOption.FORMAT &&
                startOpt != StartupOption.HOTSWAP) {
                LOG.warn("Storage directory {} does not exist", rootPath);
                return StorageState.NON_EXISTENT;
            }
            // directory无法创建
            LOG.info("{} does not exist. Creating ...", rootPath);
            if (!root.mkdirs()) {
                throw new IOException("Cannot create directory " + rootPath);
            }
            hadMkdirs = true;
        }
        // 指定的存储是否是目录
        // or is inaccessible
        if (!root.isDirectory()) {
            LOG.warn("{} is not a directory", rootPath);
            return StorageState.NON_EXISTENT;
        }
        // 是否有写的权限
        if (!FileUtil.canWrite(root)) {
            LOG.warn("Cannot access storage directory {}", rootPath);
            return StorageState.NON_EXISTENT;
        }
    } catch(SecurityException ex) {
        // 权限异常
        LOG.warn("Cannot access storage directory {}", rootPath, ex);
        return StorageState.NON_EXISTENT;
    }

    // 存储存在,加锁
    this.lock(); // lock storage if it exists

    // If startOpt is HOTSWAP, it returns NOT_FORMATTED for empty directory,
    // while it also checks the layout version.
    // 如果startOpt是hotswap,则状态未not_formatted
    if (startOpt == HdfsServerConstants.StartupOption.FORMAT ||
        (startOpt == StartupOption.HOTSWAP && hadMkdirs)) {
        if (checkCurrentIsEmpty) {
            checkEmptyCurrent();
        }
        return StorageState.NOT_FORMATTED;
    }

    // 启动方式是否为导入检查点
    if (startOpt != HdfsServerConstants.StartupOption.import) {
        storage.checkOldLayoutStorage(this);
    }

    // check whether current directory is valid
    // 如果没有version file则创建,这里主要做判断是否存在
    File versionFile = getVersionFile();
    boolean hasCurrent = versionFile.exists();

    // check which directories exist
    boolean hasPrevious = getPreviousDir().exists();
    boolean hasPreviousTmp = getPreviousTmp().exists();
    boolean hasRemovedTmp = getRemovedTmp().exists();
    // 检查是否有Finalized文件
    boolean hasFinalizedTmp = getFinalizedTmp().exists();
    boolean hasCheckpointTmp = getLastCheckpointTmp().exists();

    // 各个指定的文件存在才执行下面的判断 ,如果current存在,则存储状态为normal
    // 如果前一个文件也存在,则抛出非一致性异常,如果判断不通过,则标识为not_formatted
    if (!(hasPreviousTmp || hasRemovedTmp
          || hasFinalizedTmp || hasCheckpointTmp)) {
        // no temp dirs - no recovery
        if (hasCurrent)
            return StorageState.NORMAL;
        if (hasPrevious)
            throw new InconsistentFSStateException(root,
                                                   "version file in current directory is missing.");
        if (checkCurrentIsEmpty) {
            checkEmptyCurrent();
        }
        return StorageState.NOT_FORMATTED;
    }

    // 判断tmp目录存在多于多个,抛出非一致性异常
    if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0)
        + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1)
        // more than one temp dirs
        throw new InconsistentFSStateException(root,
                                               "too many temporary directories.");

    // # of temp dirs == 1 should either recover or complete a transition
    // 标识状态是否完成checkpoint
    if (hasCheckpointTmp) {
        return hasCurrent ? StorageState.COMPLETE_CHECKPOINT
            : StorageState.RECOVER_CHECKPOINT;
    }

    // 判断是否完成finalize
    if (hasFinalizedTmp) {
        if (hasPrevious)
            throw new InconsistentFSStateException(root,
                                                   STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED
                                                   + "cannot exist together.");
        return StorageState.COMPLETE_FINALIZE;
    }

    if (hasPreviousTmp) {
        if (hasPrevious)
            throw new InconsistentFSStateException(root,
                                                   STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS
                                                   + " cannot exist together.");
        // 如果current目录存在,标识完成更新,否则标识为recover_upgrade
        if (hasCurrent)
            return StorageState.COMPLETE_UPGRADE;
        return StorageState.RECOVER_UPGRADE;
    }

    assert hasRemovedTmp : "hasRemovedTmp must be true";
    if (!(hasCurrent ^ hasPrevious))
        throw new InconsistentFSStateException(root,
                                               "one and only one directory " + STORAGE_DIR_CURRENT 
                                               + " or " + STORAGE_DIR_PREVIOUS 
                                               + " must be present when " + STORAGE_TMP_REMOVED
                                               + " exists.");
    if (hasCurrent)
        return StorageState.COMPLETE_ROLLBACK;
    return StorageState.RECOVER_ROLLBACK;
}

总结起来就是包含这几种状态:

NON_EXISTENT: 以非FORMAT选项启动时,目录不存在;或者目录不可写、路径为文件时, 存储目录状态都为NOT_EXISTENT状态。
NOT_FORMATTED: 以FORMAT选项启动时, 都为NOT_FORMATTED状态。
NORMAL: 没有tmp中间状态文件夹, 则存储目录为正常状态。
COMPLETE_UPGRADE: 存在current/VERSION文件, 存在previous.tmp文件夹,则存储目录为升级完成状态。
RECOVER_UPGRADE: 存在previous.tmp文件夹, 不存在current/VERSION文件, 存储目录应该从升级中恢复。
COMPLETE_ROLLBACK: 存在removed.tmp文件夹, 也存在current/VERSION文件, 则存储目录的回滚 *** 作成功完成。
RECOVER_ROLLBACK: 存在removed.tmp文件夹, 不存在current/VERSION文件, 存储目录应该从回滚中恢复。
COMPLETE_FINALIZE: 存在finalized.tmp文件夹, 存储目录可以继续执行提交 *** 作。
  • doRecover

调用analyzeStorage()之后, Datanode就可以确定存储目录的状态了。 对于异常状态,可以通过调用doRecover()方法进行恢复, 使存储空间的状态恢复到NORMAL状态。

恢复过程源码中的注释很清晰。

public void doRecover(StorageState curState) throws IOException {
    File curDir = getCurrentDir();
    if (curDir == null || root == null) {
        // at this point, we do not support recovery on PROVIDED storages
        return;
    }
    String rootPath = root.getCanonicalPath();
    switch(curState) {
        case COMPLETE_UPGRADE:  // mv previous.tmp -> previous
            LOG.info("Completing previous upgrade for storage directory {}",
                     rootPath);
            rename(getPreviousTmp(), getPreviousDir());
            return;
        case RECOVER_UPGRADE:   // mv previous.tmp -> current
            LOG.info("Recovering storage directory {} from previous upgrade",
                     rootPath);
            if (curDir.exists())
                deleteDir(curDir);
            rename(getPreviousTmp(), curDir);
            return;
        case COMPLETE_ROLLBACK: // rm removed.tmp
            LOG.info("Completing previous rollback for storage directory {}",
                     rootPath);
            deleteDir(getRemovedTmp());
            return;
        case RECOVER_ROLLBACK:  // mv removed.tmp -> current
            LOG.info("Recovering storage directory {} from previous rollback",
                     rootPath);
            rename(getRemovedTmp(), curDir);
            return;
        case COMPLETE_FINALIZE: // rm finalized.tmp
            LOG.info("Completing previous finalize for storage directory {}",
                     rootPath);
            deleteDir(getFinalizedTmp());
            return;
        case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
            LOG.info("Completing previous checkpoint for storage directory {}",
                     rootPath);
            File prevCkptDir = getPreviousCheckpoint();
            if (prevCkptDir.exists())
                deleteDir(prevCkptDir);
            rename(getLastCheckpointTmp(), prevCkptDir);
            return;
        case RECOVER_CHECKPOINT:  // mv lastcheckpoint.tmp -> current
            LOG.info("Recovering storage directory {} from failed checkpoint",
                     rootPath);
            if (curDir.exists())
                deleteDir(curDir);
            rename(getLastCheckpointTmp(), curDir);
            return;
        default:
            throw new IOException("Unexpected FS state: " + curState
                                  + " for storage directory: " + rootPath);
    }
}
2.2 StorageInfo

StorageInfo用于表示存储的基本信息。

@InterfaceAudience.Private
public class StorageInfo {
  // 存储系统布局版本号, 当节点存储的目录结构发生改变或者fsimage和editlog的格式发生改变时,
  // 存储系统布局版本号会更新。 这个版本号一般是负数
  public int   layoutVersion;   // layout version of the storage data
  // 存储名称空间id,每一个bp对应一个
  public int   namespaceID;     // id of the file system
  // 系统的集群id
  public String clusterID;      // id of the cluster
  // fs state的创建时间
  public long  cTime;           // creation time of the file system state

  // 存储类型:有DATA_NODE、 NAME_NODE、 JOURNAL_NODE等类型
  protected final NodeType storageType; // Type of the node using this storage 

  // 上述提及的信息都存储在VERSION中
  protected static final String STORAGE_FILE_VERSION    = "VERSION";
}
2.3 DataStorage

DataStorage继承自Storage抽象类, 提供了管理Datanode存储空间的功能。

在HDFSFederation架构中, 一个Datanode可以保存多个命名空间的数据块,每个命名空间在Datanode磁盘上都拥有一个独立的块池( BlockPool),这个块池会分布在Datanode的所有存储目录下,它们共同保存了这个块池在当前Datanode上的所有数据块。HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间,DataStorage类则定义了bpStorageMap字段保存Datanode上所有块池BlockPoolSliceStorage对象的引用。

// Maps block pool IDs to block pool storage
private final Map bpStorageMap
      = Collections.synchronizedMap(new HashMap());

dn在启动时会调用DataStorage提供的方法初始化Datanode的存储空间, 在HDFS Federation架构中, Datanode会保存多个命名空间的数据块。 对于每一个命名空间,Datanode都会构造一个BPOfferService类维护与这个命名空间Namenode的通信 。 当BPOfferService中的BPServiceActor类与该命名空间的Namenode握手成功后, 就会调用DataNode.initBlockPool()初始化该命名空间的块池。 DataNode.initBlockPool()方法中会调用DataNode的initStorage方法完成storage的初始化。而intiStorage中会调用DataStorage.recoverTransitionRead()来执行块池存储的初始化 *** 作。且DataNode#initStorage初始化只在和第一个namenode的握手完成时完成一次(第一次握手是在dn启动中的connectToNNAndHandshake 中完成)。

void initBlockPool(BPOfferService bpos) throws IOException {
    NamespaceInfo nsInfo = bpos.getNamespaceInfo();
    if (nsInfo == null) {
        throw new IOException("NamespaceInfo not found: Block pool " + bpos
                              + " should have retrieved namespace info before initBlockPool.");
    }

    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());

    // Register the new block pool with the BP manager.
    // 通过BP manager注册新的block pool
    blockPoolManager.addBlockPool(bpos);

    // In the case that this is the first block pool to connect, initialize
    // the dataset, block scanners, etc.
    // 调用此方法完成storage的初始化
    initStorage(nsInfo);

    try {
        data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
    } catch (AddBlockPoolException e) {
        handleAddBlockPoolError(e);
    }
    // HDFS-14993: check disk after add the block pool info.
    checkDiskError();

    blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
    initDirectoryScanner(getConf());
    initDiskBalancer(data, getConf());
}


private void initStorage(final NamespaceInfo nsInfo) throws IOException {
    final FsDatasetSpi.Factory> factory
        = FsDatasetSpi.Factory.getFactory(getConf());

    if (!factory.isSimulated()) {
        final StartupOption startOpt = getStartupOption(getConf());
        if (startOpt == null) {
            throw new IOException("Startup option not set.");
        }
        final String bpid = nsInfo.getBlockPoolID();
        // read storage info, lock data dirs and transition fs state if necessary
        // 初始化storage info,加载数据目录,必要时转换fs state
        synchronized (this) {
            storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
        }
        final StorageInfo bpStorage = storage.getBPStorage(bpid);
        LOG.info("Setting up storage: nsid={};bpid={};lv={};" +
                 "nsInfo={};dnuuid={}",
                 bpStorage.getNamespaceID(), bpid, storage.getLayoutVersion(),
                 nsInfo, storage.getDatanodeUuid());
    }

    // If this is a newly formatted DataNode then assign a new DatanodeUuid.
    checkDatanodeUuid();

    // 如果数据目录未初始化完成,则根据完成了初始化的storage初始化FsDatasetImpl实例
    synchronized(this)  {
        if (data == null) {
            data = factory.newInstance(this, storage, getConf());
        }
    }
}

在上述方法中调用recoverTransitionRead分析指定块池的存储目录,如果有需要,还可以从以前的状态中恢复,且这个方法同步时还需要在多个dn线程之间,只有第一个dn才可以执行dn级别的存储目录状态恢复或转换。

void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
                           Collection dataDirs, StartupOption startOpt) throws IOException {
    if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) {
        throw new IOException("All specified directories have failed to load.");
    }
}


@VisibleForTesting
synchronized List addStorageLocations(
    DataNode datanode,
	NamespaceInfo nsInfo, Collection dataDirs,
	StartupOption startOpt) throws IOException {
    // 获取并行卷加载的线程数
    final int numThreads = getParallelVolumeLoadThreadsNum(
        dataDirs.size(), datanode.getConf());
    // 创建一个线程池
    final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    try {
        // 并发执行加载数据卷,返回完成加载的数据目录
        final List successLocations = loadDataStorage(
            datanode, nsInfo, dataDirs, startOpt, executor);
        return loadBlockPoolSliceStorage(
            datanode, nsInfo, successLocations, startOpt, executor);
    } finally {
        executor.shutdown();
    }
}

使用loadDataStorage加载数据卷:

private List loadDataStorage(DataNode datanode,
                                              NamespaceInfo nsInfo, Collection dataDirs,
                                              StartupOption startOpt, ExecutorService executor) throws IOException {
    // 用于记录完成加载的数据卷
    final List success = Lists.newArrayList();
    final List tasks = Lists.newArrayList();
    // 遍历数据目录
    for (StorageLocation dataDir : dataDirs) {
        // 是否完成了加载
        if (!containsStorageDir(dataDir)) {
            try {
                // It first ensures the datanode level format is completed.
                final List> callables
                    = Lists.newArrayList();
                // 加载一个数据目录,如果需要从过去状态中恢复
                final StorageDirectory sd = loadStorageDirectory(
                    datanode, nsInfo, dataDir, startOpt, callables);
                // 如果在loadStorageDirectory中存在状态转换即升级时callables不为空
                if (callables.isEmpty()) {
                    // 如果不是升级状态,那么将存储目录添加到storageDirs
                    addStorageDir(sd);
                    success.add(dataDir);
                } else {
                    for(Callable c : callables) {
                        tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
                    }
                }
            } catch (IOException e) {
                LOG.warn("Failed to add storage directory {}", dataDir, e);
            }
        } else {
            LOG.info("Storage directory {} has already been used.", dataDir);
            success.add(dataDir);
        }
    }

    // 将更新状态中的数据卷添加到storageDirs中
    if (!tasks.isEmpty()) {
        LOG.info("loadDataStorage: {} upgrade tasks", tasks.size());
        for(UpgradeTask t : tasks) {
            try {
                addStorageDir(t.future.get());
                success.add(t.dataDir);
            } catch (ExecutionException e) {
                LOG.warn("Failed to upgrade storage directory {}", t.dataDir, e);
            } catch (InterruptedException e) {
                throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
            }
        }
    }

    return success;
}

下面的代码为具体的分析加载数据目录的逻辑:

private StorageDirectory loadStorageDirectory(
    DataNode datanode,
    NamespaceInfo nsInfo, StorageLocation location, StartupOption startOpt,
    List> callables) throws IOException {
    // 根据指定地址创建一个存储目录
    StorageDirectory sd = new StorageDirectory(null, false, location);
    try {
        // 分析当前存储目录的状态供后续 *** 作
        // 调用analyzeStorage()方法分析当前StorageDirectory的状态
        StorageState curState = sd.analyzeStorage(startOpt, this, true);
        // sd is locked but not opened
        switch (curState) {
                // 如果状态正常,不做任何事
            case NORMAL:
                break;
                // 如果不存在,则是抛出异常
            case NON_EXISTENT:
                LOG.info("Storage directory with location {} does not exist", location);
                throw new IOException("Storage directory with location " + location
                                      + " does not exist");
                // 未格式化则进行格式化
            case NOT_FORMATTED: // format
                LOG.info("Storage directory with location {} is not formatted for "
                         + "namespace {}. Formatting...", location, nsInfo.getNamespaceID());
                format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf());
                break;
                // 默认则从其他状态中恢复
            default:  // recovery part is common
                sd.doRecover(curState);
        }

        // 2. Do transitions
        // Each storage directory is treated individually.
        // During startup some of them can upgrade or roll back
        // while others could be up-to-date for the regular startup.
        // doTransition()方法判断如果启动选项是ROLLBACK, 则调用doRollback()方法
        // 进行回滚 *** 作。 如果存储目录记录的文件系统布局版本号(VERSION文件记录)与
        // 内存中的版本号一致, 则Datanode正常启动; 如果存储目录记录的版本号小于
        // 内存中的版本号, 则调用doUpgrade()方法升级(注意layoutVersion为负数) 。
        if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) {

            // 3. Update successfully loaded storage.
            setServiceLayoutVersion(getServiceLayoutVersion());
            writeProperties(sd);
        }

        return sd;
    } catch (IOException ioe) {
        sd.unlock();
        throw ioe;
    }
}
2.4 总结

在HDFS Federation架构中, 一个Datanode可以保存多个块池的数据块, 每个块池的数据块都会分布在Datanode所有的存储目录下。 HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间, DataStorage类则定义了bpStorageMap字段保存Datanode上所有块池的BlockPoolSliceStorage对象的引用。

DataStorage类管理着整个Datanode的存储, 包括Datanode定义的多个存储目录。BlockPoolSliceStorage类则管理着一个块池的存储, 包括分布在Datanode的多个存储目录下的块池目录。因此从设计上看,BlockPoolSliceStorage和DataStorage的功能类似,只不过一个是管理存储,一个是管理块池。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5654500.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存