Hudi

Hudi,第1张

Hudi

目录

数据湖技术选型 - Hudi ;Iceberg ;Data Lake

Hudi

Hudi 的前世今生:

1.1.1 什么是Apache Hudi

1.1.2 Hudi 支持的文件格式

1.1.3 表格式

1.1.2 使用Hudi的优点

1.1.3 Hoodie 的基本概念梳理

1.1.4 Hudi的设计动机

1.1.5 Hudi可以避免小文件问题

1.1.6  Hudi 典型应用场景 ---  近实时摄取 / 分析、增量处理管道、DFS上数据分发

2. Hive和Presto与hudi的集成

2.1 hive

2.2 Presto

3. DeltaStreamer工具写数据到Hudi

4.  Hudi可以避免小文件问题

4.1.1 Hudi 避免小文件问题

4.1.2 Clustering架构

5. Hudi代码源码

5.1 CopyonWrite 模式 *** 作(默认模式)

5.2  Hudi 源码

5.3.1  删除hudi中的数据

6. 使用Spark *** 作hudi

7. 问题整理

1. Merge on Read问题

2. spark pom依赖问题

3. hive视图同步问题


数据湖技术选型 - Hudi ;Iceberg ;Data Lake

点卡这个链接,你就清晰明了了。source : 深度对比Delta、Iceberg和Hudi三大开源数据湖方案-InfoQ

Data Lake 支持的文件格式单一,基本被Pass了。

Hudi: Apache Hudi 是由 Uber 的工程师为满足其内部数据分析的需求而设计的数据湖项目,它提供的 fast upsert/delete 以及 compaction 等功能可以说是精准命中广大人民群众的痛点,加上项目各成员积极地社区建设,包括技术细节分享、国内社区推广等等,也在逐步地吸引潜在用户的目光。

Apache Iceberg :目前看则会显得相对平庸一些,简单说社区关注度暂时比不上 Delta,功能也不如 Hudi 丰富,但却是一个野心勃勃的项目,因为它具有高度抽象和非常优雅的设计,为成为一个通用的数据湖方案奠定了良好基础。但是因为 Iceberg 是一个统一的数据组织格式,想要全面使用的话必须使所有的上层引擎能够对接适配,因此这一块环节的补足是当前最为重要的。

Delta 和 Hudi 两个项目在开源社区的建设和推动方面,做的比较好。

Hudi

source : Apache Hudi入门指南(含代码示例)

source : Apache Hudi 介绍与应用 - ZacksTang - 博客园 (cnblogs.com)

源码git 下载: https://github.com/apache/hudi/releases/tag/release-0.9.0

Hudi 的前世今生:

        Uber 的业务场景主要为:将线上产生的行程订单数据,同步到一个统一的数据中心,然后供上层各个城市运营同事用来做分析和处理。在 2014 年的时候,Uber 的数据湖架构相对比较简单,业务日志经由 Kafka 同步到 S3 上,上层用 EMR 做数据分析;线上的关系型数据库以及 NoSQL 则会通过 ETL(ETL 任务也会拉去一些 Kakfa 同步到 S3 的数据)任务同步到闭源的 Vertica 分析型数据库,城市运营同学主要通过 Vertica SQL 实现数据聚合。当时也碰到数据格式混乱、系统扩展成本高(依赖收 Vertica 商业收费软件)、数据回填麻烦等问题。后续迁移到开源的 Hadoop 生态,解决了扩展性问题等问题,但依然碰到 Databricks 上述的一些问题,其中最核心的问题是无法快速 upsert 存量数据。

        ETL 任务每隔 30 分钟定期地把增量更新数据同步到分析表中,全部改写已存在的全量旧数据文件,导致数据延迟和资源消耗都很高。此外,在数据湖的下游,还存在流式作业会增量地消费新写入的数据,数据湖的流式消费对他们来说也是必备的功能。所以,他们就希望设计一种合适的数据湖方案,在解决通用数据湖需求的前提下,还能实现快速的 upsert 以及流式增量消费。

        Uber 团队在 Hudi 上同时实现了 Copy On Write 和 Merge On Read 的两种数据格式,其中 Merge On Read 就是为了解决他们的(快插) fast upsert 而设计的。简单来说,就是每次把增量更新数据都写入到一批独立的 delta 文件集,定期地通过 compaction 合并 delta 文件和存量的 data 文件。同时给上层分析引擎提供三种不同的读取视角:仅读取 delta 增量文件、仅读取 data 文件、合并读取 delta 和 data 文件。

