基于 Flink + Hive 构建流批一体准实时数仓

基于 Flink + Hive 构建流批一体准实时数仓,第1张

基于 Flink + Hive 构建流批一体准实时数仓 一、背景

        基于 Hive 的离线数仓往往是企业大数据生产系统中不可缺少的一环。Hive 数仓有很高的成熟度和稳定性,但由于它是离线的,延时很大。在一些对延时要求比较高的场景,需要另外搭建基于 Flink 的实时数仓,将链路延时降低到秒级。但是一套离线数仓加一套实时数仓的架构会带来超过两倍的资源消耗,甚至导致重复开发。 

        想要搭建流式链路就必须得抛弃现有的 Hive 数仓吗?并不是,借助 Flink 可以实现已有的 Hive 离线数仓准实时化。        

  1. 离线数仓实时化的难点 

  2. Flink 在流批一体的探索 

  3. 构建流批一体准实时数仓应用实践

离线数仓实时化的难点

离线数仓

        上图是一个典型的离线数仓,假设现在公司有一个需求,目前公司的数据量很大,需要每天出一个报表且输出到业务数据库中。首先是刚入库的业务数据,大致分为两种,一种是 MySQL 的 binlog,另外一种是业务系统中的业务打点,这个日志打点信息可以通过 Flume 等工具去采集,再离线入库到数仓中。然后随着业务越来越多,业务中的各个表可以做一些抽象,抽象的好处是更好的管理和更高效的数据复用和计算复用。所以数仓就分成了多层 (明细层、中间层、服务层等等),每一层存的是数据表,数据表之间通过 HiveSQL 的计算来实现 ETL 转换。

不止是 HiveSQL ,Hive 只是静态的批计算,而业务每天都要出报表,这意味着每天都要进行计算,这种情况下会依赖于调度工具和血缘管理:

  • 调度工具:按照某个策略把批计算调度起来。

  • 血缘管理:一个任务是由许多个作业组合而成,可能有非常复杂的表结构层次,整个计算是一个非常复杂的拓扑,作业间的依赖关系非常复杂 (减少冗余存储和计算,也可以有较好的容错),只有当一级结束后才能进行下一级的计算。

当任务十分庞大的时候,我们得出结果往往需要很长的一段时间,也就是我们常说的 T+1,H+1 ,这就是离线数仓的问题。

第三方工具

上面说过,离线数仓不仅仅是简单的 Hive 计算,它还依赖了其它的第三方工具,比如:

  • 使用 Flume 来入库,但存在一定的问题,首先,它的容错可能无法保证 Exactly-once 效果,需要下游再次进行去重 *** 作。其次,自定义逻辑需要通过一些手段,比如脚本来控制。第三,离线数仓并不具备良好的扩展能力,当数据剧增时,增加原本的并发数就比较困难了。

  • 基于调度工具的作业调度会带来级联的计算延迟,比如凌晨 1 点开始计算昨天的数据,可能需要到早上 6、7 点才能做完,并且无法保证在设置的调度时间内数据可以完全 ready 。此外,级联的计算还会带来复杂的血缘管理问题,大任务的 Batch 计算可能会突然打满集群的资源,所以也要求我们对于负载管理进行考量,这些都会给业务增加负担。

无论是离线数仓还是第三方工具,其实主要的问题还是“慢”,如何解决慢的问题,此时就该实时数仓出场了。

实时数仓

实时数仓其实是从 Hive+HDFS 的组合换成了 Kafka,ETL 的功能通过 Flink 的流式处理解决。此时就不存在调度和血缘管理的问题了,通过实时不断的增量更新,最终输出到业务的 DB 中。

虽然延时降低了,但此时我们会面临另外一些问题:

  • 历史数据丢失,因为 Kafka 只是临时的存储介质,数据会有一个超时的时间 (比如只保存 7 天的数据),这会导致我们的历史数据丢失。

  • 成本相对较高,实时计算的成本要大于离线计算。

Lambda 架构

        所以此时很多人就会选择一套实时一套离线的做法,互不干扰,根据任务是否需要走实时的需求来对需求进行分离。

        这套架构看似解决了所有问题,但实际带来的问题也是非常多。首先,Lambda 架构造成了离线和实时的割裂问题,它们解决的业务问题都是一样的,但是两套方案让同样的数据源产生了不同的计算结果。不同层级的表结构可能不一致,并且当数据产生不一致的问题时,还需要去进行比对排查。随着这套 Lambda 架构越走越远,开发团队、表结构表依赖、计算模型等都可能会被割裂开,越到后面越会发现,成本越来越高,而统一的代价越来越大。

        那么问题来了,实时数仓会耗费如此大的资源,且还不能保留历史数据,Lambda 架构存在如此多的问题,有什么方案可以解决呢?

数据湖

         数据湖拥有不少的优点,原子性可以让我们做到准实时的批流一体,并且支持已有数据的修改 *** 作。但是毕竟数据湖是新一代数仓存储架构,各方面都还不是很完美,目前已有的数据湖都强依赖于 Spark(当然 Flink 也正在拥抱数据湖),将数据迁移到数据湖需要团队对迁移成本和人员学习成本进行考量。如果没有这么大的决心迁移数据湖,那有没有一个稍微缓和一些的方案加速已有的离线数仓呢?

二、Flink 在批流一体上的探索

 统一元数据

        Flink 一直持续致力于离线和实时的统一,首先是统一元数据。简单来说就是把 Kafka 表的元数据信息存储到 HivemetaStore 中,做到离线和实时的表 meta 的统一。(目前开源的实时计算并没有一个较为完善的持久化 metaStore,Hive metaStore 不仅能保存离线表,也可以承担实时计算的 metaStore 能力)。

统一计算引擎

 同样的元数据之后,实时和离线的表结构和层次可以设计成一样,接下来就是可以共用:

  • 同一套 SQL,Flink 自身提供批流一体的 ANSI-SQL 语法,可以大大减小用户 SQL 开发者和运维者的负担,让用户专注于业务逻辑。

  • 同一个引擎,Flink 的流和批复用一套优化和 Runtime 框架,现阶段的大数据引擎还远远达不到完全稳定的情况,所以仍然有很多时候需要我们去深入的分析和优化,一套引擎可以让开发者专注单个技术栈,避免需要接触多个技术栈,而只有技术广度,没有技术深度。

