01.Flink实时数据管理—自定义MysqlSource + 广播变量创建

01.Flink实时数据管理—自定义MysqlSource + 广播变量创建,第1张

需求背景:ETL离线作业,需要实时监控运行状况,由于调度使用Azkaban,故同步获取其后台配置库Mysql;本文记录第一步:获取projects工程码表,并作为广播变量,供后续使用。

Flink + Scalikejdbc + Scala

注意事项:

1.1首先加入JDBC依赖

1.2定义JDBCInputFormat

1.3获取Row类型的DataStreamSource

1.4转化DataStream<Row>为DataStream<Student>

目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,

科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波

主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现

报错 SQL statement must not contain ? character.

查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改

完整的SqlServerDialect文件

最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了

注意url写法为:jdbc:jtds:sqlserver://xxx:1433databaseName=master

[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/8644009.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-19
下一篇 2023-04-19

发表评论

登录后才能评论

评论列表(0条)

保存