1.1.1 什么是Apache Hudi

source :Apache Hudi - 数据湖平台|阿帕奇胡迪!

source : Apache Hudi入门指南(含代码示例) - 云+社区 - 腾讯云 (tencent.com)      

          一个spark 库 大数据更新解决方案,大数据中没有传统意义的更新,只有 append和重写(Hudi就是采用重写方式)

          Hudi 充分利用了像 HDFS 之类的存储模式所支持的“append"特性。这有助于 Hudi 提供流式写入,而不会导致文件计数 / 表元数据激增。不幸的是,目前大多数云 / 对象存储都不提供“append”功能(Azure 除外)。未来我们计划利用主流云对象存储的低级 API,在流式摄取延迟时提供对文件计数的类似控制。

XN:现在还是用的重写模式。将来有望使用类似append的的方式写入。

格式转化: Hudi 并非设计为通用 表格格式( 表格格式知识表格元数据的表示 ),用于跟踪文件、文件夹以进行批处理。相反hudi 致力于构建自己的原生表格,着眼于增量处理。用户可以从hudi转换为其他格式。

状态存储:若想构建一个支持高校更新和提取数据流的数据库,同时保持针对大批量的查询的优化。 可以使用hudi 作为状态存储和可更新接收器来构建增量管道。

       数据湖用户将数据写入开放的文件格式(如 Apache Parquet / ORC),这些文件格式存储在高度可扩展的云存储或分布式文件系统之上。Hudi 提供了一个自管理的数据平面来摄取、转换和管理这些数据并解锁了对它们进行增量处理的方式。


1.1.2 Hudi 支持的文件格式

2021-08-19source:Apache Hudi - 数据湖平台|阿帕奇胡迪!

2021-08-19日前支持的基本文件格式包括parquet(列访问)和HFile(索引访问)。

        Hudi 是围绕基本文件和增量日志文件的概念设计的,它们将更新 / 增量数据存储到给定的基本文件(称为文件片,file slice)。它们的格式是可插拔的,目前支持的基本文件格式包括 Parquet(列访问)和 HFile(索引访问)。增量日志以 Avro(面向行)格式对数据进行编码,以实现更快的日志记录(就像 Kafka topic 一样)。

        展望未来,我们计划在即将发布的版本中将每种基本文件格式内联到日志块中,根据块大小提供对增量日志的列式访问。未来的计划还包括支持 ORC 基础 / 日志文件格式、非结构化数据格式(自由的 json 格式、图像),甚至使用事件流系统 /OLAP 引擎 / 数仓的分层存储层的原生文件格式。

1.1.3 表格式

        表格式仅包括:表的文件布局、表的 schema 和对表更改的元数据跟踪。Hudi 使用 Avro 模式来存储、管理和演进表的 schema。目前 Hudi 强制执行 schema-on-write,虽然比 schema-on-read 更严格,但在流处理领域被广泛采用,以确保管道不会因无法向后兼容的变更而中断。


1.1.2 使用Hudi的优点

使用Bloomfilter机制+二次查找,可快速确定记录是更新还是新增

更新范围小,是文件级别,不是表级别

文件大小与hdfs的Blocksize保持一致

数据文件使用parquet格式,充分利用列存的优势(dremal论文实现)

提供了可扩展的大数据更新框架

并发度由spark控制

Hadoop生态系统有潜力作为面向分钟级延时场景的通用统一服务层。然而,为了实现这一点,这需要在 HDFS 中实现高效且低延迟的数据摄取及数据准备。

Hudi项目,这是一个增量处理框架。我们首先讨论一下为什么将Hadoop作为统一的服务层是一个不错的想法。

1.1.3 Hoodie 的基本概念梳理

source : 数据湖 | Apache Hudi 设计与架构最强解读 - 云+社区 - 腾讯云

