工具选型
- MySQL 5.7.31
- flink 1.13.3 , flink-connector-mysql-cdc 2.2-SNAPSHOT
- Flink-CDC文档
选择flink-cdc的原因是为简化cdc过程中依赖的工具链,flink-cdc通过复用debezium的connect和kafka-connect实现直连flink,再者可通过flink平台适配的各种source sink和SQL client 轻松实现数据源同步。
参考过程本地调试也需要flink的依赖, 注意冲突此处使用1.13.3 [国内仓库下载]
(https://mirrors.huaweicloud.com/apache/flink/)
需要把flink包下lib文件夹拷贝到本地项目调试,idea启动项也需要勾选 include provided
项目骨架
示例代码
import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; public class Application { public static void main(String[] args) throws Exception { mysql_cdc(); } public static void mysql_cdc() throws Exception { MySqlSourcemySqlSource = MySqlSource. builder() .hostname("192.168.31.233") .port(3306) .databaseList("test_database") // set captured database .tableList("test_database.test_table") // set captured table .username("root") .password("123456") .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // enable checkpoint env.enableCheckpointing(3000); env.getCheckpointConfig().setCheckpointStorage( new FileSystemCheckpointStorage("file:///flink-ck/checkpoints")); env .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") // set 4 parallel source tasks .setParallelism(4) .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering env.execute("Print MySQL Snapshot + Binlog"); } }
依赖参考, 注意冲突天坑
完了开跑4.0.0 org.example flink-cdc1.0-SNAPSHOT UTF-8 1.13.3 1.8 2.11 ${java.version} ${java.version} 1.5.4.Final 2.2.0 org.apache.flink flink-table-common${flink.version} provided org.apache.flink flink-core${flink.version} provided org.apache.flink flink-table-api-java-bridge_${scala.binary.version}${flink.version} provided org.apache.flink flink-core${flink.version} provided org.apache.flink flink-streaming-java_${scala.binary.version}${flink.version} provided com.ververica flink-connector-mongodb-cdc2.2-SNAPSHOT com.ververica flink-connector-mysql-cdc2.2-SNAPSHOT mysql mysql-connector-java8.0.22 org.apache.maven.plugins maven-compiler-plugin3.1 ${java.version} org.apache.maven.plugins maven-shade-plugin3.0.0 package shade false org.apache.flink:force-shading com.google.code.findbugs:jsr305 org.slf4j:* log4j:* *:* meta-INF/*.SF meta-INF/*.DSA meta-INF/*.RSA com.demo.cdc.Application
第一次是全量快照, 之后是binlog的offset拉取,Flink Checkpoint 持久化断点续传
ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2 {"before":null,"after":{"id":1,"name":"zhangsan"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288811,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288815,"transaction":null} {"before":null,"after":{"id":3,"name":"wangwu"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288816,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288816,"transaction":null} {"before":null,"after":{"id":2,"name":"lisi"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288816,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288816,"transaction":null} 十二月 28, 2021 3:41:30 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect 信息: Connected to 192.168.31.233:3308 at mysql-bin.000003/2526 (sid:6257, cid:36) {"before":null,"after":{"id":5,"name":"qianqi"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677322000,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":2724,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1640677323291,"transaction":null} {"before":{"id":5,"name":"qianqi"},"after":{"id":5,"name":"钱七"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677338000,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":3001,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1640677338533,"transaction":null}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)