统一数据

         分析了元数据和计算引擎的统一,更进一步,是否能统一实时和离线的数据,避免数据的不一致,避免数据的重复存储和重复计算。ETL 计算是否能统一呢?既然实时表设计上可以和离线表一模一样,是否可以干脆只有实时表的 ETL 计算,离线表从实时表里获取数据?

并且,通过实时链路可以加速离线链路的数据准备,批计算可以把调度换成流输入。

Flink Hive/File Streaming Sink 即为解决这个问题,实时 Kafka 表可以实时的同步到对于的离线表中:

  • 离线表作为实时的历史数据,填补了实时数仓不存在历史数据的空缺。

  • 数据批量准实时摄入为 Ad-hoc 查询离线表提供了准实时输入。

此时离线的批计算也可以交由实时调度,在实时任务处理中某个契机 (Partition Commit 见后续) 自行调度离线那块的任务进行数据同步 *** 作。

此时实时和离线的表已经基本统一,那么问题来了,Kafka 中的表和 Hive 中的表能否就共用一张表呢?我的想法是之后可能会出现以下情况,在数仓中定义一张表,分别对应着 Kafka 和 Hive+HDFS 两种物理存储:

  • 用户在进行 insert *** 作时,就自然插入到了 Kafka 的实时 table 当中,同时生成另外一条链路,自动同步到 Hive Table 当中。这样这一张表就非常的完整,不仅满足实时的需求,而且拥有历史的数据。

  • 一个 SQL 读取这样的一个 Hybrid Source ,根据你的查询语句后面的 where 条件,自动路由到 Hive 的历史数据,或者是 Kafka 的实时数据。根据一定的规则先读 Hive 历史数据,再读 Kafka 实时数据,当然这里有一个问题,它们之间通过什么标识来切换呢?一个想法是数据中或者 Kafka 的 Timestamp。

Hive Streaming Sink 的实现

Flink 1.11 前已经有了 StreamingFileSink,在 1.11 中不但把它集成到 SQL 中,让这个 Hive Streaming Sink 可以像离线的 Hive SQL 那样,所有的业务逻辑都由 SQL 去处理,而且带来了进一步的增量。

接下来介绍下 Hive/File Streaming Sink,分为两个组件,FileWriter 和 PartitionCommitter:

  • FileWriter 组件可以做到分区感知,通过 checkpoint 机制可以保证 Exactly-once(分布式场景是不可靠的,需要通过两阶段提交 + 文件 Rename 的幂等性),FileWriter 也提供了 Rolling 相关的参数,这个 Rolling 指的是我们的流式处理过程,它可以通过两个参数来控制执行频率,file-size 就是每个数据流的大小,rollover-interval 就是时长间隔。但是需要注意,checkpoint 不宜设置太频繁,以免产生过多的小文件。

  • Partition Committer,通过一系列的业务逻辑处理后得到的 Finished Flies 就直接可用了吗?因为我们典型的 Hive 表都是分区表,当一个分区就绪后,还需要通知下游,Partition 已经处理完成,可以同步到 Hive metastore 中了。我们需要在合适的时机来有效的 trigger 特定的 Partition commit。Partition committer 总的来说,就是完成了 Hive 分区表的数据及元数据的写入,甚至可以完成通知调度系统开始执行之后的 Batch 作业。

因为流式作业是不间断的在运行的,如何设置分区提交的时间,某个分区什么时候提交它呢?

  • 第一种是默认策略 Process time ,也就是我们所说的事件被处理时的当前系统时间,但是缺点也比较明显,可能出现各种各样的数据不完整。

  • 推荐策略就是 partition-time,这种策略可以做到提交时的语义明确且数据完整,partition 字段就是由 event time ,也就是事件产生的时间所得到的。

如果当前时间 Current time > 分区产生的时间 + commitDelay 延时,即是可以开始进行分区提交的时间。一个简单的例子是小时分区,比如当前已经 12 点过 1 分了,已经过了 11 点的分区 + 一个小时,所以我们可以说不会再有 11 点分区的数据过来了,就可以提交 11 点的分区。(要是有 LateEvent 怎么办?所以也要求分区的提交是幂等的。)

接下来介绍分区的提交具体作用,最直接的就是写 SuccessFile 和 Add partition 到 Hive metastore。

Flink 内置支持了 Hive-metaStore 和 SuccessFile,只要配置"sink.partition-commit.policy.kind" 为 "metastore,success-file",即可做到在 commit 分区的时候自动 add 分区到 Hive 中,而且写 SuccessFile,当 add *** 作完成的时候,这个 partition 才真正的对 Hive 可见。

Custom 机制允许自定义一个 Partition Commit Policy 的类,实现这个类可以做到在这个分区的任务处理完成后:比如触发下游的调度、Statistic Analysis、又或者触发 Hive 的小文件合并。(当然触发 Hive 的小文件合并不但需要启动另一个作业,而且做不到一致性保证,后续 Flink 也会有进一步的探索,在 Flink 作业中,主动完成小文件的合并)。

实时消费

不止是准实时的数据摄入,Flink 也带来了维表关联 Hive 表和流实时消费 Hive 表。

我们知道 Flink 是支持维表关联查询 MySQL 和 Hbase 的,在计算中维护一个 LRU 的缓存,未命中查询 MySQL 或 Hbase。但是没有 Lookup 的能力怎么办呢?数据一般是放在离线数仓中的,所以业务上我们一般采用 Hive Table 定期同步到 Hbase 或者 MySQL。Flink 也可以允许直接维表关联 Hive 表,目前的实现很简单,需要在每个并发中全量 Load Hive 表的所有数据,只能针对小表的关联。

传统的 Hive Table 只支持按照批的方式进行读取计算,但是我们现在可以使用流的方式来监控 Hive 里面的分区 / 文件生成,也就是每一条数据过来,都可以实时的进行消费计算,它也是完全复用 Flink Streaming SQL 的方式,可以和 Hbase、MySQL、Hive Table 进行 Join *** 作,最后再通过 FileWriter 实时写入到 Hive Table 中。

构建流批一体准实时数仓应用实践