键-值数据模型:

        在写方面,Hudi表被建模为键值对数据集,其中每条记录都有一个唯一的记录键。此外,一个记录键还可以包括分区路径,在该路径下,可以对记录进行分区和存储。这通常有助于减少索引查询的搜索空间。

Hudi表的三个主要组件:

1)有序的时间轴元数据。类似于数据库事务日志。

2)分层布局的数据文件:实际写入表中的数据。

3)索引(多种实现方式):映射包含指定记录的数据集。

1.1.4 Hudi的设计动机

Lambda架构,它的数据的处理依赖流式计算层(Streaming Layer)和批处理计算层(Batch Layer)的双重计算。------->  每隔几个小时,批处理过程被启动以计算精确的业务状态,并将批量更新加载到服务层。流式计算层对这个业务数据进行实时的状态更新。  ------->    这个流计算的状态只是一个最终结果的近似值,最终需要被批处理的计算结果所覆盖。

由于两种模式提供的状态差异,我们需要为批处理和流处理提供不同的服务层,并在这个上面再做合并抽象,或者设计应用一个相当复杂的服务系统(如Druid),用于同时在行级更新和批量加载中提供优异表现。Kappa架构认为不需要一个额外单独的批处理层,一个单独的流式计算层足以成为数据处理的通用解决方案

火山模型(Volcano Iterator model):火山模型是数据库界已经很成熟的解释计算模型,该计算模型将关系代数中每一种 *** 作抽象为一个 Operator,将整个 SQL 构建成一个 Operator 树,从根节点到叶子结点自上而下地递归调用 next() 函数。

这就意味着流式计算层可以依靠堆资源以增加并行能力的方式来对业务状态进行重算更新。这类系统可以依靠有效的检查点(checkpoint)和大量的状态管理来让流式处理的结果不再只是一个近似值。

1.1.5 Hudi可以避免小文件问题

source : 干货!Apache Hudi如何智能处理小文件问题 - leesf - 博客园

引入: 在数据湖/仓库中,需要在摄取速度和查询性能之间进行权衡,数据摄取通常更喜欢小文件以改善并行性并使数据尽快可用于查询,但很多小文件会导致查询性能下降。   

大量的小文件将会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭。在流式场景中不断摄取数据,如果不进行处理,会产生很多小文件。

        通常情况下,Hive或Spark计算时会生成大量小文件,然后再通过一些手段将它们合并在一起,这样只能解决由小文件引起的系统可伸缩性问题,但是无法解决未合并前,对小文件进行查询时效率低下的问题。而在Hudi中,一个关键的设计是避免创建小文件,并且总是生成大小合适的文件。Hudi在 ingest/writing 上花费更多的时间,以保持查询时始终高效。与常规解决方法不同,Hudi直接在生成端避免小文件问题,使小文件无法暴露给计算引擎,也就解决了小文件的低效查询问题。

1.1.6  Hudi 典型应用场景 ---  近实时摄取 / 分析、增量处理管道、DFS上数据分发

source : 基于Apache Hudi构建数据湖的典型应用场景介绍 - leesf - 博客园

1.近实时摄取

        将数据从外部源如事件日志、数据库提取到Hadoop数据湖 中是一个很常见的问题。在大多数Hadoop部署中,一般使用混合提取工具并以零散的方式解决该问题,尽管这些数据对组织是非常有价值的。

        对于RDBMS摄取,Hudi通过Upserts提供了更快的负载,而非昂贵且低效的批量负载。例如你可以读取MySQL binlog日志或Sqoop增量导入,并将它们应用在DFS上的Hudi表,这比批量合并作业或复杂的手工合并工作流更快/更高效。对于像Cassandra / Voldemort / Hbase这样的NoSQL数据库,即使规模集群不大也可以存储数十亿行数据,此时进行批量加载则完全不可行,需要采用更有效的方法使得摄取速度与较频繁的更新数据量相匹配。

        即使对于像Kafka这样的不可变数据源,Hudi也会强制在DFS上保持最小文件大小,从而解决Hadoop领域中的古老问题以便改善NameNode的运行状况。这对于事件流尤为重要,因为事件流(例如单击流)通常较大,如果管理不善,可能会严重损害Hadoop集群性能。

