记一次自定义实现Flink-sql-connector-xxx过程

记一次自定义实现Flink-sql-connector-xxx过程,第1张

需求:实现FlinkSQL sink到ArangoDB图数据库

分析:自定义Flink Table & SQL connector 支持flink-connector-arangodb,只需要实现sink部分

官网支持user-defined sources&sinks,对Table SQL的source/sink定义提供了解释

Metadata:对表的声明,封装为Catalog,定义外部存储系统的元数据

Planning:Factory实例由Java SPI机制创建,将外部表元数据配置封装为参数化实例(DynamicTableSource/Sink)

Runtime:读取/写入核心逻辑,实现InputFormat/OutputFormat或SourceFunction/SinkFunction接口,构建与外部存储系统的连接和实现读取和写入逻辑。

需要我们扩展的地方:

Dynamic Table Factory

自定义工场类实现org.apache.flink.table.factories.DynamicTableSinkFactory(我这里仅要支持sink,如果要支持source,需要实现org.apache.flink.table.factories.DynamicTableSourceFactory)

DDL语句中的‘connector’配置项作为标识符用来发现对应的工厂类实例

Factory工场类是由Java SPI来实例化的,我们需要在自定义connector模块的resource下添加文件

META-INF/services/org.apache.flink.table.factories.Factory

文件中指定工厂类的全路径

Dynamic Table Sink

 Factory工厂类主要构建DynamicTableSource/DynamicTableSink,这是个参数化实例对象,定义connector配置参数。在DynamicTableSource/Sink中数据的传递要使用Flink内部数据结构org.apache.flink.table.data.RowData,这里获取到数据需要对value做一下转换,value数据提取封装为RowData,RowData接口的实现类也比较多,可根据情况选择合适的实现类。

Sink接口实现:

有三个接口的实现会影响DML语句的执行

接口描述
SupportsOverwrite实现此接口可以使用INSERT OVERWRITE语句覆盖现有的表或分区数据
SupportsPartitioning允许写入分区数据
SupportsWritingMetadata保存持久化DDL中定义的列和类型

Runtime Provider

这里官网并没有对实际读取写入的Runtime实现作详细解释。说一下个人的理解

Dynamic Table Source/Sink 提供了获取RuntimeProvider实例函数getSinkRuntimeProvider(Context context),这个函数需要我们自定义逻辑去声明InputFormat/OutputFormat或者source/sinkFunction实例化对象

两种方式运行Provider

1.OutputFormatProvider.of(InputFormat/OutputFormat)
2.SinkFunctionProvider.of(source/sinkFunction)

使用lambda表达式,执行return () -> xxxFormat/xxxFunction;构建静态provider

关于InputFormat/OutputFormat或者source/sinkFunction

关键方法

1.xxxxFunction

// 建立连接
open();
// 执行读取/写入逻辑
invoke(T value,Context context);
// 关闭连接
close();

2.xxxxFormat

// 建立连接
open(int taskNumber, int numTasks);
// 执行读取/写入逻辑
writeRecord(In record)
// 关闭连接
close();

Encoding / Decoding Formats(待完善)

阅读flink-connector模块,对比几个connector的源码,分析后得出简单的connector主体架构:

// 1.工厂类构造source和sink,java SPI创建实例
ArangoDBDynamicTableFactory imp DynamicTableSinkFactory (如果支持source,需imp DynamicTableSourceFactory)
    - createDynamicTableSink // 创建连接器的参数化实例 -> 封装connector的参数
    - optionalOptions
    - requiredOptions
    - factoryIdentifier

// 2.arangodbsink引出sinkfunction
ArangodbDynamicTableSink

// 3.sinkFunction/outputFormat建立连接执行写入
        - open()   // 建立连接
        - invoke() // arangodb API
                - writeRecord
                - ArangoCollection.insertDocuments(values)
                - ArangoCollection.updateDocument(key,value)
       - close()  // 关闭连接

// 4.掺杂着其他的辅助类
    - convert 
        rowData -> document
        serialize/deserialize

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/720162.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存