案例如下:通过 Flume 采集日志打点 Logs,计算各年龄层的 PV,此时我们存在两条链路:

  • 一条是实时链路,通过输入访问日志,关联 Hive 的 User 表来计算出所需要的结果到业务 DB 中。

  • 而另一条则是离线链路,我们需要 Hive 提供小时分区表,来实现对历史数据的 Ad-hoc 查询。

这里就是我们刚刚提到的,虽然是对应两个 database:realtime_db 和 offline_db,但是它们共用一份元数据。

对于 Hive 表我们可以通过 Flink SQL 提供的 Hive dialect 语法,然后通过 Hive 的 DDL 语法来在 Flink 中创建 Hive 表,这里设置 PARTITION BY 天和小时,是与实时链路的不同之处,因为实时链路是没有分区概念的。

如何在表结构里避免分区引起的 Schema 差异?一个可以解决的方案是考虑引入 Hidden Partition 的定义,Partition 的字段可以是某个字段的 Computed Column,这也可以与实际常见的情况做对比,如天或小时是由时间字段计算出的,之后是下面的三个参数:

  • sink.partition-commit.trigger,指定什么时候进行 partition 的 commit,这里设置了 partition-time,用于保证 exactly-once;

  • partition.time-extractor.timestamp-pattern,怎样从 partition 中提取时间,相当于设置了一个提取格式;

  • sink.partition-commit.policy.kind,既 partition commit 所要进行的 *** 作,也就是刚刚提到的 metastore,success-file。

之后设置回默认的 Flink dialect,创建 Kafka 的实时表,通过 insert into 将 Kafka 中的数据同步到 Hive 之中。

这部分是关于 Kafka 中的表如何通过 Dim join 的方式,拿到 User 表的年龄字段。图中需要关心的是 lookup.join.cache.ttl 这个参数,我们会将 user 这张表用类似于 broadcast 的方式,广播到每一个 task 中,但是这个过程中可能出现 Hive 中的 table 存在更新 *** 作,这里的 1h 就说明,数据有效期仅为 1 小时。创建 view 的目的是将 Dim join 所需要的 process time 加上(Dim Join 需要定义 Process time 是个不太自然的过程,后续也在考虑如何在不破坏 SQL 语义的同时,简化 DimJoin 的语法。)

通过实时 Pipeline 的手段消费 Hive Table,而不是通过调度或者以往手动触发的 batch 作业,第一个参数 streaming-source.enable,打开流处理机制,然后使用 start-offset 参数指定从哪个分区 / 文件开始消费。此时,整个流批一体准实时数仓应用基本算是完成啦。

未来规划

Hive 作为分区级别管理的 Table Format 在一些方便有比较大的限制,如果是新型的 Table Format 比如 Iceberg 会有更好的支持,未来 Flink 会在下面几个方面加强:

  • Flink Hive/File Streaming Sink 的 Auto Compaction(Merging) 能力,小文件是实时的最大阻碍之一。

  • Flink 拥抱 Iceberg,目前在社区中已经开发完毕 Iceberg Sink,Iceberg Source 正在推进中,可以看见在不远的将来,可以直接将 Iceberg 当做一个消息队列,且,它保存了所有的历史数据,达到真正的流批统一。

  • 增强 Flink Batch 的 Shuffle,目前完全的 Hash Shuffle 带来了很多问题,比如小文件、随机 IO、Buffer 管理带来的 OOM,后续开源 Flink (1.12) 会加强力量引入 SortedShuffle 以及 ShuffleService。

  • Flink Batch BoundedStream 支持,旧的 Dataset API 已经不能满足流批统一的架构,社区 (1.12) 会在 DataStream 上提供 Batch 计算的能力。

三、Flink 在批流一体实战 Flink集成Hive的基本方式

Flink 与 Hive 的集成主要体现在以下两个方面:

  • 持久化元数据

        Flink利用 Hive 的 metaStore 作为持久化的 Catalog,我们可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive metastore 中。例如,我们可以使用HiveCatalog将其 Kafka的数据源表存储在 Hive metastore 中,这样该表的元数据信息会被持久化到Hive的metaStore对应的元数据库中,在后续的 SQL 查询中,我们可以重复使用它们。

  • 利用 Flink 来读写 Hive 的表。

        Flink打通了与Hive的集成,如同使用SparkSQL或者Impala *** 作Hive中的数据一样,我们可以使用Flink直接读写Hive中的表。HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive表。不需要修改现有的 Hive metastore,也不需要更改表的数据位置或分区。

Flink SQL Cli集成Hive

将上面的三个jar包添加至Flink的lib目录下之后,就可以使用Flink *** 作Hive的数据表了。

  • flink-sql-connector-hive-2.3.6
  • flink-connector-hive_2.11-1.12.0.jar
  • hive-exec-2.3.4.jar

以FlinkSQL Cli为例:

配置sql-client-defaults.yaml

该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,具体的配置如下,主要是配置catalog:

  *** 作Hive中的表
首先启动FlinkSQL Cli,命令如下:

./bin/sql-client.sh embedded

接下来,我们可以查看注册的catalog

Flink SQL> show catalogs;
default_catalog
myhive

使用注册的myhive catalog

Flink SQL> use catalog myhive;

假设Hive中有一张users表,在Hive中查询该表:

hive (default)> select * from users;
OK
users.id        users.mame
1       jack
2       tom
3       robin
4       haha
5       haha

查看对应的数据库表,我们可以看到Hive中已经存在的表,这样就可以使用FlinkSQL *** 作Hive中的表,比如查询,写入数据。

Flink SQL> show tables;
Flink SQL> select * from users;

向Hive表users中插入一条数据:

Flink SQL> insert into users select 6,'bob';

再次使用Hive客户端去查询该表的数据,会发现写入了一条数据。

SQL DDL 中的 watermark 和计算列语法

Flink提供了几种常用的watermark策略。

  • 1,严格意义上递增的时间戳,发出到目前为止已观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。

WATERMARK FOR rowtime_column AS rowtime_column

  • 2,递增的时间戳,发出到目前为止已观察到的最大时间戳为负1的水印。时间戳等于或小于最大时间戳的行不会迟到。

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND.

  • 3,有界时间戳(乱序) 发出水印,它是观察到的最大时间戳减去指定的延迟,例如

WATERMARK FOR rowtime_column AS rowtime_column-INTERVAL'5'SECOND是5秒的延迟水印策略。

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit.

