flink异步io应用场景之流表join维表

flink异步io应用场景之流表join维表,第1张

流表是kafka等流式数据

维表可以是一个mysql或者cassandra,redis等存储,甚至是自己定义的一些api。

根据流表join维表的字段去异步查询维表。

流表:kafka id1,id2,id3三列

维表:mysql id,age,name

sql:select id1,id2,id3,age,name from kafka join mysql on id1=id

join的结果就是: id1,id2,id3,age,name 流表的字段加上mysql维表的字段。

流表这边提供id1,给到维表,维表那边执行的sql是select * from mysql where id=id1

参考袋鼠云开源的flinkStreamSQL:

核心是

public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row>

就是从维表中查询数据,目前袋鼠云支持的几种维表有

流表来源只有kafka,太少,我们可以扩展一下读取mysql作为流。参考这里 https://www.jianshu.com/p/5faa7f822d89

Doris官网定义 mysql原始表结构 1.doris中关联mysql外表 结果如下: 2.doris中关联kafka导入数据 查看作业 State为RUNNING,表示已经成功。 停止作业 3.通过flink导入mysql数据到doris 方法1:通过mysql-cdc写入kafka,kafka关联doris表。 方法2:通过阿里云DTS->datahub,然后通过Flink写入kafka,再关联到doris外表 如何处理delete数据?对于方法1,需要手动的删除doris中的数据;对于方法2,可以通过dts_operation_flag字段来标示,dts_operation_flag可以为I/U/D,分别表示添加、更新和删除。那我们就只需要在doris表中添加一个dts_operation_flag字段来标示就可以了,查询数据的时候就不再查询等于D的值。 如何处理脏数据?delete doris中的数据,然后insert正确的值;还有个方法是将关联一个外表(这个是正确的值),然后再将doris中的表和外表中的值diff,将diff的值insert到doris中。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/8463793.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-16
下一篇 2023-04-16

发表评论

登录后才能评论

评论列表(0条)

保存