Flink SQL 知其所以然(五)| 自定义 protobuf format

Flink SQL 知其所以然(五)| 自定义 protobuf format,第1张

protobuf 作为目前各大公司中最广泛使用的高效的协议数据交换格式工具库,会大量作为流式数据传输的序列化方式,所以在 flink sql 中如果能实现 protobuf 的 format 会非常有用( 目前社区已经有对应的实现,不过目前还没有 merge,预计在 1.14 系列版本中能 release )。

issue 见: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC

pr 见: https://github.com/apache/flink/pull/14376

这一节主要介绍 flink sql 中怎么自定义实现 format ,其中以最常使用的 protobuf 作为案例来介绍。

如果想在本地直接测试下:

关于为什么选择 protobuf 可以看这篇文章,写的很详细:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在实时计算的领域中,为了可读性会选择 json ,为了效率以及一些已经依赖了 grpc 的公司会选择 protobuf 来做数据序列化,那么自然而然,日志的序列化方式也会选择 protobuf 。

而官方目前已经 release 的版本中是没有提供 flink sql api 的 protobuf format 的。如下图,基于 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

因此本文在介绍怎样自定义一个 format 的同时,实现一个 protobuf format 来给大家使用。

预期效果是先实现几种最基本的数据类型,包括 protobuf 中的 message (自定义 model)、 map (映射)、 repeated (列表)、其他基本数据类型等,这些都是我们最常使用的类型。

预期 protobuf message 定义如下:

测试数据源数据如下,博主把 protobuf 的数据转换为 json,以方便展示,如下图:

预期 flink sql:

数据源表 DDL:

数据汇表 DDL:

Transform 执行逻辑:

下面是我在本地跑的结果:

可以看到打印的结果,数据是正确的被反序列化读入,并且最终输出到 console。

目前业界可以参考的实现如下: https://github.com/maosuhan/flink-pb , 也就是这位哥们负责目前 flink protobuf 的 format。

这种实现的具体使用方式如下:

其实现有几个特点:

[图片上传失败...(image-66c35b-1644940704671)]

其实上节已经详细描述了 flink sql 对于 source\sink\format 的加载机制。

如图 serde format 是通过 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 创建的

所有通过 SPI 的 source\sink\formt 插件都继承自 Factory 。

整体创建 format 方法的调用链如下图。

最终实现如下,涉及到了几个实现类:

具体流程:

上述实现类的具体关系如下:

介绍完流程,进入具体实现方案细节:

ProtobufFormatFactory 主要创建 format 的逻辑:

resources\META-INF 文件:

ProtobufRowDataDeserializationSchema 主要实现反序列化的逻辑:

可以注意到上述反序列化的主要逻辑就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定义的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 其实就是一个 convertor 接口:

其作用就是将 protobuf message 中的每一个字段转换成为 RowData 中的每一个字段。

ProtobufToRowDataConverters 中就定义了具体转换逻辑,如截图所示,每一个 LogicalType 都定义了 protobuf message 字段转换为 flink 数据类型的逻辑:

源码后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取。

本文主要是针对 flink sql protobuf format 进行了原理解释以及对应的实现。

如果你正好需要这么一个 format,直接后台回复 flink sql 知其所以然(五)| 自定义 protobuf format 获取源码吧。

当然上述只是 protobuf format 一个基础的实现,用于生产环境还有很多方面可以去扩展的。

序列化和反序列化用来保存内存中的数据,他不是C#中独有的技术,比如win7的休眠就是该技术的应用,

在C#程序中可以用来保存对象,和对象当前状态,下次打开时通过反序列化获得,

一般用在服务器启动(反序列化)和关闭(序列化),保存数据


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/6845656.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-29
下一篇 2023-03-29

发表评论

登录后才能评论

评论列表(0条)

保存