(什么叫不可变数据源呀?Kafka 是不可变数据源嘛?hudi 是有合并文件的能力,那有为什么会在DFS上保持最小文件大小呢?这和hudi 的设计不冲突嘛?)

对于所有数据源,Hudi都提供了通过提交将新数据原子化地发布给消费者,从而避免部分提取失败。

2. 近实时分析

        通常实时数据集是由专门的分析存储,如Druid、Memsql甚至OpenTSDB提供支持。这对于需要亚秒级查询响应(例如系统监视或交互式实时分析)的较小规模(相对于安装Hadoop)数据而言是非常完美的选择。但由于Hadoop上的数据令人难以忍受,因此这些系统通常最终会被较少的交互查询所滥用,从而导致利用率不足和硬件/许可证成本的浪费。

        另一方面,Hadoop上的交互式SQL解决方案(如Presto和SparkSQL),能在几秒钟内完成的查询。通过将数据的更新时间缩短至几分钟,Hudi提供了一种高效的替代方案,并且还可以对存储在DFS上多个更大的表进行实时分析。此外,Hudi没有外部依赖项(例如专用于实时分析的专用Hbase群集),因此可以在不增加运营成本的情况下,对更实时的数据进行更快的分析。

3. 增量处理管道

        Hadoop提供的一项基本功能是构建基于表的派生链,并通过DAG表示整个工作流。工作流通常取决于多个上游工作流输出的新数据,传统上新生成的DFS文件夹/Hive分区表示新数据可用。例如上游工作流 U可以每小时创建一个Hive分区,并在每小时的末尾( processing_time)包含该小时( event_time)的数据,从而提供1小时的数据新鲜度。然后下游工作流 D在 U完成后立即开始,并在接下来的一个小时进行处理,从而将延迟增加到2个小时。

       上述示例忽略了延迟到达的数据,即 processing_time和 event_time分开的情况。不幸的是在后移动和物联网前的时代,数据延迟到达是非常常见的情况。在这种情况下,保证正确性的唯一方法是每小时重复处理最后几个小时的数据,这会严重损害整个生态系统的效率。想象下在数百个工作流中每小时重新处理TB级别的数据。

      Hudi可以很好的解决上述问题,其通过记录粒度(而非文件夹或分区)来消费上游Hudi表 HU 中的新数据,下游的Hudi表 HD 应用处理逻辑并更新/协调延迟数据,这里 HU 和 HD 可以以更频繁的时间(例如15分钟)连续进行调度,并在 HD上提供30分钟的端到端延迟。

        为了实现这一目标,Hudi从流处理框架如Spark Streaming、发布/订阅系统如Kafka或数据库复制技术如Oracle XStream中引入了类似概念。若感兴趣可以在此处找到有关增量处理(与流处理和批处理相比)更多优势的更详细说明。

4. DFS上数据分发

        Hadoop的经典应用是处理数据,然后将其分发到在线存储以供应用程序使用。例如使用Spark Pipeline将Hadoop的数据导入到ElasticSearch供Uber应用程序使用。一种典型的架构是在Hadoop和服务存储之间使用 队列 进行解耦,以防止压垮目标服务存储,一般会选择Kafka作为队列,该架构会导致相同数据冗余存储在DFS(用于对计算结果进行离线分析)和Kafka(用于分发)上。

        Hudi可以通过以下方式再次有效地解决此问题:将Spark Pipeline 插入更新输出到Hudi表,然后对表进行增量读取(就像Kafka主题一样)以获取新数据并写入服务存储中,即使用Hudi统一存储。

2. Hive和Presto与hudi的集成

source :Apache Hudi入门指南(含代码示例) (qq.com)

2.1 hive

hive 查询hudi 数据主要是在hive中建立外部表数据路径指向hdfs 路径,同时hudi 重写了inputformat 和outpurtformat。因为hudi 在读的数据的时候会读元数据来决定我要加载那些parquet文件,而在写的时候会写入新的元数据信息到hdfs路径下。所以hive 要集成hudi 查询要把编译的jar 包放到HIVE-HOME/lib 下面。否则查询时找不到inputformat和outputformat的类。hive 外表数据结构

hive集成hudi方法:将hudi jar复制到hive lib下

