2021SC@SDUSC
Hadoop源码分析(八)——NameNode实现(4)Hadoop源码分析(五)—— NameNode实现(1)
Hadoop源码分析(六)—— NameNode实现(2)
Hadoop源码分析(七)—— NameNode实现(3)
- Hadoop源码分析(八)——NameNode实现(4)
- 6.FSImage文件系统镜像
- 6.1 StorageInfo类
- 6.2 DirIterator类
- 6.3 FSImage 具体实现
- 6.3.1 枚举类
- 6.3.1.1NameNodeFile
- 6.3.1.2 CheckpointStates
- 6.3.1.3 NameDirType
- 6.3.2 内部类
- 6.3.3 成员变量
- 6.3.4 成员方法
FSImage 所在的包为 org.apache.hadoop.hdfs.server.namenode,该类用于处理对于 namenode 的命名空间的更新 *** 作,它会将更新 *** 作记录到日志文件中,而且会定期生成对应的检查点。FSImage继承自 Storage,而Storage进一步继承自StorageInfo,所以我们首先来分析Storagelnfo类。Storagelnfo所在的包为org.apache.hadoop.hdfs.server.common,该类是存储信息的通用实体类。
6.1 StorageInfo类public int layoutversion; //从存储的文件中读取的版本信息。 public int namespacelD; //存储介质对应的命名空间ID。它是根据address + port来进行散列得到的。 public long cTime; //文件或者目录的创建时间。 public Storageinfo () { this(0, 0, 0L); } //默认的构造方法,将成员变量都初始化为0。 public Storageinfo(int layoutV, int nsID, long cT) { layoutversion = layoutV; namespacelD = nsID; cTime = cT; } //根据指定的参数值来初始化成员变量。 public Storageinfo(Storageinfo from) { setstorageinfo(from); } //根据另一个Storageinfo对象的信息来初始化此Storageinfo的成员变量。
根据另一个Storageinfo对象的信息来初始化此Storageinfo的成员变量。
Storage 所在的包为 org.apache.hadoop.hdfk.server.common,它继承自上面的 Storagelnfb 类。 本地存储信息会被保存到一个单独的VERSION文件中,它的内容如下:
namespaceID=281506863 //文件系统命名空间的ID cTime=0 //文件系统状态的创建时间。 storageType=NAME_NODE //存储的类型。 layoutVersion=-l8 //版本号信息。
本地存储可以被保存到多个储存目录中,每个目录中应该保存相同的VERSION文件。当hadoop的 namenode或者datanode启动的时候,会从VERSION文件中读取本地的存储信息,此时会对每个存储目 录加上一把锁,从而保证其他的结点不会因为共享同一个存储目录而启动。当hadoop的namenode或者 datanode终止的时候,会释放掉它们所持有的存储目录的锁,从而使其他的结点来使用该存储目录。
接下来看一下Storage中定义存储状态的StorageState枚举类型:
NON_EXISTENT, //指定的目录不存在。 NOT_FORMATTED, //指定的目录存在但是未被格式化。 COMPLETE_UPGRADE, //完成升级,此时previous.tmp和current目录同时存在。 RECOVER_UPGRADE, //恢复升级,此时previous.tmp目录存在,但是current目录不存在。 COMPLETE*1 INAL I ZE, //完成确认,此时finalized.tmp和current目录都存在。 COMPLETE_ROLLBACK, //完成回滚,此时removed.tmp和current目录存在,但是previous目录不存在。 RECOVER_ROLLBACK, //恢复回滚,此时removed.tmp和previous目录存在,但是current目录不存在。 COMPLETE_CHECKPOINT, 完成检查点,此时lastcheckpoint.tmp和current目录都存在。 RECOVER_CHECKPOINT, //恢复检查点,此时lastcheckpoint.tmp目录存在,但是current目录不存在。 NORMAL; //正常。
接下来看一下Storage中用于定义与存储相关的文件或者目录的成员变量:
private static final String STORAGE_FILE_LOCK = "in_use.lock"; //用于保存存储目录的锁的文件。 protected static final String STORAGE_FILE_VERSION = "VERSION"; //用于保存存储目录的版本信息的文件。 public static final String STORAGE_DIR_CURRENT = "current"; //当前的存储目录。 private static final String STORAGE_DIR_PREVIOUS = "previous"; //用于升级之后保存以前版本的目录。 private static final String STORAGE_TMP_REMOVED = "removed, tmp"; //用于回滚过程中保存文件的目录。如果回滚成功,removed.tmp目录会被删除;否则,removed.tmp 目录会在系统启动而执行恢复的过程中重命名为currento private static final String STORAGE_TMP_PREVIOUS = "previous.tmpn"; //用于升级过程中保存以前版本的目录。当升级成功之后,previous.tmp将被重命名为previous;否贝U, previous.tmp将会被重命名为current。 private static final String STORAGE_TMP_FINALIZED = "finalized. tmp"; //用于提交过程中保存文件的目录。该目录下保存了升级前的文件系统的状态信息。 private static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.trnp"; //用于保存最新的检查点信息的临时目录。当有新的文件系统状态被保存到current目录中时, lastcheckpoint.tmp会保存当前文件系统的状态。如果最新的文件系统状态保存成功,贝0 lastcheckpoint.tmp 会被重命名为 previous.checkpoint;否则 lastcheckpoint.tmp 会被重命名为 currento private static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint"; //用于保存最后一次对文件系统的状态进行更新之前的文件系统的状态信息的目录。
接下来看一下Storage中的与存储相关的3个内部类:
StorageDirType接口中定义了与存储目录相关的方法,源代码如下:
public StorageDirType getStorageDirType(); //该方法用于获得存储目录的类型。 public boolean isOfType(StorageDirType type); //该方法用于检查某个目录是否为指定的存储目录。
StorageDirectory类是代表一个存储目录的实体类,它的源代码如下:
File root; //存储目录的根目录。 FileLock lock; //存储目录的锁对象。 StorageDirType dirType; //存储目录的类型。 public File getVersionFile() { return new File(new File(root, STORAGE_DIR_CURRENT), STORAGE_FILE_VERSION); }
该方法用于返回VERSION文件的路径,VERSION会保存在current目录下。
public void read() throws lOException { read(getVersionFile()); } public void read(File from) throws IOException { RandomAccessFile file = new RandomAccessFile (from, "rws"); FileInputStream in = null; try { in = new FilelnputStream(file.getFD()); file.seek(O); Properties props = new Properties(); props,load(in); getFields(props, this); } finally { if (in != null) { in.close(); } file.close(); ) }
该方法用于读取存储目录下的VERSION文件中的内容。
public void write() throws IOException { corruptPreUpgradeStorage(root); write(getVersionFile()); } public void write(File to) throws IOException { Properties props = new Properties(); setFields(props, this); RandomAccessFile file = new RandomAccessFile(to, "rws"); FileOutputStream out = null; try { file.seek(O); out = new FileOutputStream(file.getFD()); props.store(outr null); file.setLength(out.getChannel().position()); } finally { if (out != null) { out.close(); } file.close(); } }
该方法用于向 VERSION 文件中写入 namespacelD、cTime、storageType 和 layoutVersion 信息。
public File getCurrentDir() { return new File(root, STORAGE_DIR_CURRENT) };
取得current目录的路径。
public File getPreviousDir() { return new File(root, STORAGE_DIR_PREVIOUS); }
取得previous目录的路径。其中previous目录下保存了升级前的HDFS文件系统的存储状态信息。
public File getPreviousVersionFile() { return new File(new File(root, STORAGE_DIR_PREVIOUS), STORAGE_FILE_VERSION); }
取得previous目录下的VERSION文件的路径。
public File getRemovedTmp() { return new File(root, STORAGE_TMP_REMOVED); }
取得removed.tmp目录的路径。
public File getFinalizedTmp() { return new File(root, STORAGE_TMP_FINALIZED); }
取得finalized.tmp目录的路径。
public File getLastCheckpointTmp() { return new File(root, STORAGE_TMP_LAST_CKPT); }
取得lastcheckpointtmp目录的路径。
public File getPreviousCheckpoint() { return new File(root, STORAGE_PREVIOUS_CKPT); }
取得previous.checkpoint目录的路径。
6.2 DirIterator类Diriterator类实现了 lterator< StorageDirectory>接口,该类主要用于对存储目录的集合进行迭代 *** 作, 它的源代码如下:
StorageDirType dirType; //存储目录的类型。 int prevIndex; //前一个被访问的StorageDirectory索引。 int nextIndex; //下一个将要被访问的StorageDirectory的索引。
Diriterator中也定义了用于对StorageDirectory的集合进行遍历 *** 作的hasNext> next和remove方法。 接下来看一下Storage中的其他成员变量和成员方法:
private NodeType storageType; //结点类型。NodeType是一个枚举类型,它包括如下的枚举项: NAME_N0DE //名称结点。 DATA_NODE 数据结点。 protected ListstorageDirs = new ArrayList (); //存储目录集合。
public int getNumStorageDirs() { return storageDirs.size(); }
取得存储目录的数量。
public StorageDirectory getStorageDir(int idx) { return storageDirs.get(idx); }
取得某个特定索引位置处的存储目录。
protected void addStorageDir(StorageDirectory sd) { storageDirs.add(sd); }
添加一个存储目录。
protected void getFields(Properties props,StorageDirectory sd)
该方法用于从VERSION属性文件所对应的Properties对象中获得对应字段的属性值。
protected void setFields(Properties props,StorageDirectory sd)
该方法用于将VERSION属性文件中对应的字段保存到Properties对象中。
public static void rename(File from, File to) throws IOException { if (!from.renameTo(to)) throw new lOException ("Failed to rename "+ from.getCanonicalPath() + " to " + to.getCanonicalPath()); }
该方法用于将from文件重命名为to文件。
protected static void deleteDir(File dir) throws IOException { if (!FileUtil.fullyDelete(dir)) throw new IOException("Failed to delete " + dir.getCanonicalPath()); }
通过调用FileUtil的fullyDelete方法来删除指定的目录。
public void writeAll() throws IOException { this.layoutversion = FSConstants.LAYOUT_VERSION; for (Iteratorit = storageDirs.iterator(); it.hasNext();) { it.next () . write (); } }
将所有的存储目录的信息持久化到对应VERSION文件中。
public void unlockAll() throws IOException { for (Iterator6.3 FSImage 具体实现 6.3.1 枚举类 6.3.1.1NameNodeFileit = storageDirs.iterator(); it.hasNext();) { it.next().unlock(); } }
该枚举类定义了用于存储文件系统镜像的文件的名称,包括的枚举项如下:
IMAGE ("fsimage") //存储文件系统的目录树的文件名称。 TIME ("fstime") //存储上一次新打开一个 *** 作日志的时间的文件名称。 EDITS ("edits") //存储目录树上执行的 *** 作日志的文件名称。 IMAGE_NEW ("fsimage. ckpt") //存储文件系统的目录树的检查点的文件名称。 EDITS_NEW ("edits.new") //存储目录树上执行的最新 *** 作日志的文件名称。6.3.1.2 CheckpointStates
该枚举类定义了检查点的状态,包括的枚举项如下:
START //开始。 ROLLED_EDITS //回滚编辑日志。 UPLOAD_START //上传开始。 UPLOAD_DONE //上传完成。6.3.1.3 NameDirType
该枚举类型实现了 StorageDirType接口,定义了NameNode结点下的目录的类型。NameNode下的目录的类型可以是仅存储fsimage文件的IMAGE类型,也可以是存储edits文件的EDITS类型,或者是 存储fsimage与edits文件的IMAGE_AND_EDITS类型。NameNodeDirType包括的枚举项如下:
UNDEFINED //未定义的目录类型。 IMAGE //存储fsimage文件的目录。 EDITS //存储edits文件的目录。 IMAGE_AND_EDITS //存储fsimage和edits文件的目录。6.3.2 内部类
Datanodelmage实现了 Writable接口,该类用于将DataNode的持久化信息保存到fsimage文件中。
DatanodeDescriptor node = new DatanodeDescriptor();
保存DataNode基本信息的对象。
需要被保存到fsimage文件中的DataNode的持久化信息包括;Datanode的ID(hostname:portNumber, storageID, infoPort 和 ipcPort)、DataNode 上的磁盘总容量、DataNode 上剩余的磁盘容量、DataNode 最 后被更新的时间和DataNode上xceiver的数量。
private static final SimpleDateFormat DATE_FORM = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss"); //定义的时间格式。 protected long checkpointTime = -1L; //检査点的写入时间。 protected FSEditLog editLog = null; //文件系统的编辑日志。 private boolean isUpgradeFinalized = false; //是否完成升级 *** 作的标识。 protected List6.3.4 成员方法removedStorageDirs = new ArrayList (); //由于出错而需要被移除的StorageDirectory的列表。 private Collection checkpointDirs; //检查点目录集合,其中检查点目录下保存了 fsimage文件。 private Collection checkpointEditsDirs; //检查点对应的编辑目录集合,其中编辑目录下保存了 edits文件。 volatile private Checkpointstates ckptState = FSImage.Checkpointstates.START; //检査点的状态。 static private final FsPermission FILE_PERM = new FsPermission((short)0); static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR); //上面的两个变量用于将文件系统的镜像持久化到磁盘中,第一个变量定义了文件的 *** 作权限,第二 个变量定义了存储目录的分隔符。
FSImage () { super(NodeType.NAME_NODE); this.editLog = new FSEditLog(this); }
默认的构造方法会为NameNode创建一个新的存储,同时初始化editLog成员变量。
FSImage(CollectionfsDirs, Collection fsEditsDirs)throws lOException ( this (); setStorageDirectories(fsDirs, fsEditsDirs); }
根据指定的文件系统目录集合和文件系统的日志文件的目录集合来创建FSImage对象。
public FSImage(StorageInfo storageinfo) { super(NodeType.NAME_NODE, storageinfo); }
根据指定的存储信息来为NameNode创建一个新的存储。
public FSImage(File imageDir) throws IOException { this (); ArrayListdirs = new ArrayList (1); ArrayList editsDirs = new ArrayList (1); dirs.add(imageDir); editsDirs.add(imageDir); setStorageDirectories(dirs, editsDirs); }
根据指定的存放了 fsimage和edits文件的目录来创建FSImage对象。
FSImage中提供了两个重载的通过加载fsimage文件来构建FSImage对象的loadFSImage方法,一个 loadFSImage方法需要一个File参数,它会从指定的文件中加载镜像到文件系统中;而另一个loadFSImage 方法不需要任何的参数,它会从一个目录中选择最新的镜像文件并加载到内存中,同时将与同一目录下 的edits文件中的内容应用到内存中的镜像上。
我们首先看一下不带任何参数的loadFSImage方法的实现逻辑:
long latestNameCheckpointTime = Long.MIN_VALUE; long latestEditsCheckpointTime = Long.MIN_VALUE;
用于保存最新的fsimage所在的目录和edits文件所在的目录对应的检查点时间的long变量。
StorageDirectory latestNameSD = null; StorageDirectory latestEditsSD = null;
用于保存最新的fsimage所在的目录和edits文件所在的目录对应StorageDirectory变量。
boolean needToSave = false;
是否需要将该FSImage强制持久化到fsimage文件中的标识。
isUpgradeFinalized = true;
升级是否完成的标识。
CollectionimageDirs = new ArrayList (); Collection editsDirs = new ArrayList ();
用于保存fsimage所在的目录和edits文件所在的目录的集合。
for (Iteratorit = dirIterator(); it.hasNext();)
开始循环所有的系统目录,并找岀保存最新的fsimage和edits文件的目录。
StorageDirectory sd = it.next(); if (!sd.getVersionFile().exists()) { needToSave |= true; continue; }
如果此存储目录中不存在VERSION文件,则需要将needToSave设置为true,并对下一个存储目录 进行处理。
boolean imageExists = false, editsExists = false;
用于标识存储目录下是否存在fsimage和edits文件的变量。
if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) { imageExists = getlmageFile(sd, NameNodeFile.IMAGE).exists(); imageDi rs.add(sd.getRoot().getCanonicalPath()); }
如果存储目录的类型为IMAGE,而且目录下存在fsimage文件,则需要将该目录添加到imageDirs 集合中。
if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { editsExists = getlmageFile(sd, NameNodeFile.EDITS).exists(); editsDirs.add(sd.getRoot().getCanonicalPath()); }
如果存储目录的类型为EDITS,而且目录下存在edits文件,则需要将该目录添加到editsDirs集合中。
checkpointTime = readCheckpointTime(sd);
取得目录对应的检查点时间。
if ((checkpointTime != Long.MIN_VALUE) && ((checkpointTime != latestNameCheckpointTime) || (checkpointTime != latestEditsCheckpointTime))) { needToSave |= true; }
如果不同存储目录对应的检查点时间不一致,则需要将needToSave设置为true。
if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE) && (latestNameCheckpointTime < checkpointTime) && imageExists) { latestNameCheckpointTime = checkpointTime; latestNameSD = sd; }
找到保存最新的fsimage文件的目录和对应的检查点时间。
if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS) && (latestEditsCheckpointTime < checkpointTime) && editsExists) { latestEditsCheckpointTime = checkpointTime; latestEditsSD = sd; }
找到保存最新的edits文件的目录和对应的检查点时间。
if (checkpointTime <= OL) needToSave |= true; isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists(); }
如果目录的检查点的时间小于等于0,则needToSave设置为true,并设置升级完成标识。
至此,查找最新的存储目录的循环结束。
if (latestNameSD == null) throw new IOException ("Image file is not found in " + imageDirs); if (latestEditsSD == null) throw new IOException ("Edits file is not found in " + editsDirs);
如果没有找到用于保存最新的fsimage或者edits文件的目录,则抛出对应的异常。
if (latestNameCheckpointTime > latestEditsCheckpointTime&& latestNameSD != latestEditsSD && latestNameSD.getStorageDirType() == NameNodeDirType.IMAGE && latestEditsSD.getStorageDirType() == NameNodeDirType.EDITS) { LOG.error("This is a rare failure scenario!!!"); LOG.error("Image checkpoint time " + latestNameCheckpointTime + "> edits checkpoint time " + latestEditsCheckpointTime); LOG.error ("Name-node will treat the image as the latest state of " + "the namespace. Old edits will be discarded."); }else if (latestNameCheckpointTime != latestEditsCheckpointTime) throw new lOException("Inconsistent storage detected, " + "image and edits checkpoint times do not match. " +"image checkpoint time = " + latestNameCheckpointTime +"edits checkpoint time = " + latestEditsCheckpointTime)
必须保证从相同的检查点来加载fsimage和edits文件,如果检査点不同,则抛出对应的异常。
needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
通过recoverlntemiptedCheckpoint方法来判断是否需要从上一个中断的检查点来执行恢复 *** 作。
long startTime = FSNamesystem.now();
取得加载fsimage文件的开始时间。
long imageSize = getlmageFile(latestNameSD, NameNodeFile.IMAGE).length();
取得fsimage文件的长度。
latestNameSD.read();
读取VERSION文件的内容。
needToSave |= loadFSImage(getlmageFile(latestNameSD, NameNodeFile.IMAGE));
调用重载的loadFSImage方法来加载fsimage镜像文件。
if (latestNameCheckpointTime > latestEditsCheckpointTime) needToSave |= true; else needToSave |= (loadFSEdits(latestEditsSD) > 0);
如果保存最新的fsimage文件的目录的检查点时间大于保存最新的edits文件的目录的检査点时间, 则不需要加载edits文件的内容;否则,需要调用loadFSEdits方法来加载edits文件的内容。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)