使用 DDL 创建 Kafka 表、接下来,我们再在FlinkSQL Cli中创建一张kafka的数据源表:

Flink SQL>  CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品类id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为发生的时间戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格式为json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
    'is_generic' = 'false'
); 

Flink SQL> DESCRIBE user_behavior;

在FlinkSQL Cli中创建一张kafka表,该表默认为普通表,即is_generic=true,说明该表是一张普通表,如果在Hive中去查看该表,则会报错。

上面创建的表是普通表,该表不能使用Hive去查询。那么,该如何创建一张Hive兼容表呢?我们只需要在建表的属性中显示指定is_generic=false即可,具体如下:

        在Flink中创建一张表,会把该表的元数据信息持久化到Hive的metastore中,我们可以在Hive的metastore中查看该表的元数据信息 

我们可以在Hive的客户端中执行下面命令查看刚刚在Flink SQLCli中创建的表

hive (default)> desc formatted  user_behavior;
# Detailed Table Information             
Database:               default                  
Owner:                  null                     
CreateTime:             Sun Dec 20 16:04:59 CST 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/user_behavior   
Table Type:             MANAGED_TABLE            
Table Parameters:                
        flink.connector         kafka               
        flink.format            json                
        flink.json.fail-on-missing-field        true                
        flink.json.ignore-parse-errors  false               
        flink.properties.bootstrap.servers      kms-2:9092,kms-3:9092,kms-4:9092
        flink.properties.group.id       group1              
        flink.scan.startup.mode earliest-offset     
        flink.schema.0.data-type        BIGINT              
        flink.schema.0.name     user_id             
        flink.schema.1.data-type        BIGINT              
        flink.schema.1.name     item_id             
        flink.schema.2.data-type        BIGINT              
        flink.schema.2.name     cat_id              
        flink.schema.3.data-type        VARCHAr(2147483647) 
        flink.schema.3.name     action              
        flink.schema.4.data-type        INT                 
        flink.schema.4.name     province            
        flink.schema.5.data-type        BIGINT              
        flink.schema.5.name     ts                  
        flink.schema.6.data-type        TIMESTAMP(3) NOT NULL
        flink.schema.6.expr     PROCTIME()          
        flink.schema.6.name     proctime            
        flink.schema.7.data-type        TIMESTAMP(3)        
        flink.schema.7.expr     TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
        flink.schema.7.name     eventTime           
        flink.schema.watermark.0.rowtime        eventTime           
        flink.schema.watermark.0.strategy.data-type     TIMESTAMP(3)        
        flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '5' SECOND
        flink.topic             user_behavior       
        is_generic              true                
        transient_lastDdlTime   1608451499          
# Storage Information            
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
        serialization.format    1                   
NOTE:black_flag::在Flink中创建一张表,会把该表的元数据信息持久化到Hive的metastore中,我们可以在Hive的metastore中查看该表的元数据信息

进入Hive的元数据信息库,本文使用的是MySQL。执行下面的命令:

SELECt 
    a.tbl_id, -- 表id
    from_unixtime(create_time) AS create_time, -- 创建时间
    a.db_id, -- 数据库id
    b.name AS db_name, -- 数据库名称
    a.tbl_name -- 表名称
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id =b.db_id
WHERe a.tbl_name = "user_behavior";


使用代码连接到 Hive

代码、代码中使用Hive Catalog

public class HiveIntegrationDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
 
        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";
        
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // 使用注册的catalog
        tableEnv.useCatalog("myhive");
        // 向Hive表中写入一条数据 
        String insertSQL = "insert into users select 10,'lihua'";
 
        TableResult result2 = tableEnv.executeSql(insertSQL);
        System.out.println(result2.getJobClient().get().getJobStatus());
    }
}

提交程序,观察Hive表的变化:

bin/flink run -m kms-1:8081
-c com.flink.sql.hiveintegration.HiveIntegrationDemo
./original-study-flink-sql-1.0-SNAPSHOT.jar

在SQL Cli中使用Hive dialect

使用hive dialect只需要配置一个参数即可,该参数名称为:table.sql-dialect。我们就可以在sql-client-defaults.yaml配置文件中进行配置,也可以在具体的会话窗口中进行设定,对于SQL dialect的切换,不需要进行重启session。

execution:
  planner: blink
  type: batch
  result-mode: table
configuration:
  table.sql-dialect: hive

 如果我们需要在SQL Cli中进行切换hive dialect,可以使用如下命令:

Flink SQL> set table.sql-dialect=hive; -- 使用hive dialect
Flink SQL> set table.sql-dialect=default; -- 使用default dialect

一旦切换到了hive dialect,就只能使用Hive的语法建表,如果尝试使用Flink的语法建表,则会报错 

Flink写入Hive表 (批处理写入、流式写入)

        Flink支持以**批处理(Batch)和流处理(Streaming)**的方式写入Hive表。当以批处理的方式写入Hive表时,只有当写入作业结束时,才可以看到写入的数据。批处理的方式写入支持append模式和overwrite模式。

批处理模式写入

  • 向非分区表写入数据

Flink SQL> use catalog myhive; -- 使用catalog
Flink SQL> INSERT INTO users SELECT 2,'tom';
Flink SQL> set execution.type=batch; -- 使用批处理模式
Flink SQL> INSERT OVERWRITE users SELECT 2,'tom';
  • 向分区表写入数据

-- 向静态分区表写入数据
Flink SQL> INSERT OVERWRITE INTO myparttable PARTITION (my_type='type_1', my_date='2019-08-08')
            SELECT 'Tom', 25;
-- 向动态分区表写入数据
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
 

流处理模式写入

流式写入Hive表,不支持**Insert overwrite **方式,否则报如下错误:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.

下面的示例是将kafka的数据流式写入Hive的分区表 

-- 使用流处理模式
Flink SQL> set execution.type=streaming;
-- 使用Hive方言
Flink SQL> SET table.sql-dialect=hive; 

-- 创建一张Hive分区表,hive创建表

