- Hadoop进阶篇
- HDFS:Hadoop分布式文件系统
- 分布式文件系统的理解
- HDFS 架构详细剖析
- 1. 分块存储
- 2. 3副本存储
- 3. [扩展:机架感知](https://hadoop.apache.org/docs/r3.1.4/hadoop-project-dist/hadoop-common/RackAwareness.html)
- 4. 抽象成数据块的好处
- 5. HDFS架构
- 6. 扩展
- 块缓存
- hdfs的文件权限验证
- HDFS 的 Shell 命令 *** 作
- hdfs 常用命令
- 补充命令
- hdfs的优缺点
- 1. hdfs的优点
- 2. hdfs的缺点
- hdfs安全模式
- hdfs的java编程
- 1. 创建文件夹
- 2. 文件上传
- 3. 文件下载
- 4. 查看hdfs文件详细信息
- 5. IO 流 *** 作 hdfs 文件
- 6. hdfs 的小文件合并
- DataNode 工作机制及存储
- 1. DataNode 工作机制
- 2. 数据完整性
- 3. 掉线时限参数设置
- 4. DataNode的目录结构
- 5. DataNode多目录配置
- hdfs的读写流程
- 1. hdfs 的写入流程
- 2. hdfs 的读取流程
- 更详细的 hdfs 写入流程图
- 最直观的理解便是:三个臭皮匠,顶个诸葛亮。
问:上边的文件有几个块,分别是多大?
- 保存文件到HDFS时,会先默认按128M的单位对文件进行切分成一个个 block 块
- 数据以block块的形式存在 HDFS 文件系统中
- 在hadoop1当中,文件的block块默认大小是64M
- hadoop2当中,文件的block块大小默认是128M,block块的大小可以通过 hdfs-site.xml 当中的配置文件进行指定
dfs.blocksize 块大小 以字节为单位
- hdfs-default.xml 参考默认属性
- block元数据:
- 每个 block 块的元数据大小大概为150字节
- 一个 1k 大小的 block 与一个 128M 大小的block的元数据基本相等
- 所以在 namenode 内存有限的情况下,存储大文件更划算。
2. 3副本存储面试题
如果有一个文件大小为1KB,它有几个块?每个块是多大?
类似于有一个水桶可以装128斤的水,但是我只装了1斤的水,那么我的水桶里面水的重量就是1斤,而不是128斤
- 为了保证block块的安全性,也就是数据的安全性,在hadoop2当中,采用文件默认保存三个副本,我们可以更改副本数以提高数据的安全性。
- 在 hdfs-site.xml 当中修改以下配置属性,即可更改文件的副本数。
3. 扩展:机架感知dfs.replication 3
- 副本存放策略,不同版本稍有区别
- 比如apache hadoop 2.7.7
- 比如apache hadoop 2.8.5
- 文件可能大于集群中任意一个磁盘
- 10T * 3 / 128 = xxx 块,10 T 文件方式存多个 block 块,这些 block 块属于一个文件
- 使用块抽象而不是文件可以简化存储子系统
- hdfs将所有的文件全部抽象成为block块来进行存储,不管文件大小,全部一视同仁都是以block块的形式进行存储,方便我们的分布式文件系统对文件的管理
- 块非常适合用于数据备份,进而提供数据容错能力和可用性
- HDFS 集群包括:NameNode、DataNode 和 Secondary NameNode
- NameNode 负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
- DataNode 负责管理用户的文件数据块,每一个数据块都可以在多个 DataNode 上存储多个副本。
- Secondary NameNode 用来监控 HDFS 状态的辅助后台程序,每隔一段时间获取 HDFS 元数据的快照。最主要作用是辅助 NameNode 管理元数据信息。
- NameNode 与 DataNode 总结概述:
- 官网文档
- 通常 DataNode 从磁盘中读取块,但对于访问频繁的文件,其对应的块可能被显示的缓存在 DataNode 的内存中,以堆外块缓存的形式存在。
- 默认情况下,一个块仅缓存在一个 DataNode 的内存中,当然可以针对每个文件配置 DataNode 的数量。作业调度器通过在缓存块的 DataNode 上运行任务,可以利用块缓存的优势提高读 *** 作的性能。
hdfs的文件权限验证例如: 连接(join) *** 作中使用的一个小的查询表就是块缓存的一个很好的候选。
用户或应用通过在缓存池中增加一个cache directive来告诉namenode需要缓存哪些文件及存多久。缓存池(cache pool)是一个拥有管理缓存权限和资源使用的管理性分组
- hdfs的文件权限机制与linux系统的文件权限机制类似
- r:read w:write x:execute 权限x对于文件表示忽略,对于文件夹表示是否有权限访问其内容
- 如果linux系统用户zhangsan使用hadoop命令创建一个文件,那么这个文件在HDFS当中的owner就是zhangsan
- HDFS文件权限的目的,防止好人做错事,而不是阻止坏人做坏事。HDFS相信你告诉我你是谁,你就是谁。
- hdfs 的权限可以用 kerberos、ranger 来做
- HDFS命令有两种风格,两种命令均可使用,效果相同。
- hadoop fs开头的
- hdfs dfs开头的
- 如何查看 hdfs 或 hadoop 子命令的帮助信息,如 ls 子命令
hdfs dfs -help ls hadoop fs -help ls #两个命令等价
- 查看 hdfs 文件系统中指定目录的文件列表。对比 linux 命令ls
hdfs dfs -ls / hadoop fs -ls / hdfs dfs -ls -R /
- 在hdfs文件系统中创建文件
hdfs dfs -touchz /edits.txt
- 向HDFS文件中追加内容
hadoop fs -appendToFile edit1.xml /edits.txt #将本地磁盘当前目录的edit1.xml内容追加到HDFS根目录 的edits.txt文件
- 查看HDFS文件内容
hdfs dfs -cat /edits.txt
- 从本地路径上传文件至HDFS
#用法:hdfs dfs -put /本地路径 /hdfs路径 hdfs dfs -put /linux本地磁盘文件 /hdfs路径文件 hdfs dfs -copyFromLocal /linux本地磁盘文件 /hdfs路径文件 #跟put作用一样 hdfs dfs -moveFromLocal /linux本地磁盘文件 /hdfs路径文件 #跟put作用一样,只不过,源文件被拷贝成功后,会被删除
- 在hdfs文件系统中下载文件
hdfs dfs -get /hdfs路径 /本地路径 hdfs dfs -copyToLocal /hdfs路径 /本地路径 #根get作用一样
- 在hdfs文件系统中创建目录
hdfs dfs -mkdir /shell
- 在hdfs文件系统中删除文件
hdfs dfs -rm /edits.txt # INFO fs.TrashPolicyDefault: Moved: 'hdfs://node01:8020/edits.txt' to trash at: hdfs://node01:8020/user/hadoop/.Trash/Current/edits.txt # 将文件彻底删除(被删除文件不放到hdfs的垃圾桶里) how? hdfs dfs -rm -skipTrash /edit1.xml
- 在hdfs文件系统中修改文件名称(也可以用来移动文件到目录)
# 先创建一个文件 hdfs dfs -touchz /a.txt # 重命名文件 hdfs dfs -mv /a.txt /b.sh # 移动文件 hdfs dfs -mv /b.sh /shell
- 在hdfs中拷贝文件到目录
hdfs dfs -cp /xrsync.sh /shell
- 递归删除目录
hdfs dfs -rm -r /shell
- 列出本地文件的内容(默认是hdfs文件系统)
hdfs dfs -ls file:///home/hadoop/
- 查找文件
# linux find命令 find . -name 'edit*' # HDFS find命令 hadoop fs -find / -name a*.txt # 在HDFS根目录中,查找文件名以 a开头的txt文件
- 总结:
- 输入hadoop fs 或hdfs dfs,回车,查看所有的HDFS命令
- 许多命令与linux命令有很大的相似性,学会举一反三
- 有用的help,如查看ls命令的使用说明:hadoop fs -help ls
- 绝大多数的大数据框架的命令,也有类似的help信息
- hdfs与getconf结合使用
# 获取NameNode的节点名称(可能有多个) hdfs getconf -namenodes # 获取hdfs最小块信息, 用相同命令可获取其他的属性值 hdfs getconf -confKey dfs.namenode.fs-limits.min-block-size # 查找hdfs的NameNode的RPC地址 hdfs getconf -nnRpcAddresses
- hdfs与dfsadmin结合使用
# 查看hdfs dfsadmin的帮助信息 hdfs dfsadmin # 同样要学会借助help查看具体命令的帮助信息 hdfs dfsadmin -help safemode # 查看当前的模式 hdfs dfsadmin -safemode get # 进入/退出安全模式 hdfs dfsadmin -safemode enter # 进入安全模式 hdfs dfsadmin -safemode leave #退出安全模式
- hdfs与fsck结合使用
# fsck指令显示HDFS块信息 hdfs fsck /a.txt -files -blocks -locations # 查看文件 a.txt 的块信息
- 其他命令
# 检查压缩库本地安装情况 hadoop checknative # 格式化名称节点(慎用,一般只在初次搭建集群,使用一次;格式化成功后,不要再使用) hadoop namenode -format # 执行自定义jar包 hadoop jar /bigdata/install/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar pi 5 5 yarn jar /bigdata/install/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar pi 5 5hdfs的优缺点 1. hdfs的优点
- 高容错性
- 数据自动保存多个副本,它通过增加副本的形式,提高容错性。
- 某一个副本丢失以后,它可以自动恢复,这是由 HDFS 内部机制自动实现。
- 适合批处理
- 把数据位置暴露给计算框架,通过移动计算而不是移动数据,提高效率。
- 适合大数据处理
- 数据规模:能够处理数据规模达到 GB、TB、甚至PB级别的数据。
- 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
- 节点规模:能够处理10K 节点的规模。
- 流式数据访问
- 一次写入,多次读取。
- 不能随机修改,只能追加。
- 它能保证数据的一致性。
- 可构建在廉价的机器上
- 它通过多副本机制,提高可靠性。
- 它提供了容错和恢复机制:比如某一个副本丢失,可以通过其它副本来恢复。
- 不适合低延时的数据访问
- 比如毫秒级的存储、读取数据,这是不行的,它做不到。
- 它适合高吞吐率的场景,就是在某一时间内写入大量的数据。
- 无法高效的对大量小文件进行存储
- 存储大量小文件的话,它会占用 NameNode大量的内存来存储文件、目录和块信息。这样是不可取的,因为NameNode的内存总是有限的。
- 小文件存储的寻道时间会超过读取时间,它违反了HDFS的设计目标。
- 并发写入、文件随机修改
- 一个文件只能有一个写,不允许多个线程同时写(租约机制)。
- 仅支持数据 append(追加),不支持文件的随机修改。
- 安全模式是HDFS所处的一种特殊状态
- 文件系统只接受读请求
- 不接受写请求,如删除、修改等变更请求
- 在NameNode主节点启动时,HDFS首先进入安全模式
- DataNode 在启动的时候会向 NameNode 汇报可用的 block 等状态,当整个系统达到安全标准时,HDFS自动离开安全模式。
- 如果HDFS处于安全模式下,则文件block不能进行任何的副本复制 *** 作,因此达到最小的副本数量要求是基于 DataNode 启动时的状态来判定的。
- 启动时不会再做任何复制(从而达到最小副本数量要求)。
- hdfs集群刚启动的时候,默认30S钟的时间是出于安全期的,只有过了30S之后,集群脱离了安全期,然后才可以对集群进行 *** 作
- 何时退出安全模式
- NameNode 知道集群共多少个block(不考虑副本),假设值是total;
- NameNode 启动后,会上报block report,NameNode 开始累加统计满足最小副本数(默认1)的block个数,假设是num;
- 当 num/total > 99.9% 时,退出安全模式。
$ hdfs dfsadmin -safemode Usage: hdfs dfsadmin [-safemode enter | leave | get | wait]hdfs的java编程
- maven 依赖:
UTF-8 1.8 1.8 3.1.4 org.apache.hadoop hadoop-client${hadoop.version} org.apache.hadoop hadoop-common${hadoop.version} org.apache.hadoop hadoop-hdfs${hadoop.version} org.apache.hadoop hadoop-mapreduce-client-core${hadoop.version} junit junit4.11 test org.testng testngRELEASE log4j log4j1.2.17 org.apache.maven.plugins maven-compiler-plugin3.0 1.8 UTF-8 org.apache.maven.plugins maven-shade-plugin2.4.3 package shade true
- api文档
public class HdfsMkdirsTest { // 简化版 @Test public void mkdirsOnHdfs_simple() throws IOException { // 配置项 Configuration configuration = new Configuration(); // 设置要连接的 hdfs 集群 NameNode configuration.set("fs.defaultFS", "hdfs://node01:8020"); // 获取文件系统 FileSystem fileSystem = FileSystem.get(configuration); // 调用方法创建目录,若目录存在,则创建失败,返回false boolean result = fileSystem.mkdirs(new Path("/yw/dir1")); assertTrue(result); fileSystem.close(); } // 指定目录所属用户 @Test public void mkdirsOnHdfs_withUser() throws Exception { // 配置项 Configuration configuration = new Configuration(); // 获取文件系统 FileSystem fileSystem = FileSystem.get( new URI("hdfs://node01:8020"), configuration, "test"); // 调用方法创建目录,若目录存在,则创建失败,返回false boolean result = fileSystem.mkdirs(new Path("/yw/dir2")); assertTrue(result); fileSystem.close(); } // 创建目录时,指定目录权限 @Test public void mkdirsOnHdfs_withPermission() throws IOException { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://node01:8020"); FileSystem fileSystem = FileSystem.get(configuration); FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.READ, FsAction.READ); boolean result = fileSystem.mkdirs(new Path("hdfs://node01:8020/yw/dir3"), fsPermission); assertTrue(result); fileSystem.close(); } }2. 文件上传
public class HdfsUploadFileTest { @Test public void uploadFile2Hdfs() throws IOException { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://node01:8020"); FileSystem fileSystem = FileSystem.get(configuration); fileSystem.copyFromLocalFile(new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/test/resources/hello.txt"), new Path("/yw/dir1")); fileSystem.close(); } }3. 文件下载
public class HdfsDownloadFileTest { @Test public void downloadFile2Hdfs() throws IOException { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://node01:8020"); FileSystem fileSystem = FileSystem.get(configuration); fileSystem.copyToLocalFile(new Path("/yw/dir1/hello.txt"), new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/test/resources")); // // 删除文件 // fileSystem.delete() // // 重命名文件 // fileSystem.rename() fileSystem.close(); } }4. 查看hdfs文件详细信息
public class HdfsViewFileTest { @Test public void uploadFile2Hdfs() throws Exception { // 获取文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration); // 获取文件详情 RemoteIteratorlistFiles = fs.listFiles(new Path("/yw"), true); while (listFiles.hasNext()) { LocatedFileStatus status = listFiles.next(); // 输出文件详情 // 文件名称 System.out.println(status.getPath().getName()); // 长度 System.out.println(status.getLen()); // 权限 System.out.println(status.getPermission()); // 分组 System.out.println(status.getGroup()); // 获取存储的块信息 BlockLocation[] blockLocations = status.getBlockLocations(); for (BlockLocation blockLocation : blockLocations) { // 获取块存储的主机节点 String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.println(host); } } } // 关闭资源 fs.close(); } }
- 输出:
hello.txt 13 rw-r--r-- supergroup node01 node02 node035. IO 流 *** 作 hdfs 文件
@Test public void putFile2Hdfs() throws Exception { // 获取文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration); // 创建输入流,不需要加 file:///,否则报错 FileInputStream fis = new FileInputStream(new File("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/test/resources/hello.txt")); // 创建输出流,父目录不存在,会自动创建 FSDataOutputStream fos = fs.create(new Path("/yw/dir2/hello.txt")); // 流对拷 IOUtils.copy(fis, fos); // org.apache.commons.io.IOUtils // 关闭资源 IOUtils.closeQuietly(fis); IOUtils.closeQuietly(fos); fs.close(); }
- 通过IO流从hdfs上面下载文件类似,这里不做赘述。
@Test public void mergeFile() throws Exception { // 获取分布式文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration, "hadoop"); FSDataOutputStream fsdos = fs.create(new Path("/yw/dir3/big.txt")); LocalFileSystem lfs = FileSystem.getLocal(configuration); FileStatus[] fileStatuses = lfs.listStatus(new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/test/resources/")); for (FileStatus fileStatus : fileStatuses) { // 获取每一个本地文件路径 Path path = fileStatus.getPath(); // 读取本地小文件 FSDataInputStream fsdis = lfs.open(path); IOUtils.copy(fsdis, fsdos); IOUtils.closeQuietly(fsdis); } IOUtils.closeQuietly(fsdos); lfs.close(); fs.close(); }DataNode 工作机制及存储
- HDFS分布式文件系统也是一个主从架构
- 主节点是我们的 NameNode,负责管理整个集群以及维护集群的元数据信息。
- 从节点 DataNode,主要负责文件数据存储。
- 一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件
- 一个是数据本身,一个是元数据:包括数据块的长度,块数据的校验和,以及时间戳。
- hdfs-site.xml中指定了数据存储的路径
dfs.datanode.data.dir file:///bigdata/install/hadoop-3.1.4/hadoopDatas/datanodeDatas
- DataNode 启动后向 NameNode 注册,通过后周期性(6小时)的向 NameNode 上报所有的块信息。
- 心跳周期 3 秒
- 心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据到另一台机器,或删除某个数据块。
- 如果超过10分钟没有收到某个 DataNode 的心跳,则认为该节点不可用。
- 集群运行中可以安全加入和退出一些机器。
- 当客户端向 hdfs 写数据时
- 会计算数据的校验和,以此保证数据通过网络传输,到达 DataNode 后,没有丢失数据
- 当 DataNode 读取 block 时
- 它会计算checksum
- 如果计算后的checksum,与 block 创建时值不一样,说明block已经损坏
- client 读取其他 DataNode 上的block
- DataNode 在其文件创建后周期验证 checksum
- DataNode 进程死亡或者网络故障造成 DataNode 无法与 NameNode 通信,NameNode 不会立即把该节点判定为死亡
- 要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:
# 以下属性,可以查看官网的`hdfs-default.xml`文件 timeout = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval
- 而默认的dfs.namenode.heartbeat.recheck-interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
- 需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。
4. DataNode的目录结构dfs.namenode.heartbeat.recheck-interval 300000 dfs.heartbeat.interval 3
- 和 NameNode 不同的是,DataNode 的存储目录是初始阶段自动创建的,不需要额外格式化。
- 在主节点node01的目录/bigdata/install/hadoop-3.1.4/hadoopDatas/datanodeDatas/current下查看版本号
[hadoop@centos128 current]$ cat VERSION #Wed Nov 24 07:46:56 CST 2021 storageID=DS-1661390a-4348-459e-938f-220dd9984e38 clusterID=CID-d4ce4fe0-d991-4a87-bccb-fde0c80fd838 cTime=0 datanodeUuid=04b892c7-d0d5-4e65-ac96-8130476fde01 storageType=DATA_NODE layoutVersion=-57
- 具体解释:
- storageID:存储 id 号
- clusterID:集群ID,全局唯一
- cTime:标记了 DataNode 存储系统的创建时间
- 对于刚刚格式化的存储系统,这个属性为0;
- 但是在文件系统升级之后,该值会更新到新的时间戳。
- datanodeUuid:DataNode 的唯一识别码
- storageType:存储类型
- layoutVersion:是一个负整数。通常只有HDFS增加新特性时才会更新这个版本号。
- DataNode 也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本。具体配置如下:
vim /bigdata/install/hadoop-3.1.4/etc/hadoop/hdfs-site.xml
hdfs的读写流程 1. hdfs 的写入流程dfs.datanode.data.dir /home/hadoop/develop/data/data1/hdfs, /home/hadoop/develop/data/data2/hdfs
- 创建文件:
- ①HDFS client向HDFS写入数据,先调用DistributedFileSystem.create();
- ②RPC调用namenode的create(),会在HDFS目录树中指定的路径,添加新文件;
- 并将 *** 作记录在edits.log中;
- namenode.create()方法执行完后,返回一个FSDataOutputStream,它是DFSOutputStream的包装类;
- 建立数据流管道 Pipeline:
- ③client调用DFSOutputStream.write()写数据(先写第一个块的数据,暂时叫blk1);
- ④DFSOutputStream通过RPC调用namenode的addBlock,向namenode申请一个空的数据块block;
- ⑤addBlock返回LocatedBlock对象;此对象中包含了当前blk要存储在哪三个datanode的信息,比如dn1、dn2、dn3;
- ⑥客户端,根据位置信息,建立数据流管道(图中蓝色线条)
- 向数据流管道写入当前块的数据:
- ⑦写数据时,先将数据写入一个检验块chunk中,写满512字节后,对此chunk计算校验和checksum值(4字节);
- ⑧然后将chunk及对应校验和写入packet中,一个packet是64KB;
- ⑨随着源源不断的带校验和的chunk写入packet,当packet写满后,将packet写入dataqueue数据队列中;
- ⑩packet从队列中取出,沿pipeline发送到dn1,再从dn1发送到dn2,再从dn2发送到dn3;
- ⑪同时,此packet会保存一份到一个确认队列ack queue中;
- ⑫packet到达最后一个datanode即dn3后,做校验,将校验结果逆着pipeline方向回传到客户端,具体是校验结果从dn3传到dn2,dn2也会做校验,校验结果再传到dn1,dn1也做校验;结果再传回客户端;
- ⑬客户端根据校验结果,如果“成功”,则将将保存在ack queue中的packet删除;如果失败,则将packet取出,重新放回到data queue末尾,等待再次沿pipeline发送;
- ⑭如此,将block中的一个数据一个个packet发送出去;当此block发送完毕,即dn1、dn2、dn3都接受了blk1的完整的副本,那么三个dn分别RPC调用namenode的blockReceivedAndDeleted(),namenode会更新内存中block与datanode的对应关系(比如dn1上多了一个blk1副本);
- 关闭 dn1、dn2、dn3 构建的 Pipeline,且文件还有下一个块时,再从 ④ 开始,直到文件全部数据写完:
- ⑮最终,调用DFSOutputStream的close();
- ⑯客户端调用namenode的complete(),告知namenode文件传输完成。
假设说当前构建的pipeline是dn1、dn2、dn3构成的当传输数据的过程中,dn2挂了或通信不畅了,则当前pipeline中断 HDFS 会如何做?
- 先将ack queue中的所有packet全部放回到data queue中,客户端RPC调用namenode的updateBlockForPipeline(),为当前block(假设是blk1)生成新的版本比如ts1(本质是时间戳),故障dn2会从pipeline中删除;
- DFSOutputStream 再 RPC调用namenode的getAdditionalDatanode(),让namenode分配新的datanode,比如是dn4;
- 输出流将原dn1、dn3与新的dn4组成新的管道,他们上边的blk1版本设置为新版本ts1,由于新添加的dn4上没有blk1的数据,客户端告知dn1或dn3,将其上的blk1的数据拷贝到dn4上;
- 新的数据管道建立好后,DFSOutputStream调用updatePipeline()更新namenode元数据;
- 至此,pipeline恢复,客户端按正常的写入流程,完成文件的上传;
- 故障datanode重启后,namenode发现它上边的block的blk1的时间戳是老的,会让datanode将blk1删除掉。
- ① client端读取HDFS文件,client调用文件系统对象DistributedFileSystem的open方法;
- ② 返回FSDataInputStream对象(对DFSInputStream的包装);
- ③ 构造DFSInputStream对象时,调用namenode的getBlockLocations方法,获得file的开始若干block(如blk1, blk2, blk3, blk4)的存储datanode(以下简称dn)列表;针对每个block的dn列表,会根据网络拓扑做排序,离client近的排在前;
- ④ 调用DFSInputStream的read方法,先读取blk1的数据,与client最近的datanode建立连接,读取数据;
- ⑤ 读取完后,关闭与dn建立的流;
- ⑥ 读取下一个block,如blk2的数据(重复步骤④、⑤、⑥);
- ⑦ 这一批block读取完后,再读取下一批block的数据(重复③、④、⑤、⑥、⑦);
- ⑧ 完成文件数据读取后,调用FSDataInputStream的close方法。
如何容错?
- 情况一:读取block过程中,client与datanode通信中断
- client与存储此block的第二个datandoe建立连接,读取数据;
- 记录此有问题的datanode,不会再从它上读取数据。
- 情况二:client读取block,发现block数据有问题
- client读取block数据时,同时会读取到block的校验和,若client针对读取过来的block数据,计算检验和,其值与读取过来的校验和不一样,说明block数据损坏;
- client从存储此block副本的其它datanode上读取block数据(也会计算校验和);
- 同时,client会告知namenode此情况。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)