flinksql从kafka中消费mysql的binlog日志

flinksql从kafka中消费mysql的binlog日志,第1张

*使用canal采集mysql的binlog,输出到kafka,然后使用flinksql消费kafka,并输出到屏幕

1、vim /home/jaming/docker_work/mysql/conf/my.cnf

2、docker查看mysql的binlog开启是否成功

版本:canal.deployer-1.1.5-SNAPSHOT.tar.gz

1、vim /home/jaming/software/canal/conf/example/instance.properties

2、vim /home/jaming/software/canal/conf/canal.properties

略。。。。

目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,

科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波

主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现

报错 SQL statement must not contain ? character.

查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改

完整的SqlServerDialect文件

最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了

注意url写法为:jdbc:jtds:sqlserver://xxx:1433databaseName=master

[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/7273983.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-03
下一篇 2023-04-03

发表评论

登录后才能评论

评论列表(0条)

保存