CREATE TABLE user_behavior_hive_tbl (
   `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品类id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT -- 用户行为发生的时间戳
) PARTITIonED BY (dt STRING,hr STRING,mi STRING) 
  STORED AS parquet  
  BLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

关于Hive表的一些属性解释:

partition.time-extractor.timestamp-pattern

  • 默认值:(none)
  • 解释:分区时间抽取器,与 DDL 中的分区字段保持一致,如果是按天分区,则可以是**year-day ,如果是按天时进行分区,则该属性值为:dt $hour:00:00`;

sink.partition-commit.trigger

  • process-time:不需要时间提取器和水位线,当当前时间大于分区创建时间 + sink.partition-commit.delay 中定义的时间,提交分区;
  • partition-time:需要 Source 表中定义 watermark,当 watermark > 提取到的分区时间 +sink.partition-commit.delay 中定义的时间,提交分区;
  • 默认值:process-time
  • 解释:分区触发器类型,可选 process-time 或partition-time。

sink.partition-commit.delay

  • 默认值:0S
  • 解释:分区提交的延时时间,如果是按天分区,则该属性的值为:1d,如果是按小时分区,则该属性值为1h;

sink.partition-commit.policy.kind

  • metastore:添加分区的元数据信息,仅Hive表支持该值配置
  • success-file:在表的存储路径下添加一个_SUCCESS文件
  • 默认值:(none)
  • 解释:提交分区的策略,用于通知下游的应用该分区已经完成了写入,也就是说该分区的数据可以被访问读取。可选的值如下:
  • 可以同时配置上面的两个值,比如metastore,success-file
-- 使用默认SQL方言
Flink SQL> SET table.sql-dialect=default; 
-- 创建一张kafka数据源表
CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品类id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为发生的时间戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behaviors', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格式为json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);
 

执行流式写入Hive表

-- streaming sql,将数据写入Hive表

INSERT INTO user_behavior_hive_tbl 
SELECt 
    user_id,
    item_id,
    cat_id,
    action,
    province,
    ts,
    FROM_UNIXTIME(ts, 'yyyy-MM-dd'),
    FROM_UNIXTIME(ts, 'HH'),
    FROM_UNIXTIME(ts, 'mm')
FROM user_behavior;

-- batch sql,查询Hive表的分区数据

SELECt * FROM user_behavior_hive_tbl 
WHERe dt='2021-01-04' AND  hr='16' AND mi = '46';

 提示:

1.Flink读取Hive表默认使用的是batch模式,如果要使用流式读取Hive表,需要而外指定一些参数,见下文。

2.只有在完成 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成_SUCCESS文件,所以,Flink流式写入Hive表需要开启并配置 Checkpoint。对于Flink SQL Client而言,需要在flink-conf.yaml中开启CheckPoint,配置内容为:

state.backend: filesystem execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION execution.checkpointing.interval: 60s execution.checkpointing.mode: EXACTLY_onCE state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints
 

Flink读取Hive表 

        Flink支持以**批处理(Batch)和流处理(Streaming)**的方式读取Hive中的表。批处理的方式与Hive的本身查询类似,即只在提交查询的时刻查询一次Hive表。流处理的方式将会持续地监控Hive表,并且会增量地提取新的数据。默认情况下,Flink是以批处理的方式读取Hive表。

        关于流式读取Hive表,Flink既支持分区表又支持非分区表。对于分区表而言,Flink将会监控新产生的分区数据,并以增量的方式读取这些数据。对于非分区表,Flink会监控Hive表存储路径文件夹里面的新文件,并以增量的方式读取新的数据。

Flink读取Hive表可以配置一下参数:

streaming-source.enable

  • 默认值:false
  • 解释:是否开启流式读取 Hive 表,默认不开启。

streaming-source.partition.include

  • 默认值:all
  • 解释:配置读取Hive的分区,包括两种方式:all和latest。all意味着读取所有分区的数据,latest表示只读取最新的分区数据。值得注意的是,latest方式只能用于开启了流式读取Hive表,并用于维表JOIN的场景。

streaming-source.monitor-interval

  • 默认值:None
  • 解释:持续监控Hive表分区或者文件的时间间隔。值得注意的是,当以流的方式读取Hive表时,该参数的默认值是1m,即1分钟。当temporal join时,默认的值是60m,即1小时。另外,该参数配置不宜过短 ,最短是1 个小时,因为目前的实现是每个 task 都会查询 metastore,高频的查可能会对metastore 产生过大的压力。

streaming-source.partition-order

  • 默认值:partition-name
  • 解释:streaming source的分区顺序。默认的是partition-name,表示使用默认分区名称顺序加载最新分区,也是推荐使用的方式。除此之外还有两种方式,分别为:create-time和partition-time。其中create-time表示使用分区文件创建时间顺序。partition-time表示使用分区时间顺序。指的注意的是,对于非分区表,该参数的默认值为:create-time。

streaming-source.consume-start-offset

  • 默认值:None
  • 解释:流式读取Hive表的起始偏移量。

partition.time-extractor.kind

  • 默认值:default
  • 分区时间提取器类型。用于从分区中提取时间,支持default和自定义。如果使用default,则需要通过参数partition.time-extractor.timestamp-pattern配置时间戳提取的正则表达式。

在 SQL Client 中需要显示地开启 SQL Hint 功能

Flink SQL> set table.dynamic-table-options.enabled= true;  

使用SQLHint流式查询Hive表

SELECt * FROM user_behavior_hive_tbl ;

Hive维表JOIN

        Flink 1.12 支持了 Hive 最新的分区作为时态表的功能,可以通过 SQL 的方式直接关联 Hive 分区表的最新分区,并且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地做维表数据的全量替换。

        Flink支持的是processing-time的temporal join,也就是说总是与最新版本的时态表进行JOIN。另外,Flink既支持非分区表的temporal join,又支持分区表的temporal join。对于分区表而言,Flink会监听Hive表的最新分区数据。值得注意的是,Flink尚不支持 event-time temporal join。

Temporal Join最新分区

        对于一张随着时间变化的Hive分区表,Flink可以读取该表的数据作为一个无界流。如果Hive分区表的每个分区都包含全量的数据,那么每个分区将做为一个时态表的版本数据,即将最新的分区数据作为一个全量维表数据。值得注意的是,该功能特点仅支持Flink的STREAMING模式。

        除此之外还有一些其他的参数,关于参数的解释见上面的分析。我们在使用Hive维表的时候,既可以在创建Hive表时指定具体的参数,也可以使用SQL Hint的方式动态指定参数。一个Hive维表的创建模板如下: 

-- 使用Hive的sql方言
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) PARTITIonED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
  -- 方式1:按照分区名排序来识别最新分区(推荐使用该种方式)
  'streaming-source.enable' = 'true', -- 开启Streaming source
  'streaming-source.partition.include' = 'latest',-- 选择最新分区
  'streaming-source.monitor-interval' = '12 h',-- 每12小时加载一次最新分区数据
  'streaming-source.partition-order' = 'partition-name',  -- 按照分区名排序
 
  -- 方式2:分区文件的创建时间排序来识别最新分区
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.partition-order' = 'create-time',-- 分区文件的创建时间排序
  'streaming-source.monitor-interval' = '12 h'
 
  -- 方式3:按照分区时间排序来识别最新分区
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-time', -- 按照分区时间排序
  'partition.time-extractor.kind' = 'default',
  'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' 
);
 

