大数据高级开发工程师——Hadoop学习笔记(2)

大数据高级开发工程师——Hadoop学习笔记(2),第1张

数据高级开发工程师——Hadoop学习笔记(2)

文章目录
  • Hadoop进阶篇
    • HDFS:Hadoop分布式文件系统
      • 分布式文件系统的理解
      • HDFS 架构详细剖析
        • 1. 分块存储
        • 2. 3副本存储
        • 3. [扩展:机架感知](https://hadoop.apache.org/docs/r3.1.4/hadoop-project-dist/hadoop-common/RackAwareness.html)
        • 4. 抽象成数据块的好处
        • 5. HDFS架构
        • 6. 扩展
          • 块缓存
          • hdfs的文件权限验证
      • HDFS 的 Shell 命令 *** 作
        • hdfs 常用命令
        • 补充命令
      • hdfs的优缺点
        • 1. hdfs的优点
        • 2. hdfs的缺点
      • hdfs安全模式
      • hdfs的java编程
        • 1. 创建文件夹
        • 2. 文件上传
        • 3. 文件下载
        • 4. 查看hdfs文件详细信息
        • 5. IO 流 *** 作 hdfs 文件
        • 6. hdfs 的小文件合并
      • DataNode 工作机制及存储
        • 1. DataNode 工作机制
        • 2. 数据完整性
        • 3. 掉线时限参数设置
        • 4. DataNode的目录结构
        • 5. DataNode多目录配置
      • hdfs的读写流程
        • 1. hdfs 的写入流程
        • 2. hdfs 的读取流程
        • 更详细的 hdfs 写入流程图

Hadoop进阶篇 HDFS:Hadoop分布式文件系统

分布式文件系统的理解

  • 最直观的理解便是:三个臭皮匠,顶个诸葛亮。

HDFS 架构详细剖析 1. 分块存储

问:上边的文件有几个块,分别是多大?

  • 保存文件到HDFS时,会先默认按128M的单位对文件进行切分成一个个 block 块
  • 数据以block块的形式存在 HDFS 文件系统中
    • 在hadoop1当中,文件的block块默认大小是64M
    • hadoop2当中,文件的block块大小默认是128M,block块的大小可以通过 hdfs-site.xml 当中的配置文件进行指定

    dfs.blocksize
    块大小 以字节为单位

  • hdfs-default.xml 参考默认属性
  • block元数据
    • 每个 block 块的元数据大小大概为150字节
    • 一个 1k 大小的 block 与一个 128M 大小的block的元数据基本相等
    • 所以在 namenode 内存有限的情况下,存储大文件更划算。

面试题

如果有一个文件大小为1KB,它有几个块?每个块是多大?

类似于有一个水桶可以装128斤的水,但是我只装了1斤的水,那么我的水桶里面水的重量就是1斤,而不是128斤

2. 3副本存储
  • 为了保证block块的安全性,也就是数据的安全性,在hadoop2当中,采用文件默认保存三个副本,我们可以更改副本数以提高数据的安全性。
  • 在 hdfs-site.xml 当中修改以下配置属性,即可更改文件的副本数。

    dfs.replication
    3

3. 扩展:机架感知
  • 副本存放策略,不同版本稍有区别
    • 比如apache hadoop 2.7.7
    • 比如apache hadoop 2.8.5

4. 抽象成数据块的好处
  • 文件可能大于集群中任意一个磁盘
    • 10T * 3 / 128 = xxx 块,10 T 文件方式存多个 block 块,这些 block 块属于一个文件
  • 使用块抽象而不是文件可以简化存储子系统
    • hdfs将所有的文件全部抽象成为block块来进行存储,不管文件大小,全部一视同仁都是以block块的形式进行存储,方便我们的分布式文件系统对文件的管理
  • 块非常适合用于数据备份,进而提供数据容错能力和可用性
5. HDFS架构
  • HDFS 集群包括:NameNode、DataNode 和 Secondary NameNode
    • NameNode 负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
    • DataNode 负责管理用户的文件数据块,每一个数据块都可以在多个 DataNode 上存储多个副本。
    • Secondary NameNode 用来监控 HDFS 状态的辅助后台程序,每隔一段时间获取 HDFS 元数据的快照。最主要作用是辅助 NameNode 管理元数据信息。

  • NameNode 与 DataNode 总结概述:
NameNodeDataNode存储元数据存储文件内容元数据保存在内存中文件内容保存在磁盘保存文件、block、DataNode之间的映射关系维护了block id 到 DataNode 本地文件的映射关系 6. 扩展 块缓存
  • 官网文档
  • 通常 DataNode 从磁盘中读取块,但对于访问频繁的文件,其对应的块可能被显示的缓存在 DataNode 的内存中,以堆外块缓存的形式存在。
  • 默认情况下,一个块仅缓存在一个 DataNode 的内存中,当然可以针对每个文件配置 DataNode 的数量。作业调度器通过在缓存块的 DataNode 上运行任务,可以利用块缓存的优势提高读 *** 作的性能。

例如: 连接(join) *** 作中使用的一个小的查询表就是块缓存的一个很好的候选。
用户或应用通过在缓存池中增加一个cache directive来告诉namenode需要缓存哪些文件及存多久。缓存池(cache pool)是一个拥有管理缓存权限和资源使用的管理性分组

hdfs的文件权限验证
  • hdfs的文件权限机制与linux系统的文件权限机制类似
    • r:read w:write x:execute 权限x对于文件表示忽略,对于文件夹表示是否有权限访问其内容
    • 如果linux系统用户zhangsan使用hadoop命令创建一个文件,那么这个文件在HDFS当中的owner就是zhangsan
    • HDFS文件权限的目的,防止好人做错事,而不是阻止坏人做坏事。HDFS相信你告诉我你是谁,你就是谁。
  • hdfs 的权限可以用 kerberos、ranger 来做
HDFS 的 Shell 命令 *** 作
  • HDFS命令有两种风格,两种命令均可使用,效果相同。
    • hadoop fs开头的
    • hdfs dfs开头的
hdfs 常用命令
  1. 如何查看 hdfs 或 hadoop 子命令的帮助信息,如 ls 子命令
hdfs dfs -help ls
hadoop fs -help ls #两个命令等价
  1. 查看 hdfs 文件系统中指定目录的文件列表。对比 linux 命令ls
hdfs dfs -ls /
hadoop fs -ls /
hdfs dfs -ls -R /
  1. 在hdfs文件系统中创建文件
hdfs dfs -touchz /edits.txt
  1. 向HDFS文件中追加内容
hadoop fs -appendToFile edit1.xml /edits.txt #将本地磁盘当前目录的edit1.xml内容追加到HDFS根目录 的edits.txt文件
  1. 查看HDFS文件内容
hdfs dfs -cat /edits.txt
  1. 从本地路径上传文件至HDFS
#用法:hdfs dfs -put /本地路径 /hdfs路径
hdfs dfs -put /linux本地磁盘文件 /hdfs路径文件
hdfs dfs -copyFromLocal /linux本地磁盘文件 /hdfs路径文件  #跟put作用一样
hdfs dfs -moveFromLocal /linux本地磁盘文件 /hdfs路径文件  #跟put作用一样,只不过,源文件被拷贝成功后,会被删除
  1. 在hdfs文件系统中下载文件
hdfs dfs -get /hdfs路径 /本地路径
hdfs dfs -copyToLocal /hdfs路径 /本地路径  #根get作用一样
  1. 在hdfs文件系统中创建目录
hdfs dfs -mkdir /shell
  1. 在hdfs文件系统中删除文件
hdfs dfs -rm /edits.txt
# INFO fs.TrashPolicyDefault: Moved: 'hdfs://node01:8020/edits.txt' to trash at: hdfs://node01:8020/user/hadoop/.Trash/Current/edits.txt

# 将文件彻底删除(被删除文件不放到hdfs的垃圾桶里) how?
hdfs dfs -rm -skipTrash /edit1.xml
  1. 在hdfs文件系统中修改文件名称(也可以用来移动文件到目录)
# 先创建一个文件
hdfs dfs -touchz /a.txt
# 重命名文件
hdfs dfs -mv /a.txt /b.sh
# 移动文件
hdfs dfs -mv /b.sh /shell
  1. 在hdfs中拷贝文件到目录
hdfs dfs -cp /xrsync.sh /shell
  1. 递归删除目录
hdfs dfs -rm -r /shell
  1. 列出本地文件的内容(默认是hdfs文件系统)
hdfs dfs -ls file:///home/hadoop/
  1. 查找文件
# linux find命令
find . -name 'edit*'
# HDFS find命令
hadoop fs -find / -name a*.txt # 在HDFS根目录中,查找文件名以 a开头的txt文件
  • 总结:
    • 输入hadoop fs 或hdfs dfs,回车,查看所有的HDFS命令
    • 许多命令与linux命令有很大的相似性,学会举一反三
    • 有用的help,如查看ls命令的使用说明:hadoop fs -help ls
    • 绝大多数的大数据框架的命令,也有类似的help信息
补充命令
  1. hdfs与getconf结合使用
# 获取NameNode的节点名称(可能有多个)
hdfs getconf -namenodes
# 获取hdfs最小块信息, 用相同命令可获取其他的属性值
hdfs getconf -confKey dfs.namenode.fs-limits.min-block-size
# 查找hdfs的NameNode的RPC地址
hdfs getconf -nnRpcAddresses
  1. hdfs与dfsadmin结合使用
# 查看hdfs dfsadmin的帮助信息
hdfs dfsadmin
# 同样要学会借助help查看具体命令的帮助信息
hdfs dfsadmin -help safemode
# 查看当前的模式
hdfs dfsadmin -safemode get
# 进入/退出安全模式
hdfs dfsadmin -safemode enter # 进入安全模式
hdfs dfsadmin -safemode leave #退出安全模式
  1. hdfs与fsck结合使用
# fsck指令显示HDFS块信息
hdfs fsck /a.txt -files -blocks -locations # 查看文件 a.txt 的块信息
  1. 其他命令
# 检查压缩库本地安装情况
hadoop checknative
# 格式化名称节点(慎用,一般只在初次搭建集群,使用一次;格式化成功后,不要再使用)
hadoop namenode -format
# 执行自定义jar包
hadoop jar /bigdata/install/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar pi 5 5
yarn jar /bigdata/install/hadoop-3.1.4/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.4.jar pi 5 5
hdfs的优缺点 1. hdfs的优点
  • 高容错性
    • 数据自动保存多个副本,它通过增加副本的形式,提高容错性。
    • 某一个副本丢失以后,它可以自动恢复,这是由 HDFS 内部机制自动实现。
  • 适合批处理
    • 把数据位置暴露给计算框架,通过移动计算而不是移动数据,提高效率。
  • 适合大数据处理
    • 数据规模:能够处理数据规模达到 GB、TB、甚至PB级别的数据。
    • 文件规模:能够处理百万规模以上的文件数量,数量相当之大。
    • 节点规模:能够处理10K 节点的规模。
  • 流式数据访问
    • 一次写入,多次读取。
    • 不能随机修改,只能追加。
    • 它能保证数据的一致性。
  • 可构建在廉价的机器上
    • 它通过多副本机制,提高可靠性。
    • 它提供了容错和恢复机制:比如某一个副本丢失,可以通过其它副本来恢复。
2. hdfs的缺点
  • 不适合低延时的数据访问
    • 比如毫秒级的存储、读取数据,这是不行的,它做不到。
    • 它适合高吞吐率的场景,就是在某一时间内写入大量的数据。
  • 无法高效的对大量小文件进行存储
    • 存储大量小文件的话,它会占用 NameNode大量的内存来存储文件、目录和块信息。这样是不可取的,因为NameNode的内存总是有限的。
    • 小文件存储的寻道时间会超过读取时间,它违反了HDFS的设计目标。
  • 并发写入、文件随机修改
    • 一个文件只能有一个写,不允许多个线程同时写(租约机制)。
    • 仅支持数据 append(追加),不支持文件的随机修改。
hdfs安全模式
  • 安全模式是HDFS所处的一种特殊状态
    • 文件系统只接受读请求
    • 不接受写请求,如删除、修改等变更请求
  • 在NameNode主节点启动时,HDFS首先进入安全模式
    • DataNode 在启动的时候会向 NameNode 汇报可用的 block 等状态,当整个系统达到安全标准时,HDFS自动离开安全模式。
    • 如果HDFS处于安全模式下,则文件block不能进行任何的副本复制 *** 作,因此达到最小的副本数量要求是基于 DataNode 启动时的状态来判定的。
    • 启动时不会再做任何复制(从而达到最小副本数量要求)。
    • hdfs集群刚启动的时候,默认30S钟的时间是出于安全期的,只有过了30S之后,集群脱离了安全期,然后才可以对集群进行 *** 作
  • 何时退出安全模式
    • NameNode 知道集群共多少个block(不考虑副本),假设值是total;
    • NameNode 启动后,会上报block report,NameNode 开始累加统计满足最小副本数(默认1)的block个数,假设是num;
    • 当 num/total > 99.9% 时,退出安全模式。
$ hdfs dfsadmin -safemode  
Usage: hdfs dfsadmin [-safemode enter | leave | get | wait]
hdfs的java编程
  • maven 依赖:
  
    UTF-8
    1.8
    1.8
    3.1.4
  

  
    
      org.apache.hadoop
      hadoop-client
      ${hadoop.version}
    
    
      org.apache.hadoop
      hadoop-common
      ${hadoop.version}
    

    
      org.apache.hadoop
      hadoop-hdfs
      ${hadoop.version}
    

    
      org.apache.hadoop
      hadoop-mapreduce-client-core
      ${hadoop.version}
    
    
    
      junit
      junit
      4.11
      test
    
    
      org.testng
      testng
      RELEASE
    
    
      log4j
      log4j
      1.2.17
    
  
  
    
      
        org.apache.maven.plugins
        maven-compiler-plugin
        3.0
        
          1.8
          1.8
          UTF-8
          
        
      
      
        org.apache.maven.plugins
        maven-shade-plugin
        2.4.3
        
          
            package
            
              shade
            
            
              true
            
          
        
      
    
  
  • api文档
1. 创建文件夹
public class HdfsMkdirsTest {
    // 简化版
    @Test
    public void mkdirsOnHdfs_simple() throws IOException {
        // 配置项
        Configuration configuration = new Configuration();
        // 设置要连接的 hdfs 集群 NameNode
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        // 获取文件系统
        FileSystem fileSystem = FileSystem.get(configuration);
        // 调用方法创建目录,若目录存在,则创建失败,返回false
        boolean result = fileSystem.mkdirs(new Path("/yw/dir1"));

        assertTrue(result);
        fileSystem.close();
    }

    // 指定目录所属用户
    @Test
    public void mkdirsOnHdfs_withUser() throws Exception {
        // 配置项
        Configuration configuration = new Configuration();
        // 获取文件系统
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://node01:8020"), configuration, "test");
        // 调用方法创建目录,若目录存在,则创建失败,返回false
        boolean result = fileSystem.mkdirs(new Path("/yw/dir2"));

        assertTrue(result);
        fileSystem.close();
    }

    // 创建目录时,指定目录权限
    @Test
    public void mkdirsOnHdfs_withPermission() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        FileSystem fileSystem = FileSystem.get(configuration);
        FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.READ, FsAction.READ);

        boolean result = fileSystem.mkdirs(new Path("hdfs://node01:8020/yw/dir3"), fsPermission);

        assertTrue(result);
        fileSystem.close();
    }
}