cp  ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar   $HIVE_HOME/lib

2.2 Presto

presto 集成hudi 是基于hive catalog 同样是访问hive 外表进行查询,如果要集成需要把hudi 包copy 到presto hive-hadoop2插件下面。

presto集成hudi方法: 将hudi jar复制到 presto hive-hadoop2下

cp  ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar  $PRESTO_HOME/plugin/hive-hadoop2/

3. DeltaStreamer工具写数据到Hudi

source : https://segmentfault.com/a/1190000040440572

Hudi 维护诸如提交时间线和索引之类的元数据来管理表。

提交时间表有助于了解表上发生的 *** 作以及表的当前状态。Hudi 使用索引来维护记录键到文件 id 的映射,以有效地定位记录。目前,Hudi 仅支持写入 parquet 柱状格式。为了能够开始对现有表使用 Hudi,您需要将现有表迁移到 Hudi 托管表。有几种方法可以实现这一点。

3.1.1  Hudi机制 ---- 存储机制

hudi维护了一个时间轴,记录了在不同时刻对数据集进行的所有 *** 作。

hudi拥有2种存储优化。

读优化(Copy On Write):在每次commit后都将最新的数据compaction成列式存储(parquet);

写优化(Merge On Read):对增量数据使用行式存储(avro),后台定期将它compaction成列式存储。

        读数据

hudi维护着一个索引,以支持在记录key存在情况下,将新记录的key快速映射到对应的fileId。索引的实现是插件式的,默认是bloomFilter,也可以使用Hbase。

hudi提供3种查询视图。

读优化视图:仅提供compaction后的列式存储的数据;

增量视图:仅提供一次compaction/commit前的增量数据;

实时视图:包括读优化的列式存储数据和写优化的行式存储数据。

        更新数据

hudi写数据的时候需要指定PRECOMBINE_FIELD_OPT_KEY、RECORDKEY_FIELD_OPT_KEY和PARTITIONPATH_FIELD_OPT_KEY。

RECORDKEY_FIELD_OPT_KEY:每条记录的唯一id,支持多个字段;

PRECOMBINE_FIELD_OPT_KEY:在数据合并的时候使用到,当 RECORDKEY_FIELD_OPT_KEY 相同时,默认取 PRECOMBINE_FIELD_OPT_KEY 属性配置的字段最大值所对应的行;

PARTITIONPATH_FIELD_OPT_KEY:用于存放数据的分区字段。

hudi更新数据和插入数据很相似(写法几乎一样),更新数据时,会根据 RECORDKEY_FIELD_OPT_KEY、PRECOMBINE_FIELD_OPT_KEY 以及 PARTITIONPATH_FIELD_OPT_KEY三个字段对数据进行Merge。

4.  Hudi可以避免小文件问题 4.1.1 Hudi 避免小文件问题

source : 干货!Apache Hudi如何智能处理小文件问题 - leesf - 博客园

引入: 在数据湖/仓库中,需要在摄取速度和查询性能之间进行权衡,数据摄取通常更喜欢小文件以改善并行性并使数据尽快可用于查询,但很多小文件会导致查询性能下降。   

        大量的小文件将会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭。在流式场景中不断摄取数据,如果不进行处理,会产生很多小文件。

        通常情况下,Hive或Spark计算时会生成大量小文件,然后再通过一些手段将它们合并在一起,这样只能解决由小文件引起的系统可伸缩性问题,但是无法解决未合并前,对小文件进行查询时效率低下的问题。而在Hudi中,一个关键的设计是避免创建小文件,并且总是生成大小合适的文件。Hudi在 ingest/writing 上花费更多的时间,以保持查询时始终高效。与常规解决方法不同,Hudi直接在生成端避免小文件问题,使小文件无法暴露给计算引擎,也就解决了小文件的低效查询问题。

 写入时 vs 写入后:

        一种常见的处理方法先写入很多小文件,然后再合并成大文件以解决由小文件引起的系统扩展性问题,但由于暴露太多小文件可能导致不能保证查询的SLA。实际上对于Hudi表,通过Hudi提供的Clustering功能可以非常轻松的做到这一点. [至本文4.1.2]

        本篇文章将介绍Hudi的文件大小优化策略,即在写入时处理。Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。在进行 insert/upsert  *** 作时,Hudi可以将文件大小维护在一个指定文件大小(注意:bulk_insert *** 作暂无此特性,其主要用于替换spark.write.parquet 方式将数据快速写入Hudi)。