有了上面的Hive维表,我们就可以使用该维表与Kafka的实时流数据进行JOIN,得到相应的宽表数据。

-- 使用default sql方言
SET table.sql-dialect=default;

-- kafka实时流数据表
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH (...);
 
-- 将流表与hive最新分区数据关联 
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim 
ON orders.product_id = dim.product_id;

除了在定义Hive维表时指定相关的参数,我们还可以通过SQL Hint的方式动态指定相关的参数,具体方式如下: 

SELECt *
FROM orders_table AS orders
JOIN dimension_table

FOR SYSTEM_TIME AS OF orders.proctime AS dim -- 时态表(维表)
ON orders.product_id = dim.product_id;

Temporal Join最新表

        对于Hive的非分区表,当使用temporal join时,整个Hive表会被缓存到Slot内存中,然后根据流中的数据对应的key与其进行匹配。使用最新的Hive表进行temporal join不需要进行额外的配置,我们只需要配置一个Hive表缓存的TTL时间,该时间的作用是:当缓存过期时,就会重新扫描Hive表并加载最新的数据。

lookup.join.cache.ttl

  • 默认值:60min
  • 解释:表示缓存时间。由于 Hive 维表会把维表所有数据缓存在 TM 的内存中,当维表数据量很大时,很容易造成 OOM。当然TTL的时间也不能太短,因为会频繁地加载数据,从而影响性能。

当使用此种方式时,Hive表必须是有界的lookup表,即非Streaming Source的时态表,换句话说,该表的属性streaming-source.enable = false。

如果要使用Streaming Source的时态表,记得配置streaming-source.monitor-interval的值,即数据更新的时间间隔。

-- Hive维表数据使用批处理的方式按天装载
SET table.sql-dialect=hive;

CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) TBLPROPERTIES (
  'streaming-source.enable' = 'false', -- 关闭streaming source
  'streaming-source.partition.include' = 'all',  -- 读取所有数据
  'lookup.join.cache.ttl' = '12 h'
);

-- kafka事实表
SET table.sql-dialect=default;

CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH (...);
 
-- Hive维表join,Flink会加载该维表的所有数据到内存中
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim
ON orders.product_id = dim.product_id;

1.每一个子任务都需要缓存一份维表的全量数据,一定要确保TM的task Slot 大小能够容纳维表的数据量;

2.推荐将streaming-source.monitor-interval和lookup.join.cache.ttl的值设为一个较大的数,因为频繁的更新和加载数据会影响性能。

3.当缓存的维表数据需要重新刷新时,目前的做法是将整个表进行加载,因此不能够将新数据与旧数据区分开来。

Hive维表JOIN示例(kafka事实表 join hive维表)

        假设维表的数据是通过批处理的方式(比如每天)装载至Hive中,而Kafka中的事实流数据需要与该维表进行JOIN,从而构建一个宽表数据,这个时候就可以使用Hive的维表JOIN。

  • 创建一张kafka数据源表,实时流、

SET table.sql-dialect=default;