2. 文件上传
public class HdfsUploadFileTest {
    @Test
    public void uploadFile2Hdfs() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.copyFromLocalFile(new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/test/resources/hello.txt"),
                new Path("/yw/dir1"));
        fileSystem.close();
    }
}
3. 文件下载
public class HdfsDownloadFileTest {
    @Test
    public void downloadFile2Hdfs() throws IOException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://node01:8020");
        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.copyToLocalFile(new Path("/yw/dir1/hello.txt"),
                new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/test/resources"));

//        // 删除文件
//        fileSystem.delete()
//        // 重命名文件
//        fileSystem.rename()

        fileSystem.close();
    }
}
4. 查看hdfs文件详细信息
public class HdfsViewFileTest {
    @Test
    public void uploadFile2Hdfs() throws Exception {
        // 获取文件系统
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration);
        // 获取文件详情
        RemoteIterator listFiles = fs.listFiles(new Path("/yw"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus status = listFiles.next();
            // 输出文件详情
            // 文件名称
            System.out.println(status.getPath().getName());
            // 长度
            System.out.println(status.getLen());
            // 权限
            System.out.println(status.getPermission());
            // 分组
            System.out.println(status.getGroup());
            // 获取存储的块信息
            BlockLocation[] blockLocations = status.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                // 获取块存储的主机节点
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
        }
        // 关闭资源
        fs.close();
    }
}
  • 输出:
hello.txt
13
rw-r--r--
supergroup
node01
node02
node03
5. IO 流 *** 作 hdfs 文件

@Test
public void putFile2Hdfs() throws Exception {
    // 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration);
    // 创建输入流,不需要加 file:///,否则报错
    FileInputStream fis = new FileInputStream(new File("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/test/resources/hello.txt"));
    // 创建输出流,父目录不存在,会自动创建
    FSDataOutputStream fos = fs.create(new Path("/yw/dir2/hello.txt"));
    // 流对拷
    IOUtils.copy(fis, fos); // org.apache.commons.io.IOUtils
    // 关闭资源
    IOUtils.closeQuietly(fis);
    IOUtils.closeQuietly(fos);
    fs.close();
}
  • 通过IO流从hdfs上面下载文件类似,这里不做赘述。
6. hdfs 的小文件合并
@Test
public void mergeFile() throws Exception {
    // 获取分布式文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), configuration, "hadoop");
    FSDataOutputStream fsdos = fs.create(new Path("/yw/dir3/big.txt"));

    LocalFileSystem lfs = FileSystem.getLocal(configuration);
    FileStatus[] fileStatuses = lfs.listStatus(new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/test/resources/"));
    for (FileStatus fileStatus : fileStatuses) {
        // 获取每一个本地文件路径
        Path path = fileStatus.getPath();
        // 读取本地小文件
        FSDataInputStream fsdis = lfs.open(path);
        IOUtils.copy(fsdis, fsdos);
        IOUtils.closeQuietly(fsdis);
    }
    IOUtils.closeQuietly(fsdos);
    lfs.close();
    fs.close();
}
DataNode 工作机制及存储

  • HDFS分布式文件系统也是一个主从架构
    • 主节点是我们的 NameNode,负责管理整个集群以及维护集群的元数据信息。
    • 从节点 DataNode,主要负责文件数据存储。
1. DataNode 工作机制
  • 一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件
    • 一个是数据本身,一个是元数据:包括数据块的长度,块数据的校验和,以及时间戳。
    • hdfs-site.xml中指定了数据存储的路径

    dfs.datanode.data.dir
    file:///bigdata/install/hadoop-3.1.4/hadoopDatas/datanodeDatas

  • DataNode 启动后向 NameNode 注册,通过后周期性(6小时)的向 NameNode 上报所有的块信息。
  • 心跳周期 3 秒
    • 心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据到另一台机器,或删除某个数据块。
    • 如果超过10分钟没有收到某个 DataNode 的心跳,则认为该节点不可用。
  • 集群运行中可以安全加入和退出一些机器。
2. 数据完整性
  • 当客户端向 hdfs 写数据时
    • 会计算数据的校验和,以此保证数据通过网络传输,到达 DataNode 后,没有丢失数据
  • 当 DataNode 读取 block 时
    • 它会计算checksum
    • 如果计算后的checksum,与 block 创建时值不一样,说明block已经损坏
    • client 读取其他 DataNode 上的block
  • DataNode 在其文件创建后周期验证 checksum
3. 掉线时限参数设置
  • DataNode 进程死亡或者网络故障造成 DataNode 无法与 NameNode 通信,NameNode 不会立即把该节点判定为死亡
  • 要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:
# 以下属性,可以查看官网的`hdfs-default.xml`文件
timeout = 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval
  • 而默认的dfs.namenode.heartbeat.recheck-interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
  • 需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为

    dfs.namenode.heartbeat.recheck-interval
    300000


     dfs.heartbeat.interval 
    3

4. DataNode的目录结构
  • 和 NameNode 不同的是,DataNode 的存储目录是初始阶段自动创建的,不需要额外格式化。
  • 在主节点node01的目录/bigdata/install/hadoop-3.1.4/hadoopDatas/datanodeDatas/current下查看版本号
[hadoop@centos128 current]$ cat VERSION 
#Wed Nov 24 07:46:56 CST 2021
storageID=DS-1661390a-4348-459e-938f-220dd9984e38
clusterID=CID-d4ce4fe0-d991-4a87-bccb-fde0c80fd838
cTime=0
datanodeUuid=04b892c7-d0d5-4e65-ac96-8130476fde01
storageType=DATA_NODE
layoutVersion=-57
  • 具体解释:
    • storageID:存储 id 号
    • clusterID:集群ID,全局唯一
    • cTime:标记了 DataNode 存储系统的创建时间
      • 对于刚刚格式化的存储系统,这个属性为0;
      • 但是在文件系统升级之后,该值会更新到新的时间戳。
    • datanodeUuid:DataNode 的唯一识别码
    • storageType:存储类型
    • layoutVersion:是一个负整数。通常只有HDFS增加新特性时才会更新这个版本号。
5. DataNode多目录配置
  • DataNode 也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本。具体配置如下:
vim /bigdata/install/hadoop-3.1.4/etc/hadoop/hdfs-site.xml

   dfs.datanode.data.dir
   
     /home/hadoop/develop/data/data1/hdfs,
     /home/hadoop/develop/data/data2/hdfs		
  

hdfs的读写流程 1. hdfs 的写入流程
  • 创建文件:
    • ①HDFS client向HDFS写入数据,先调用DistributedFileSystem.create();
    • ②RPC调用namenode的create(),会在HDFS目录树中指定的路径,添加新文件;
    • 并将 *** 作记录在edits.log中;
    • namenode.create()方法执行完后,返回一个FSDataOutputStream,它是DFSOutputStream的包装类;
  • 建立数据流管道 Pipeline:
    • ③client调用DFSOutputStream.write()写数据(先写第一个块的数据,暂时叫blk1);
    • ④DFSOutputStream通过RPC调用namenode的addBlock,向namenode申请一个空的数据块block;
    • ⑤addBlock返回LocatedBlock对象;此对象中包含了当前blk要存储在哪三个datanode的信息,比如dn1、dn2、dn3;
    • ⑥客户端,根据位置信息,建立数据流管道(图中蓝色线条)
  • 向数据流管道写入当前块的数据:
    • ⑦写数据时,先将数据写入一个检验块chunk中,写满512字节后,对此chunk计算校验和checksum值(4字节);
    • ⑧然后将chunk及对应校验和写入packet中,一个packet是64KB;
    • ⑨随着源源不断的带校验和的chunk写入packet,当packet写满后,将packet写入dataqueue数据队列中;
    • ⑩packet从队列中取出,沿pipeline发送到dn1,再从dn1发送到dn2,再从dn2发送到dn3;
    • ⑪同时,此packet会保存一份到一个确认队列ack queue中;
    • ⑫packet到达最后一个datanode即dn3后,做校验,将校验结果逆着pipeline方向回传到客户端,具体是校验结果从dn3传到dn2,dn2也会做校验,校验结果再传到dn1,dn1也做校验;结果再传回客户端;
    • ⑬客户端根据校验结果,如果“成功”,则将将保存在ack queue中的packet删除;如果失败,则将packet取出,重新放回到data queue末尾,等待再次沿pipeline发送;
    • ⑭如此,将block中的一个数据一个个packet发送出去;当此block发送完毕,即dn1、dn2、dn3都接受了blk1的完整的副本,那么三个dn分别RPC调用namenode的blockReceivedAndDeleted(),namenode会更新内存中block与datanode的对应关系(比如dn1上多了一个blk1副本);
  • 关闭 dn1、dn2、dn3 构建的 Pipeline,且文件还有下一个块时,再从 ④ 开始,直到文件全部数据写完:
    • ⑮最终,调用DFSOutputStream的close();
    • ⑯客户端调用namenode的complete(),告知namenode文件传输完成。

假设说当前构建的pipeline是dn1、dn2、dn3构成的当传输数据的过程中,dn2挂了或通信不畅了,则当前pipeline中断 HDFS 会如何做?

  • 先将ack queue中的所有packet全部放回到data queue中,客户端RPC调用namenode的updateBlockForPipeline(),为当前block(假设是blk1)生成新的版本比如ts1(本质是时间戳),故障dn2会从pipeline中删除;
  • DFSOutputStream 再 RPC调用namenode的getAdditionalDatanode(),让namenode分配新的datanode,比如是dn4;
  • 输出流将原dn1、dn3与新的dn4组成新的管道,他们上边的blk1版本设置为新版本ts1,由于新添加的dn4上没有blk1的数据,客户端告知dn1或dn3,将其上的blk1的数据拷贝到dn4上;
  • 新的数据管道建立好后,DFSOutputStream调用updatePipeline()更新namenode元数据;
  • 至此,pipeline恢复,客户端按正常的写入流程,完成文件的上传;
  • 故障datanode重启后,namenode发现它上边的block的blk1的时间戳是老的,会让datanode将blk1删除掉。
2. hdfs 的读取流程
  • ① client端读取HDFS文件,client调用文件系统对象DistributedFileSystem的open方法;
  • ② 返回FSDataInputStream对象(对DFSInputStream的包装);
  • ③ 构造DFSInputStream对象时,调用namenode的getBlockLocations方法,获得file的开始若干block(如blk1, blk2, blk3, blk4)的存储datanode(以下简称dn)列表;针对每个block的dn列表,会根据网络拓扑做排序,离client近的排在前;
  • ④ 调用DFSInputStream的read方法,先读取blk1的数据,与client最近的datanode建立连接,读取数据;
  • ⑤ 读取完后,关闭与dn建立的流;
  • ⑥ 读取下一个block,如blk2的数据(重复步骤④、⑤、⑥);
  • ⑦ 这一批block读取完后,再读取下一批block的数据(重复③、④、⑤、⑥、⑦);
  • ⑧ 完成文件数据读取后,调用FSDataInputStream的close方法。

如何容错?

  • 情况一:读取block过程中,client与datanode通信中断
    • client与存储此block的第二个datandoe建立连接,读取数据;
    • 记录此有问题的datanode,不会再从它上读取数据。
  • 情况二:client读取block,发现block数据有问题
    • client读取block数据时,同时会读取到block的校验和,若client针对读取过来的block数据,计算检验和,其值与读取过来的校验和不一样,说明block数据损坏;
    • client从存储此block副本的其它datanode上读取block数据(也会计算校验和);
    • 同时,client会告知namenode此情况。
更详细的 hdfs 写入流程图

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5619528.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存