FlinkX快速开始(一)

FlinkX快速开始(一),第1张

链接地址: FlinkX

FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等

1、前置

需要安装maven、java8、配置好github相关参数

2、Fork FlinX项目到自己的仓库中

2、Clone项目到本地

git clone https://github.com/liukunyuan/flinkx.git

3、安装额外的jar包

1)、cd flinkx/bin

2)、执行sh ./install_jars.sh(windows执行install_jars.bat脚本)

4、打包

1)、回到flinkx目录:cd ..

2)、执行打包命令:mvn clean package -Dmaven.test.skip=true

1、配置flink conf文件(暂时不需要安装flink)

1)、进入flinkconf目录

cd flinkconf

2)、修改flink-conf.yaml文件添加一行

rest.bind-port: 8888

2、配置mysqltomysql的json文件,路径:/Users/jack/Documents/jack-project/flinkx/flinkconf/mysql2mysql.json

3、运行任务

4、查看监控网页和log.txt文件: http://localhost:8888/

目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,

科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波

主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现

报错 SQL statement must not contain ? character.

查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改

完整的SqlServerDialect文件

最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了

注意url写法为:jdbc:jtds:sqlserver://xxx:1433databaseName=master

[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/7558068.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-07
下一篇 2023-04-07

发表评论

登录后才能评论

评论列表(0条)

保存