配置

source : Configurations | Apache Hudi!

我们使用COPY_ON_WRITE表来演示Hudi如何自动处理文件大小特性。

关键配置项如下:

hoodie.parquet.max.file.size:数据文件最大大小,Hudi将试着维护文件大小到该指定值;

hoodie.parquet.small.file.limit:小于该大小的文件均被视为小文件;

hoodie.copyonwrite.insert.split.size:单文件中插入记录条数,此值应与单个文件中的记录数匹配(可以根据最大文件大小和每个记录大小来确定)

        例如如果你第一个配置值设置为120MB,第二个配置值设置为100MB,则任何大小小于100MB的文件都将被视为一个小文件,如果要关闭此功能,可将 hoodie.parquet.small.file.limit  配置值设置为0。

小于100MB 的会执行合并 *** 作, 大于120MB的会才拆解开,分散到新的file中.

(eg.假设hoodie.parquet.max.file.size = 120MB,现有一个130MB 的文件大小, 则会分成12k(每个记录数是1k )记录数+10k 记录数的两个文件. 后续如需继续添加, 会在10k记录数上进行添加补充.

4.1.2 Clustering架构

source : 查询时间降低60%!Apache Hudi数据布局黑科技了解下

        Hudi通过其写入客户端API提供了不同的 *** 作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和摄取速度之间进行权衡,Hudi提供了一个 hoodie.parquet.small.file.limit  配置来设置最小文件大小。用户可以将该配置设置为0以强制新数据写入新的文件组,或设置为更高的值以确保新数据被"填充"到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。

Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE *** 作类型,该 *** 作类型将在Hudi元数据时间轴中标记Clustering *** 作。

总体而言Clustering分为两个部分:

•调度Clustering:使用可插拔的Clustering策略创建Clustering计划。

•执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。

调度Clustering

识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为 targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。最后将Clustering计划以avro元数据格式保存到时间线。

运行Clustering

读取Clustering计划,并获得 clusteringGroups,其标记了需要进行Clustering的文件组。•对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。•创建一个REPLACE提交,并更新HoodieReplaceCommitmetadata中的元数据。Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering *** 作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。注意:现在对表进行Clustering时还不支持更新,将来会支持并发更新。

Clustering配置  使用Spark可以轻松设置内联Clustering

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val df =  //generate data frame
df.write.format("org.apache.hudi").
        options(getQuickstartWriteConfigs).
        option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        option(TABLE_NAME, "tableName").
        option("hoodie.parquet.small.file.limit", "0").
        option("hoodie.clustering.inline", "true").
        option("hoodie.clustering.inline.max.commits", "4").
        option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
        option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
        option("hoodie.clustering.plan.strategy.sort.columns", "column1,column2"). //optional, if sorting is needed as part of rewriting data
        mode(Append).
        save("dfs://location");

使用Clustering,我们可以通过以下方式提高查询性能:

•利用空间填充曲线之类的概念来适应数据湖布局并减少查询读取的数据量。

•将小文件合并成较大的文件以减少查询引擎需要扫描的文件总数。

5. Hudi代码源码 5.1 CopyonWrite 模式 *** 作(默认模式)

source :Apache Hudi入门指南(含代码示例)

5.2  Hudi 源码

source  : Spark Guide | Apache Hudi!

// Insert data 插入数据
// spark-shell

val inserts = convertToStringList(dataGen.generateInserts(10))
// 通过自带的类生成10个随机数,然后转成集合
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
// 读集合, 然后把读出来的东西通过json的形式插入(写入)到对应的路径中去
df.write.format("hudi").   // format("hudi")是输出格式
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").    //时间戳
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").   //主键
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").   //分区
  option(TABLE_NAME, tableName).             //表名
  mode(Overwrite).                           // *** 作模式
  save(basePath)                             //输出路径
5.3.1  删除hudi中的数据

source : Hudi 实践 | Apache Hudi 删除数据的多种姿势-技术圈

6. 使用Spark *** 作hudi

以下是整合spark结构化流+hudi的示意代码,由于Hudi OutputFormat目前只支持在spark rdd对象中调用,因此写入HDFS *** 作采用了spark structured streaming的forEachBatch算子。具体说明见注释。【task :代码现在运行正确, 但需要就需要逐一解析】

source : 实战|使用Spark结构化流写入Hudi - 知乎

这个没完全弄完哦source : Hudi 系列(四)- 使用 Spark *** 作 Hudi - 寂寞黄沙一杯酒

import java.time.LocalDateTime

import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY, TABLE_TYPE_OPT_KEY}
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.log4j.Logger
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Dataframe, Row, SaveMode}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object SparkHudi {
  val logger = Logger.getLogger(SparkHudi.getClass)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .appName("SparkHudi")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.default.parallelism", 9)
      .config("spark.sql.shuffle.partitions", 9)
      .enableHiveSupport()
      .getOrCreate()

    // 添加监听器,每一批次处理完成,将该批次的相关信息,如起始offset,抓取记录数量,处理时间打印到控制台
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
      }
      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
      }
      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
      }
    })

    // 定义kafka流
    val dataStreamReader = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "172.16.2.120:9092")
      .option("subscribe", "kk")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 100000)
      .option("failOnDataLoss", false)
      .load()

    // 加载流数据,这里因为只是测试使用,直接读取kafka消息而不做其他处理,是spark结构化流会自动生成每一套消息对应的kafka元数据,如消息所在主题,分区,消息对应offset等。
    val df = dataStreamReader.selectExpr(
        "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "cast(timestamp as String) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time",
    )
    .selectExpr(
      "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",    // concat 拼接。 用”-“将kafka_partition和kafka_offset拼接完,然后改名为kafka_partition_offset
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date")     // substr 截取。 截取当前时间第一到第十位的东西。


    // 创建并启动query
    val query = df.writeStream.queryName("kk").foreachBatch{(batchDF: Dataframe, _: Long) => {        // 这个queryName 不知道显示在哪里了,始终没显示出来。
      batchDF.persist()
      println(LocalDateTime.now() + "start writing cow table")
      batchDF.write.format("org.apache.hudi")
        .option(TABLE_TYPE_OPT_KEY,"COPY_ON_WRITE")
        .option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")
        .option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset") // 以kafka分区和偏移量作为组合主键
        .option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")  // 以当前日期作为分区
        .option("hoodie.table.name", "table")
        .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
        .mode(SaveMode.Append)
        .save("/tmp/sparkHudi/COPY_ON_WRITE")

      println(LocalDateTime.now() + "start writing mor table")
      batchDF.write.format("org.apache.hudi")
        .option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ")
        .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
        .option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")
        .option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")
        .option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")
        .option("hoodie.table.name", "merge_on_read_table")
        .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
        .mode(SaveMode.Append)
        .save("/tmp/sparkHudi/MERGE_ON_READ")
      batchDF.unpersist()        // 这句话不能放在最后,会报foreachBatch的错。在Question Accumulate中有记录
      println(LocalDateTime.now() + "finish")
     
    }
    }
      .option("checkpointLocation", "/tmp/sparkHudi/checkpoint/")
      .start()
    query.awaitTermination()
  }
}

source :Hudi 系列(四)- 使用 Spark *** 作 Hudi - 寂寞黄沙一杯酒

7. 问题整理 1. Merge on Read问题

merge on read 要配置option(DataSourceWriteOptions.TABLETYPEOPTKEY, DataSourceWriteOptions.MORTABLETYPEOPTVAL)才会生效,配置为option(HoodieTableConfig.HOODIETABLETYPEPROPNAME, HoodieTableType.MERGEON_READ.name())将不会生效。

2. spark pom依赖问题

不要引入spark-hive 的依赖里面包含了hive 1.2.1的相关jar包,而hudi 要求的版本是2.x版本。如果一定要使用请排除相关依赖。

3. hive视图同步问题

代码与hive视图同步时resources要加入hive-site.xml 配置文件,不然同步hive metastore 会报错。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5715624.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存