如何同步mysql数据到Doris中

如何同步mysql数据到Doris中,第1张

Doris官网定义 mysql原始表结构 1.doris中关联mysql外表 结果如下: 2.doris中关联kafka导入数据 查看作业 State为RUNNING,表示已经成功。 停止作业 3.通过flink导入mysql数据到doris 方法1:通过mysql-cdc写入kafka,kafka关联doris表。 方法2:通过阿里云DTS->datahub,然后通过Flink写入kafka,再关联到doris外表 如何处理delete数据?对于方法1,需要手动的删除doris中的数据;对于方法2,可以通过dts_operation_flag字段来标示,dts_operation_flag可以为I/U/D,分别表示添加、更新和删除。那我们就只需要在doris表中添加一个dts_operation_flag字段来标示就可以了,查询数据的时候就不再查询等于D的值。 如何处理脏数据?delete doris中的数据,然后insert正确的值;还有个方法是将关联一个外表(这个是正确的值),然后再将doris中的表和外表中的值diff,将diff的值insert到doris中。

目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,

科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波

主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现

报错 SQL statement must not contain ? character.

查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改

完整的SqlServerDialect文件

最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了

注意url写法为:jdbc:jtds:sqlserver://xxx:1433databaseName=master

[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java

mysql同步数据到hive大部分公司目前都是走的jdbc的方式。 这种方式有两个好处: 也有不好的地方: 这一步最主要的细节是将mysql库的所有binlog数据全部打入一个kafka topic,格式使用json。格式如下: 这一步的主要的细节在于写入到hdfs的结构,以及为什么不直接写入hive。 不写入到hive表的原因在于,binlog的数据结构是不固定的,而hive的结构相对是比较固定的。如果要写入到hive的话,就需要将不同的表的binlog写入到不同的hive表中,这个维护成本太高了。而且spark其实可以直接读取hdfs的json文件,因此直接放hdfs就好了。 写入到hdfs的话,考虑到后续读这个数据是要按照表去读增量数据,所以写入的目录一定是要带日期和表名称的。我这边用的目录结构是这样的: 也就是说要在flink根据数据所属的db、table_name、和日期将数据写入到不同的目录里。 在这一步的处理的过程中遇到了一些比较重要的参数问题。 2.如上所述checkpoint的时间间隔。不仅仅会影响checkpoint的频率,而且会影响hdfs文件的大小,而hdfs文件的大小可能会对hdfs的性能有很大影响。这个值如果太大,就会造成数据延迟太高,如果太小就会造成小文件过多。我这边设置的是5分钟。 细心的看官,这个时候会问了,既然你的目录是分table的,那么每个table每5分钟的binlog数据量是不一样的。对于某些大的mysql表,我们可能每5分钟生成一个文件还能接受。对于一些比较小的表,每五分钟生成一个文件那么文件就会非常小。所以我这边又做了一层的筛选,我把mysql的大的表筛选出来,只同步大的表到hdfs,用以binlog的数据同步。因为本身binlog的方式同步mysql数据为的就是节约mysql的读取压力,而小的表对于不会有太大压力,这些表可以直接通过jdbc的方式去同步。 这个是整个环节里面最复杂的一部分,涉及的细节也比较多。 首先,我们要明确一下总体的思路是什么。总体的思路就是要读取hdfs上的老的历史数据,然后和新的binlog数据合并生成新的快照。 其实这中间还涉及到一些其他的细节,比如mysql表结构变更,或者mysql和hive的数据结构不一致的情况。 另外我们这边还存在多个db的相同的表导入到hive的一张表中的其他问题,我就不赘述了。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/7300316.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-04
下一篇 2023-04-04

发表评论

登录后才能评论

评论列表(0条)

保存