CREATE TABLE fact_user_behavior ( 
    `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为发生的时间戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behaviors', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格式为json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);
  • 创建一张Hive维表

SET table.sql-dialect=hive;

CREATE TABLE dim_item (
  item_id BIGINT,
  item_name STRING,
  unit_price DECIMAL(10, 4)
) PARTITIonED BY (dt STRING) TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-name'
);
  • 关联Hive维表的最新数据

SELECt 
    fact.item_id,
    dim.item_name,
    count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERe fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;

使用SQL Hint方式,关联非分区的Hive维表: 

set table.dynamic-table-options.enabled= true; 
SELECt 
    fact.item_id,
    dim.item_name,
    count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item1

FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERe fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;
Flink upsert-kafka连接器

        Upsert Kafka Connector允许用户以upsert的方式从Kafka主题读取数据或将数据写入Kafka主题。

-- 创建一张kafka表,用户存储sink的数据
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

        要使用 upsert-kafka connector,必须在创建表时使用PRIMARY KEY定义主键,并为键(key.format)和值(value.format)指定序列化反序列化格式。 

value.fields-include

  • 可选,默认为ALL。控制key字段是否出现在 value 中。当取ALL时,表示消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。当取EXCEPT_KEY时,表示记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。

key.fields-prefix

  • 可选。为了避免与value字段命名冲突,为key字段添加一个自定义前缀。默认前缀为空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,但是在构建key的序列化数据类型时,将移除该前缀。见下面的示例。在需要注意的是:使用该配置属性,value.fields-include的值必须为EXCEPT_KEY。
-- 创建一张upsert表,当指定了qwe前缀,涉及的key必须指定qwe前缀
CREATE TABLE result_total_pvuv_min_prefix (
    qwedo_date     STRING,     -- 统计日期,必须包含qwe前缀
    qwedo_min      STRING,      -- 统计分钟,必须包含qwe前缀
    pv          BIGINT,     -- 点击量
    uv          BIGINT,     -- 一天内同个访客多次访问仅计算一个UV
    currenttime TIMESTAMP,  -- 当前时间
    PRIMARY KEY (qwedo_date, qwedo_min) NOT ENFORCED -- 必须包含qwe前缀
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'result_total_pvuv_min_prefix',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields-prefix'='qwe', -- 指定前缀qwe
  'value.fields-include' = 'EXCEPT_KEY' -- key不出现kafka消息的value中
);
-- 向该表中写入数据
INSERT INTO result_total_pvuv_min_prefix
SELECt
  do_date,    --  时间分区
  cast(DATE_FORMAT (access_time,'HH:mm') AS STRING) AS do_min,-- 分钟级别的时间
  pv,
  uv,
  CURRENT_TIMESTAMP AS currenttime -- 当前时间
from
  view_total_pvuv_min;

如果指定了key字段前缀,但在DDL中并没有添加该前缀字符串,那么在向该表写入数时,会抛出下面异常:

[ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: All fields in 'key.fields' must be prefixed with 'qwe' when option 'key.fields-prefix' is set but field 'do_date' is not prefixed.

 upsert-kafka 使用案例

本文以实时地统计网页PV和UV的总量为例,介绍upsert-kafka基本使用方式:

  • Kafka 数据源

用户的ippv信息,一个用户在一天内可以有很多次pv

CREATE TABLE source_ods_fact_user_ippv (
    user_id      STRING,       -- 用户ID
    client_ip    STRING,       -- 客户端IP
    client_info  STRING,       -- 设备机型信息
    pagecode     STRING,       -- 页面代码
    access_time  TIMESTAMP,    -- 请求时间
    dt           STRING,       -- 时间分区天
    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND  -- 定义watermark
) WITH (
   'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_ippv', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格式为json
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
  • Kafka Sink表

统计每分钟的PV、UV,并将结果存储在Kafka中

CREATE TABLE result_total_pvuv_min (
    do_date     STRING,     -- 统计日期
    do_min      STRING,      -- 统计分钟
    pv          BIGINT,     -- 点击量
    uv          BIGINT,     -- 一天内同个访客多次访问仅计算一个UV
    currenttime TIMESTAMP,  -- 当前时间
    PRIMARY KEY (do_date, do_min) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'result_total_pvuv_min',
  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
  'key.json.ignore-parse-errors' = 'true',
  'value.json.fail-on-missing-field' = 'false',
  'key.format' = 'json',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY' -- key不出现kafka消息的value中
);
  • 计算逻辑

-- 将聚合结果写入 Kafka 写入数据
INSERT INTO result_total_pvuv_min
SELECt
     dt AS do_date,                    -- 时间分区
     max(access_time) AS do_min,       -- 请求的时间
     count (client_ip) AS pv,          -- 客户端的IP
     count (DISTINCT client_ip) AS uv, -- 客户端去重
     max(access_time) AS access_time,  -- 请求的时间
     CURRENT_TIMESTAMP AS currenttime  -- 当前时间
FROM
    source_ods_fact_user_ippv
GROUP BY dt;
  • 生产用户访问数据到kafka,向kafka中的user_ippv插入数据:

{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:32:24","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-08 11:32:55","dt":"2021-01-08"}
{"user_id":"2","client_ip":"192.165.12.1","client_info":"pc",   "pagecode":"1031","access_time":"2021-01-08 11:32:59","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-08 11:33:24","dt":"2021-01-08"}
{"user_id":"3","client_ip":"192.168.10.3","client_info":"pc",   "pagecode":"1001","access_time":"2021-01-08 11:33:30","dt":"2021-01-08"}
{"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-08 11:34:24","dt":"2021-01-08"}
  • 查询结果表:

select * from result_total_pvuv_min;

可以看出:每分钟的pv、uv只显示一条数据,即代表着截止到当前时间点的pv和uv

查看Kafka中result_total_pvuv_min主题的数据,如下:

 可以看出:针对每一条访问数据,触发计算了一次PV、UV,每一条数据都是截止到当前时间的累计PV和UV。

默认情况下,如果在启用了检查点的情况下执行查询,Upsert Kafka接收器会将具有至少一次保证的数据提取到Kafka主题中。

这意味着,Flink可能会将具有相同键的重复记录写入Kafka主题。但是,由于连接器在upsert模式下工作,因此作为源读回时,同一键上的最后一条记录将生效。因此,upsert-kafka连接器就像Hbase接收器一样实现幂等写入。

  upsert-kafka 使用案例二

在 Mysql 中执行以下命令:

CREATE DATAbase flink;
USE flink;

CREATE TABLE users (
  user_id BIGINT,
  user_name VARCHAr(1000),
  region VARCHAr(1000)
);

INSERT INTO users VALUES 
(1, 'Timo', 'Berlin'),
(2, 'Tom', 'Beijing'),
(3, 'Apple', 'Beijing');

用Sql client在Flink中创建相应的表。

CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  region STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'database-name' = 'flink',
  'table-name' = 'users',
  'username' = 'root',
  'password' = '123456'
);

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  view_time TIMESTAMP(3),
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

并利用Flink 往 Kafka中灌入相应的数据 

INSERT INTO pageviews VALUES
  (1, 101, TO_TIMESTAMP('2020-11-23 15:00:00')),
  (2, 104, TO_TIMESTAMP('2020-11-23 15:00:01.00'));

将 left join 结果写入 Kafka

我们首先测试是否能将Left join的结果灌入到 Kafka 之中。

首先,我们在 Sql client 中创建相应的表

CREATE TABLE enriched_pageviews (
  user_id BIGINT,
  user_region STRING,
  page_id BIGINT,
  view_time TIMESTAMP(3),
  WATERMARK FOR view_time as view_time - INTERVAL '5' SECOND,
  PRIMARY KEY (user_id, page_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'enriched_pageviews',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

并利用以下语句将left join的结果插入到kafka对应的topic之中。

INSERT INTO enriched_pageviews
SELECt pageviews.user_id, region, pageviews.page_id, pageviews.view_time
FROM pageviews
LEFT JOIN users ON pageviews.user_id = users.user_id;

        当作业跑起来后,我们可以另起一个 Terminal 利用命令docker exec -it kafka bash 进入kafka所在的容器之中。 Kafka的安装路径在于/opt/kafka,利用以下命令,我们可以打印topic内的数据

./kafka-console-consumer.sh --bootstrap-server kafka:9094 
--topic "enriched_pageviews" 
--from-beginning --property print.key=true
#预期结果
{"user_id":1,"page_id":101} {"user_id":1,"user_region":null,"page_id":101,"view_time":"2020-11-23 15:00:00"}
{"user_id":2,"page_id":104} {"user_id":2,"user_region":null,"page_id":104,"view_time":"2020-11-23 15:00:01"}
{"user_id":1,"page_id":101} null
{"user_id":1,"page_id":101} {"user_id":1,"user_region":"Berlin","page_id":101,"view_time":"2020-11-23 15:00:00"}
{"user_id":2,"page_id":104} null
{"user_id":2,"page_id":104} {"user_id":2,"user_region":"Beijing","page_id":104,"view_time":"2020-11-23 15:00:01"}

 Left join中,右流发现左流没有join上但已经发射了,此时会发送DELETe消息,而非UPDATE-BEFORE消息清理之前发送的消息。详见org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator#processElement

我们可以进一步在mysql中删除或者修改一些数据,来观察进一步的变化。

UPDATE users SET region = 'Beijing' WHERe user_id = 1;

DELETE FROM users WHERe user_id = 1;

将聚合结果写入 Kafka

我们进一步测试将聚合的结果写入到 Kafka 之中。

在Sql client 中构建以下表

CREATE TABLE pageviews_per_region (
  user_region STRING,
  cnt BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
)

 我们再用以下命令将数据插入到upsert-kafka之中。

INSERT INTO pageviews_per_region
SELECt
  user_region,
  COUNT(*)
FROM enriched_pageviews
WHERe user_region is not null
GROUP BY user_region;

我们可以通过以下命令查看 Kafka 中对应的数据 

./kafka-console-consumer.sh --bootstrap-server kafka:9094 --topic "pageviews_per_region" --from-beginning --property print.key=true

# 预期结果
{"user_region":"Berlin"}    {"user_region":"Berlin","cnt":1}
{"user_region":"Beijing"}   {"user_region":"Beijing","cnt":1}
{"user_region":"Berlin"}    null
{"user_region":"Beijing"}   {"user_region":"Beijing","cnt":2}
{"user_region":"Beijing"}   {"user_region":"Beijing","cnt":1}
Flink CDC的connector、变化数据捕获change data capture (CDC)

简介

        Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC)从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。

特点

  • 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义
  • 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。
  • 对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。

 使用场景

  • 数据库之间的增量数据同步
  • 审计日志
  • 数据库之上的实时物化视图
  • 基于CDC的维表join

Flink提供的 table format
Flink提供了一系列可以用于table connector的table format,具体如下:


1、mysql-cdc的 *** 作实践

创建MySQL数据源表

在创建MySQL CDC表之前,需要先创建MySQL的数据表,如下:

-- MySQL


DROp TABLE IF EXISTS `order_info`;

CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1表示下单,2表示支付',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `operate_time` datetime DEFAULT NULL COMMENT ' *** 作时间',
  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
  `province_id` int(20) DEFAULT NULL COMMENT '地区',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';

-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info` 
VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';
 
-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` 
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
INSERT INTO `order_detail` 
VALUES (1330, 477, 9, '荣耀10 GT游戏加速 AIS手持夜景 6GB+64GB 幻影蓝全网通 移动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331, 478, 4, '小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');

 Flink SQL Cli创建CDC数据源

启动 Flink 集群,再启动 SQL CLI,执行下面命令:

-- 创建订单信息表
CREATE TABLE order_info(
    id BIGINT,
    user_id BIGINT,
    create_time TIMESTAMP(0),
    operate_time TIMESTAMP(0),
    province_id INT,
    order_status STRING,
    total_amount DECIMAL(10, 5)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_info'
);

在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert

 在SQL CLI中创建订单详情表:

CREATE TABLE order_detail(
    id BIGINT,
    order_id BIGINT,
    sku_id BIGINT,
    sku_name STRING,
    sku_num BIGINT,
    order_price DECIMAL(10, 5),
 create_time TIMESTAMP(0)
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_detail'
);

查询结果如下:

 执行JOIN *** 作:

SELECT
    od.id,
    oi.id order_id,
    oi.user_id,
    oi.province_id,
    od.sku_id,
    od.sku_name,
    od.sku_num,
    od.order_price,
    oi.create_time,
    oi.operate_time
FROM
   (
    SELECt * 
    FROM order_info
    WHERe 
      order_status = '2'-- 已支付
   ) oi
   JOIN
  (
    SELECt *
    FROM order_detail
  ) od 
  ON oi.id = od.order_id;
2、canal-json的 *** 作实践

        关于cannal的使用方式,可以参考我的另一篇文章:基于Canal与Flink实现数据实时增量同步(一)。我已经将下面的表通过canal同步到了kafka,具体格式为:

{
    "data": [
        {
            "id": "1",
            "region_name": "华北"
        },
        {
            "id": "2",
            "region_name": "华东"
        },
        {
            "id": "3",
            "region_name": "东北"
        },
        {
            "id": "4",
            "region_name": "华中"
        },
        {
            "id": "5",
            "region_name": "华南"
        },
        {
            "id": "6",
            "region_name": "西南"
        },
        {
            "id": "7",
            "region_name": "西北"
        }
    ],
    "database": "mydw",
    "es": 1597128441000,
    "id": 102,
    "isDdl": false,
    "mysqlType": {
        "id": "varchar(20)",
        "region_name": "varchar(20)"
    },
    "old": null,
    "pkNames": null,
    "sql": "",
    "sqlType": {
        "id": 12,
        "region_name": 12
    },
    "table": "base_region",
    "ts": 1597128441424,
    "type": "INSERT"
}

在SQL CLI中创建该canal-json格式的表:

CREATE TABLE region (
  id BIGINT,
  region_name STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
);

查询结果如下:

3、changelog-json的 *** 作实践

创建MySQL数据源

参见上面的order_info



Flink SQL Cli创建changelog-json表

CREATE TABLE order_gmv2kafka (
  day_str STRING,
  gmv DECIMAL(10, 5)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_gmv_kafka',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
 
INSERT INTO order_gmv2kafka
SELECt DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
FROM order_info
WHERe order_status = '2' -- 订单已支付
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 

查询表看一下结果:

 再查一下kafka的数据:

{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}

当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再观察数据: 

 再看kafka中的数据:

 总结

        本文主要介绍了基于Flink构建实时数仓的技术点,并对其使用方式进行了详细描述,通过本文你或许对实时数仓和流批一体的应用会有一个深刻认识,希望本文对你有所帮助。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5652816.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存