基于源码hadoop-3.3.0
1 概述众所周知,dn主要是用来存储hadoop集群中的具体的数据的。但实际上,Datanode还是需要保存一部分Datanode自身的元数据的, 这些元数据是通过Datanode磁盘存储上的一些文件和目录来保存的。
Datanode可以定义多个存储目录保存数据块,Datanode的多个存储目录存储的数据块并不相同,并且不同的存储目录可以是异构的, 这样的设计可以提高数据块IO的吞吐率[比如多块磁盘]。
1.1 实际存储下面看一个实际中的存储:
我们简单了解一下:
- BP-607470660-10.200.50.133-1568802623014:BP表示blockpool,所以这个目录是一个块池目录,块池目录保存了一个块池在当前存储目录下存储的所有数据块, 在Federation部署方式中, Datanode的一个存储目录会包含多个以“BP”开头的块池目录。 BP后面会紧跟一个唯一的随机块池ID, 在这个示例中就是607470660。 接下来的IP地址10.200.50.133是当前块池对应的Namenode的IP地址。 最后一个部分是这个块池的创建时间。
- VERSION文件:在current中存在一个VERSION文件,截图未显示,内容如下
块池目录的VERSION文件同样包含了文件系统布局版本(layoutVersion) 、 HDFS集群ID(clusterld) 以及创建时间(cTime) 等集群信息。 除此之外, 块池目录的VERSION文件还包含了以下信息。
-
- storageType:存储类型,这里是DATA_NODE
- current/BP-607470660-10.200.50.133-1568802623014/current目录:
-
- finalized/rbw:finalized和rbw目录都是用于存储数据块的, 包括数据块文件以及对应的校验和文件。 rbw(replica being written, 正在写入副本) 目录保存了正在由HDFS客户端写入当前Datanode的数据块。 finalized目录包含了已经完成写入 *** 作的数据块, 由于这样的数据块可能非常多, 所以finalized目录会以特定的目录结构存储这些数据块。每个数据块对应 2 个文件,blk 文件存放数据,另外一个以 meta 结尾的存放校验和等元数据
- VERSION:这里会记录namespaceID,cTime,blockpoolId,layoutVersion
- current/BP-607470660-10.200.50.133-1568802623014/scanner.cursor:DataNode 会定期的对每个 blk 文件做校验,这个文件是用来记录校验到哪个位置的
- in_use.lock:被dn线程持有的锁文件,用于防止多个Datanode线程启动并且并发修改这个存储目录
- lazyPersist: HDFS 2.X中引入了一个新的特性, 用于支持将临时数据写入内存, 然后通过懒持久化(lazyPersist) 方式写入磁盘。 如果用户开启了这个特性,lazyPersist目录就用于将内存中的临时数据懒持久化到磁盘。
Datanode最重要的功能就是管理磁盘上存储的HDFS数据块(block)。
Datanode将这个管理功能切分为两个部分:
① 管理与组织磁盘存储目录(由dfs.data.dir指定) , 如current、previous、 detach、 tmp等, 这个功能由DataStorage类实现;
②管理与组织数据块及其元数据文件, 这个功能主要由FsDatasetImpl(对应每一个存储目录)相关类实现。
本文主要介绍第一部分:磁盘存储目录的管理。
2 磁盘存储目录管理在dn中负责管理磁盘存储的主要继承结构如下:
因此我们分别了解一下这几个概念。
2.1 Storage根据注释,此类用于存储storage file信息,本地的storage 信息存储在VERSION文件中,主要包括node的类型,存储布局版本,namespaceId,fs state create time。
本地存储可以驻留在多个目录中。每个目录都应包含与其他目录相同的VERSION 文件。在启动Hadoop服务器(名称节点和数据节点)期间读取它们的本地存储来自他们的信息。
服务器在运行时为每个存储目录持有一个锁,以便其他节点无法启动共享相同的存储。当服务器停止(正常或异常)时释放锁。
Storage是一个抽象类, 为Datanode、 Namenode提供抽象的存储服务。 Storage类管理着当前节点上( 可以是Datanode或者Namenode) 所有的存储目录, 每个存储目录都由一个StorageDirectory对象管理,StorageDirectory类定义了存储目录上的通用 *** 作。 由于HDFS 2.X版本引入了Federation机制 , Datanode会为多个块池保存数据块, HDFS定义了BlockPoolSliceStorage类来管理Datanode上的一个块池, 这个块池分布在Datanode配置的所有存储目录中。Storage用一个线性表字段storageDirs存储它管理的所有StorageDirectory, 并通过Dirlterator迭代器进行遍历。
private final List2.1.1 Storage.StorageStatestorageDirs = new CopyOnWriteArrayList<>();
StorageState定义了存储空间可能出现的所有状态。 在升级、 回滚、 升级提交、 检查点等 *** 作中, 节点(Datanode或者Namenode) 的存储空间可能出现各种异常, 例如误 *** 作、 断电、 宕机等情况, 这个时候存储空间就可能处于某种中间状态, 引入中间状态, 有利于HDFS从错误中恢复过来。 存储状态的确定, 是在StorageDirectory.analyzeStorage()方法中进行的.
public enum StorageState { NON_EXISTENT, // 存储不存在 NOT_FORMATTED, // 存储未格式化 COMPLETE_UPGRADE, // 完成升级 RECOVER_UPGRADE, // 恢复升级 COMPLETE_FINALIZE, // 完成升级提交 COMPLETE_ROLLBACK, // 完成回滚 *** 作 RECOVER_ROLLBACK, // 恢复回滚 COMPLETE_CHECKPOINT, // 完成检查点 *** 作 RECOVER_CHECKPOINT, // 恢复检查点 *** 作 NORMAL; // 正常状态 }2.1.2 Storage.StorageDirectory
我们知道Datanode和Namenode都可以定义多个存储目录来存储数据,StorageDirectory是Storage的内部类, 定义了管理存储目录的通用方法。
重点字段:
// 存储目录的根, 就是java.io.File文件。 final File root; // root directory // 指示当前目录是否是共享的。 // 例如在HA部署中, 不同的Namenode之间共享存储目录, // 或者在Federation部署中不同的块池之间共享存储目录。 final boolean isShared; // 当前存储目录的类型。 final StorageDirType dirType; // storage dir type // 独占锁, java.nio.FileLock类型, // 用来支持Datanode或者Namenode线程独占存储目录的锁 *** 作。 FileLock lock; // storage lock // 权限信息 private final FsPermission permission; // 存储目录的标识符。 private String storageUuid = null; // Storage directory identifier. // 位置信息 private final StorageLocation location;
StorageDirectory的 *** 作主要包含:
2.1.2.1 获取文件夹
主要是获取存储目录中的各个文件和目录的方法:
包括:
- current和current Version文件
- previous和previous Version文件
- previous tmp目录
- removed tmp目录
- finlized tmp目录
- lastCheckpoint tmp目录
- previous.checkpoint文件
public File getCurrentDir() { if (root == null) { return null; } return new File(root, STORAGE_DIR_CURRENT); } public File getVersionFile() { if (root == null) { return null; } return new File(new File(root, STORAGE_DIR_CURRENT), STORAGE_FILE_VERSION); } public File getPreviousVersionFile() { if (root == null) { return null; } return new File(new File(root, STORAGE_DIR_PREVIOUS), STORAGE_FILE_VERSION); } public File getPreviousDir() { if (root == null) { return null; } return new File(root, STORAGE_DIR_PREVIOUS); } public File getPreviousTmp() { if (root == null) { return null; } return new File(root, STORAGE_TMP_PREVIOUS); } public File getRemovedTmp() { if (root == null) { return null; } return new File(root, STORAGE_TMP_REMOVED); } public File getFinalizedTmp() { if (root == null) { return null; } return new File(root, STORAGE_TMP_FINALIZED); } public File getLastCheckpointTmp() { if (root == null) { return null; } return new File(root, STORAGE_TMP_LAST_CKPT); } public File getPreviousCheckpoint() { if (root == null) { return null; } return new File(root, STORAGE_PREVIOUS_CKPT); }
2.1.2.2 加锁解锁 *** 作
Datanode磁盘存储结构中,存储目录下会有一个in_use.lock文件,这个文件用于对当前存储目录加锁, 以保证Datanode进程对存储目录的独占使用。 当Datanode进程退出执行时, in_use.lock文件会被删除。 StorageDirectory提供了tryLock()与unlock()两个锁方法, 分别实现了对存储目录加锁以及解锁的功能。
StorageDirectory中真正进行加锁 *** 作的是tryLock()方法。tryLock()方法会首先构造锁文件, 然后调用file.getChannel.lock()方法尝试获得存储目录的独占锁, 如果已经有进程占有锁文件, 那么file.getChannel.lock()就会返回一个null的引用, 表明有另一个节点运行在当前的存储目录上, tryLock()方法会抛出异常并退出执行。 如果加锁成功, tryLock()方法会在锁文件中写入虚拟机信息。
加锁成功后, tryLock()方法会调用deleteonExit()方法, 在Java虚拟机运行结束时删除in_use.lock文件。
@SuppressWarnings("resource") FileLock tryLock() throws IOException { boolean deletionHookAdded = false; // 构造in_use.lock File lockF = new File(root, STORAGE_FILE_LOCK); // 如果in_use.lock创建失败 if (!lockF.exists()) { lockF.deleteonExit(); deletionHookAdded = true; } RandomAccessFile file = new RandomAccessFile(lockF, "rws"); String jvmName = ManagementFactory.getRuntimeMXBean().getName(); FileLock res = null; try { // 给文件加锁 res = file.getChannel().tryLock(); // 已有程序获取了锁 if (null == res) { LOG.error("Unable to acquire file lock on path {}", lockF); throw new OverlappingFileLockException(); } // 加锁成功,往文件中添加虚拟机信息 file.write(jvmName.getBytes(Charsets.UTF_8)); LOG.info("Lock on {} acquired by nodename {}", lockF, jvmName); } catch(OverlappingFileLockException oe) { // Cannot read from the locked file on Windows. // 已有程序获取了锁,关闭锁文件 String lockingJvmName = Path.WINDOWS ? "" : (" " + file.readLine()); LOG.error("It appears that another node {} has already locked the " + "storage directory: {}", lockingJvmName, root, oe); file.close(); return null; } catch(IOException e) { // 读取锁文件失败,关闭锁 LOG.error("Failed to acquire lock on {}. If this storage directory is" + " mounted via NFS, ensure that the appropriate nfs lock services" + " are running.", lockF, e); file.close(); throw e; } // 在虚拟机任务运行成功后,删除锁文件 if (!deletionHookAdded) { // If the file existed prior to our startup, we didn't // call deleteonExit above. But since we successfully locked // the dir, we can take care of cleaning it up. lockF.deleteonExit(); } return res; } public void unlock() throws IOException { if (this.lock == null) return; this.lock.release(); lock.channel().close(); lock = null; }
2.1.2.3 状态恢复 *** 作
Datanode在执行升级、 回滚、 提交 *** 作的过程中会出现各种异常, 例如误 *** 作、 断电、 宕机等情况。
StorageDirectory提供了doRecover()和analyzeStorage()两个方法, Datanode会首先调用analyzeStorage()方法分析当前节点的存储状态, 然后根据分析所得的存储状态调用doRecover()方法执行恢复 *** 作。
- analyzeStorage
public StorageState analyzeStorage(StartupOption startOpt, Storage storage, boolean checkCurrentIsEmpty) throws IOException { if (location != null && location.getStorageType() == StorageType.PROVIDED) { // currently we assume that PROVIDED storages are always NORMAL // 如果是外部存储 目前,我们假设提供的存储始终为“正常” return StorageState.NORMAL; } assert root != null : "root is null"; boolean hadMkdirs = false; String rootPath = root.getCanonicalPath(); // NON_EXISTENT: 以非FORMAT选项启动时,目录不存在; // 或者目录不可写、路径为文件时, 存储目录状态都为NOT_EXISTENT状态。 try { // check that storage exists // 判断存储是否存在 if (!root.exists()) { // storage directory does not exist // 如果启动方式不是format和hotswap,则标识存储不存在 if (startOpt != StartupOption.FORMAT && startOpt != StartupOption.HOTSWAP) { LOG.warn("Storage directory {} does not exist", rootPath); return StorageState.NON_EXISTENT; } // directory无法创建 LOG.info("{} does not exist. Creating ...", rootPath); if (!root.mkdirs()) { throw new IOException("Cannot create directory " + rootPath); } hadMkdirs = true; } // 指定的存储是否是目录 // or is inaccessible if (!root.isDirectory()) { LOG.warn("{} is not a directory", rootPath); return StorageState.NON_EXISTENT; } // 是否有写的权限 if (!FileUtil.canWrite(root)) { LOG.warn("Cannot access storage directory {}", rootPath); return StorageState.NON_EXISTENT; } } catch(SecurityException ex) { // 权限异常 LOG.warn("Cannot access storage directory {}", rootPath, ex); return StorageState.NON_EXISTENT; } // 存储存在,加锁 this.lock(); // lock storage if it exists // If startOpt is HOTSWAP, it returns NOT_FORMATTED for empty directory, // while it also checks the layout version. // 如果startOpt是hotswap,则状态未not_formatted if (startOpt == HdfsServerConstants.StartupOption.FORMAT || (startOpt == StartupOption.HOTSWAP && hadMkdirs)) { if (checkCurrentIsEmpty) { checkEmptyCurrent(); } return StorageState.NOT_FORMATTED; } // 启动方式是否为导入检查点 if (startOpt != HdfsServerConstants.StartupOption.import) { storage.checkOldLayoutStorage(this); } // check whether current directory is valid // 如果没有version file则创建,这里主要做判断是否存在 File versionFile = getVersionFile(); boolean hasCurrent = versionFile.exists(); // check which directories exist boolean hasPrevious = getPreviousDir().exists(); boolean hasPreviousTmp = getPreviousTmp().exists(); boolean hasRemovedTmp = getRemovedTmp().exists(); // 检查是否有Finalized文件 boolean hasFinalizedTmp = getFinalizedTmp().exists(); boolean hasCheckpointTmp = getLastCheckpointTmp().exists(); // 各个指定的文件存在才执行下面的判断 ,如果current存在,则存储状态为normal // 如果前一个文件也存在,则抛出非一致性异常,如果判断不通过,则标识为not_formatted if (!(hasPreviousTmp || hasRemovedTmp || hasFinalizedTmp || hasCheckpointTmp)) { // no temp dirs - no recovery if (hasCurrent) return StorageState.NORMAL; if (hasPrevious) throw new InconsistentFSStateException(root, "version file in current directory is missing."); if (checkCurrentIsEmpty) { checkEmptyCurrent(); } return StorageState.NOT_FORMATTED; } // 判断tmp目录存在多于多个,抛出非一致性异常 if ((hasPreviousTmp?1:0) + (hasRemovedTmp?1:0) + (hasFinalizedTmp?1:0) + (hasCheckpointTmp?1:0) > 1) // more than one temp dirs throw new InconsistentFSStateException(root, "too many temporary directories."); // # of temp dirs == 1 should either recover or complete a transition // 标识状态是否完成checkpoint if (hasCheckpointTmp) { return hasCurrent ? StorageState.COMPLETE_CHECKPOINT : StorageState.RECOVER_CHECKPOINT; } // 判断是否完成finalize if (hasFinalizedTmp) { if (hasPrevious) throw new InconsistentFSStateException(root, STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_FINALIZED + "cannot exist together."); return StorageState.COMPLETE_FINALIZE; } if (hasPreviousTmp) { if (hasPrevious) throw new InconsistentFSStateException(root, STORAGE_DIR_PREVIOUS + " and " + STORAGE_TMP_PREVIOUS + " cannot exist together."); // 如果current目录存在,标识完成更新,否则标识为recover_upgrade if (hasCurrent) return StorageState.COMPLETE_UPGRADE; return StorageState.RECOVER_UPGRADE; } assert hasRemovedTmp : "hasRemovedTmp must be true"; if (!(hasCurrent ^ hasPrevious)) throw new InconsistentFSStateException(root, "one and only one directory " + STORAGE_DIR_CURRENT + " or " + STORAGE_DIR_PREVIOUS + " must be present when " + STORAGE_TMP_REMOVED + " exists."); if (hasCurrent) return StorageState.COMPLETE_ROLLBACK; return StorageState.RECOVER_ROLLBACK; }
总结起来就是包含这几种状态:
NON_EXISTENT: 以非FORMAT选项启动时,目录不存在;或者目录不可写、路径为文件时, 存储目录状态都为NOT_EXISTENT状态。 NOT_FORMATTED: 以FORMAT选项启动时, 都为NOT_FORMATTED状态。 NORMAL: 没有tmp中间状态文件夹, 则存储目录为正常状态。 COMPLETE_UPGRADE: 存在current/VERSION文件, 存在previous.tmp文件夹,则存储目录为升级完成状态。 RECOVER_UPGRADE: 存在previous.tmp文件夹, 不存在current/VERSION文件, 存储目录应该从升级中恢复。 COMPLETE_ROLLBACK: 存在removed.tmp文件夹, 也存在current/VERSION文件, 则存储目录的回滚 *** 作成功完成。 RECOVER_ROLLBACK: 存在removed.tmp文件夹, 不存在current/VERSION文件, 存储目录应该从回滚中恢复。 COMPLETE_FINALIZE: 存在finalized.tmp文件夹, 存储目录可以继续执行提交 *** 作。
- doRecover
调用analyzeStorage()之后, Datanode就可以确定存储目录的状态了。 对于异常状态,可以通过调用doRecover()方法进行恢复, 使存储空间的状态恢复到NORMAL状态。
恢复过程源码中的注释很清晰。
public void doRecover(StorageState curState) throws IOException { File curDir = getCurrentDir(); if (curDir == null || root == null) { // at this point, we do not support recovery on PROVIDED storages return; } String rootPath = root.getCanonicalPath(); switch(curState) { case COMPLETE_UPGRADE: // mv previous.tmp -> previous LOG.info("Completing previous upgrade for storage directory {}", rootPath); rename(getPreviousTmp(), getPreviousDir()); return; case RECOVER_UPGRADE: // mv previous.tmp -> current LOG.info("Recovering storage directory {} from previous upgrade", rootPath); if (curDir.exists()) deleteDir(curDir); rename(getPreviousTmp(), curDir); return; case COMPLETE_ROLLBACK: // rm removed.tmp LOG.info("Completing previous rollback for storage directory {}", rootPath); deleteDir(getRemovedTmp()); return; case RECOVER_ROLLBACK: // mv removed.tmp -> current LOG.info("Recovering storage directory {} from previous rollback", rootPath); rename(getRemovedTmp(), curDir); return; case COMPLETE_FINALIZE: // rm finalized.tmp LOG.info("Completing previous finalize for storage directory {}", rootPath); deleteDir(getFinalizedTmp()); return; case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint LOG.info("Completing previous checkpoint for storage directory {}", rootPath); File prevCkptDir = getPreviousCheckpoint(); if (prevCkptDir.exists()) deleteDir(prevCkptDir); rename(getLastCheckpointTmp(), prevCkptDir); return; case RECOVER_CHECKPOINT: // mv lastcheckpoint.tmp -> current LOG.info("Recovering storage directory {} from failed checkpoint", rootPath); if (curDir.exists()) deleteDir(curDir); rename(getLastCheckpointTmp(), curDir); return; default: throw new IOException("Unexpected FS state: " + curState + " for storage directory: " + rootPath); } }2.2 StorageInfo
StorageInfo用于表示存储的基本信息。
@InterfaceAudience.Private public class StorageInfo { // 存储系统布局版本号, 当节点存储的目录结构发生改变或者fsimage和editlog的格式发生改变时, // 存储系统布局版本号会更新。 这个版本号一般是负数 public int layoutVersion; // layout version of the storage data // 存储名称空间id,每一个bp对应一个 public int namespaceID; // id of the file system // 系统的集群id public String clusterID; // id of the cluster // fs state的创建时间 public long cTime; // creation time of the file system state // 存储类型:有DATA_NODE、 NAME_NODE、 JOURNAL_NODE等类型 protected final NodeType storageType; // Type of the node using this storage // 上述提及的信息都存储在VERSION中 protected static final String STORAGE_FILE_VERSION = "VERSION"; }2.3 DataStorage
DataStorage继承自Storage抽象类, 提供了管理Datanode存储空间的功能。
在HDFSFederation架构中, 一个Datanode可以保存多个命名空间的数据块,每个命名空间在Datanode磁盘上都拥有一个独立的块池( BlockPool),这个块池会分布在Datanode的所有存储目录下,它们共同保存了这个块池在当前Datanode上的所有数据块。HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间,DataStorage类则定义了bpStorageMap字段保存Datanode上所有块池BlockPoolSliceStorage对象的引用。
// Maps block pool IDs to block pool storage private final MapbpStorageMap = Collections.synchronizedMap(new HashMap ());
dn在启动时会调用DataStorage提供的方法初始化Datanode的存储空间, 在HDFS Federation架构中, Datanode会保存多个命名空间的数据块。 对于每一个命名空间,Datanode都会构造一个BPOfferService类维护与这个命名空间Namenode的通信 。 当BPOfferService中的BPServiceActor类与该命名空间的Namenode握手成功后, 就会调用DataNode.initBlockPool()初始化该命名空间的块池。 DataNode.initBlockPool()方法中会调用DataNode的initStorage方法完成storage的初始化。而intiStorage中会调用DataStorage.recoverTransitionRead()来执行块池存储的初始化 *** 作。且DataNode#initStorage初始化只在和第一个namenode的握手完成时完成一次(第一次握手是在dn启动中的connectToNNAndHandshake 中完成)。
void initBlockPool(BPOfferService bpos) throws IOException { NamespaceInfo nsInfo = bpos.getNamespaceInfo(); if (nsInfo == null) { throw new IOException("NamespaceInfo not found: Block pool " + bpos + " should have retrieved namespace info before initBlockPool."); } setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID()); // Register the new block pool with the BP manager. // 通过BP manager注册新的block pool blockPoolManager.addBlockPool(bpos); // In the case that this is the first block pool to connect, initialize // the dataset, block scanners, etc. // 调用此方法完成storage的初始化 initStorage(nsInfo); try { data.addBlockPool(nsInfo.getBlockPoolID(), getConf()); } catch (AddBlockPoolException e) { handleAddBlockPoolError(e); } // HDFS-14993: check disk after add the block pool info. checkDiskError(); blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); initDirectoryScanner(getConf()); initDiskBalancer(data, getConf()); } private void initStorage(final NamespaceInfo nsInfo) throws IOException { final FsDatasetSpi.Factory extends FsDatasetSpi>> factory = FsDatasetSpi.Factory.getFactory(getConf()); if (!factory.isSimulated()) { final StartupOption startOpt = getStartupOption(getConf()); if (startOpt == null) { throw new IOException("Startup option not set."); } final String bpid = nsInfo.getBlockPoolID(); // read storage info, lock data dirs and transition fs state if necessary // 初始化storage info,加载数据目录,必要时转换fs state synchronized (this) { storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt); } final StorageInfo bpStorage = storage.getBPStorage(bpid); LOG.info("Setting up storage: nsid={};bpid={};lv={};" + "nsInfo={};dnuuid={}", bpStorage.getNamespaceID(), bpid, storage.getLayoutVersion(), nsInfo, storage.getDatanodeUuid()); } // If this is a newly formatted DataNode then assign a new DatanodeUuid. checkDatanodeUuid(); // 如果数据目录未初始化完成,则根据完成了初始化的storage初始化FsDatasetImpl实例 synchronized(this) { if (data == null) { data = factory.newInstance(this, storage, getConf()); } } }
在上述方法中调用recoverTransitionRead分析指定块池的存储目录,如果有需要,还可以从以前的状态中恢复,且这个方法同步时还需要在多个dn线程之间,只有第一个dn才可以执行dn级别的存储目录状态恢复或转换。
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, CollectiondataDirs, StartupOption startOpt) throws IOException { if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) { throw new IOException("All specified directories have failed to load."); } } @VisibleForTesting synchronized List addStorageLocations( DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt) throws IOException { // 获取并行卷加载的线程数 final int numThreads = getParallelVolumeLoadThreadsNum( dataDirs.size(), datanode.getConf()); // 创建一个线程池 final ExecutorService executor = Executors.newFixedThreadPool(numThreads); try { // 并发执行加载数据卷,返回完成加载的数据目录 final List successLocations = loadDataStorage( datanode, nsInfo, dataDirs, startOpt, executor); return loadBlockPoolSliceStorage( datanode, nsInfo, successLocations, startOpt, executor); } finally { executor.shutdown(); } }
使用loadDataStorage加载数据卷:
private ListloadDataStorage(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt, ExecutorService executor) throws IOException { // 用于记录完成加载的数据卷 final List success = Lists.newArrayList(); final List tasks = Lists.newArrayList(); // 遍历数据目录 for (StorageLocation dataDir : dataDirs) { // 是否完成了加载 if (!containsStorageDir(dataDir)) { try { // It first ensures the datanode level format is completed. final List > callables = Lists.newArrayList(); // 加载一个数据目录,如果需要从过去状态中恢复 final StorageDirectory sd = loadStorageDirectory( datanode, nsInfo, dataDir, startOpt, callables); // 如果在loadStorageDirectory中存在状态转换即升级时callables不为空 if (callables.isEmpty()) { // 如果不是升级状态,那么将存储目录添加到storageDirs addStorageDir(sd); success.add(dataDir); } else { for(Callable c : callables) { tasks.add(new UpgradeTask(dataDir, executor.submit(c))); } } } catch (IOException e) { LOG.warn("Failed to add storage directory {}", dataDir, e); } } else { LOG.info("Storage directory {} has already been used.", dataDir); success.add(dataDir); } } // 将更新状态中的数据卷添加到storageDirs中 if (!tasks.isEmpty()) { LOG.info("loadDataStorage: {} upgrade tasks", tasks.size()); for(UpgradeTask t : tasks) { try { addStorageDir(t.future.get()); success.add(t.dataDir); } catch (ExecutionException e) { LOG.warn("Failed to upgrade storage directory {}", t.dataDir, e); } catch (InterruptedException e) { throw DFSUtilClient.toInterruptedIOException("Task interrupted", e); } } } return success; }
下面的代码为具体的分析加载数据目录的逻辑:
private StorageDirectory loadStorageDirectory( DataNode datanode, NamespaceInfo nsInfo, StorageLocation location, StartupOption startOpt, List2.4 总结> callables) throws IOException { // 根据指定地址创建一个存储目录 StorageDirectory sd = new StorageDirectory(null, false, location); try { // 分析当前存储目录的状态供后续 *** 作 // 调用analyzeStorage()方法分析当前StorageDirectory的状态 StorageState curState = sd.analyzeStorage(startOpt, this, true); // sd is locked but not opened switch (curState) { // 如果状态正常,不做任何事 case NORMAL: break; // 如果不存在,则是抛出异常 case NON_EXISTENT: LOG.info("Storage directory with location {} does not exist", location); throw new IOException("Storage directory with location " + location + " does not exist"); // 未格式化则进行格式化 case NOT_FORMATTED: // format LOG.info("Storage directory with location {} is not formatted for " + "namespace {}. Formatting...", location, nsInfo.getNamespaceID()); format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf()); break; // 默认则从其他状态中恢复 default: // recovery part is common sd.doRecover(curState); } // 2. Do transitions // Each storage directory is treated individually. // During startup some of them can upgrade or roll back // while others could be up-to-date for the regular startup. // doTransition()方法判断如果启动选项是ROLLBACK, 则调用doRollback()方法 // 进行回滚 *** 作。 如果存储目录记录的文件系统布局版本号(VERSION文件记录)与 // 内存中的版本号一致, 则Datanode正常启动; 如果存储目录记录的版本号小于 // 内存中的版本号, 则调用doUpgrade()方法升级(注意layoutVersion为负数) 。 if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) { // 3. Update successfully loaded storage. setServiceLayoutVersion(getServiceLayoutVersion()); writeProperties(sd); } return sd; } catch (IOException ioe) { sd.unlock(); throw ioe; } }
在HDFS Federation架构中, 一个Datanode可以保存多个块池的数据块, 每个块池的数据块都会分布在Datanode所有的存储目录下。 HDFS定义了BlockPoolSliceStorage类管理Datanode上单个块池的存储空间, DataStorage类则定义了bpStorageMap字段保存Datanode上所有块池的BlockPoolSliceStorage对象的引用。
DataStorage类管理着整个Datanode的存储, 包括Datanode定义的多个存储目录。BlockPoolSliceStorage类则管理着一个块池的存储, 包括分布在Datanode的多个存储目录下的块池目录。因此从设计上看,BlockPoolSliceStorage和DataStorage的功能类似,只不过一个是管理存储,一个是管理块池。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)