科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波
主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现
报错 SQL statement must not contain ? character.
查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改
完整的SqlServerDialect文件
最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了
注意url写法为:jdbc:jtds:sqlserver://xxx:1433databaseName=master
[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java
需求背景:ETL离线作业,需要实时监控运行状况,由于调度使用Azkaban,故同步获取其后台配置库Mysql;本文记录第一步:获取projects工程码表,并作为广播变量,供后续使用。
Flink + Scalikejdbc + Scala
注意事项:
链接地址: FlinkX
FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等
1、前置
需要安装maven、java8、配置好github相关参数
2、Fork FlinX项目到自己的仓库中
2、Clone项目到本地
git clone https://github.com/liukunyuan/flinkx.git
3、安装额外的jar包
1)、cd flinkx/bin
2)、执行sh ./install_jars.sh(windows执行install_jars.bat脚本)
4、打包
1)、回到flinkx目录:cd ..
2)、执行打包命令:mvn clean package -Dmaven.test.skip=true
1、配置flink conf文件(暂时不需要安装flink)
1)、进入flinkconf目录
cd flinkconf
2)、修改flink-conf.yaml文件添加一行
rest.bind-port: 8888
2、配置mysqltomysql的json文件,路径:/Users/jack/Documents/jack-project/flinkx/flinkconf/mysql2mysql.json
3、运行任务
4、查看监控网页和log.txt文件: http://localhost:8888/
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)