需求:实现FlinkSQL sink到ArangoDB图数据库
分析:自定义Flink Table & SQL connector 支持flink-connector-arangodb,只需要实现sink部分
官网支持user-defined sources&sinks,对Table SQL的source/sink定义提供了解释
Metadata:对表的声明,封装为Catalog,定义外部存储系统的元数据
Planning:Factory实例由Java SPI机制创建,将外部表元数据配置封装为参数化实例(DynamicTableSource/Sink)
Runtime:读取/写入核心逻辑,实现InputFormat/OutputFormat或SourceFunction/SinkFunction接口,构建与外部存储系统的连接和实现读取和写入逻辑。
需要我们扩展的地方:
Dynamic Table Factory
自定义工场类实现org.apache.flink.table.factories.DynamicTableSinkFactory(我这里仅要支持sink,如果要支持source,需要实现org.apache.flink.table.factories.DynamicTableSourceFactory)
DDL语句中的‘connector’配置项作为标识符用来发现对应的工厂类实例
Factory工场类是由Java SPI来实例化的,我们需要在自定义connector模块的resource下添加文件
META-INF/services/org.apache.flink.table.factories.Factory
文件中指定工厂类的全路径
Dynamic Table SinkFactory工厂类主要构建DynamicTableSource/DynamicTableSink,这是个参数化实例对象,定义connector配置参数。在DynamicTableSource/Sink中数据的传递要使用Flink内部数据结构org.apache.flink.table.data.RowData,这里获取到数据需要对value做一下转换,value数据提取封装为RowData,RowData接口的实现类也比较多,可根据情况选择合适的实现类。
Sink接口实现:
有三个接口的实现会影响DML语句的执行
接口 | 描述 |
SupportsOverwrite | 实现此接口可以使用INSERT OVERWRITE语句覆盖现有的表或分区数据 |
SupportsPartitioning | 允许写入分区数据 |
SupportsWritingMetadata | 保存持久化DDL中定义的列和类型 |
Runtime Provider
这里官网并没有对实际读取写入的Runtime实现作详细解释。说一下个人的理解
Dynamic Table Source/Sink 提供了获取RuntimeProvider实例函数getSinkRuntimeProvider(Context context),这个函数需要我们自定义逻辑去声明InputFormat/OutputFormat或者source/sinkFunction实例化对象
两种方式运行Provider
1.OutputFormatProvider.of(InputFormat/OutputFormat)
2.SinkFunctionProvider.of(source/sinkFunction)
使用lambda表达式,执行return () -> xxxFormat/xxxFunction;构建静态provider
关于InputFormat/OutputFormat或者source/sinkFunction
关键方法
1.xxxxFunction
// 建立连接
open();
// 执行读取/写入逻辑
invoke(T value,Context context);
// 关闭连接
close();
2.xxxxFormat
// 建立连接
open(int taskNumber, int numTasks);
// 执行读取/写入逻辑
writeRecord(In record)
// 关闭连接
close();
Encoding / Decoding Formats(待完善)
阅读flink-connector模块,对比几个connector的源码,分析后得出简单的connector主体架构:
// 1.工厂类构造source和sink,java SPI创建实例
ArangoDBDynamicTableFactory imp DynamicTableSinkFactory (如果支持source,需imp DynamicTableSourceFactory)
- createDynamicTableSink // 创建连接器的参数化实例 -> 封装connector的参数
- optionalOptions
- requiredOptions
- factoryIdentifier
// 2.arangodbsink引出sinkfunction
ArangodbDynamicTableSink
// 3.sinkFunction/outputFormat建立连接执行写入
- open() // 建立连接
- invoke() // arangodb API
- writeRecord
- ArangoCollection.insertDocuments(values)
- ArangoCollection.updateDocument(key,value)
- close() // 关闭连接
// 4.掺杂着其他的辅助类
- convert
rowData -> document
serialize/deserialize
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)