Capturing MySQL Changes with flink-cdc

The focus here is on the pitfalls of debugging flink-cdc locally.

Tool selection
  1. MySQL 5.7.31
  2. Flink 1.13.3, flink-connector-mysql-cdc 2.2-SNAPSHOT
  3. Flink-CDC documentation

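flink-cdc was chosen to shorten the toolchain that CDC depends on. It reuses Debezium's connectors and Kafka Connect so that change events flow straight into Flink. On top of that, the many sources and sinks the Flink platform supports, together with the SQL client, make it easy to synchronize between data sources.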

Walkthrough

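Debugging locally also needs the Flink dependencies, and version conflicts are easy to run into. This post uses 1.13.3, downloaded from a mirror in China: https://mirrors.huaweicloud.com/apache/flink/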

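Copy the lib folder from the Flink distribution into the local project for debugging. In the IDEA run configuration, also tick the option that includes dependencies with "Provided" scope; otherwise the provided-scope Flink classes are missing at runtime.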

Project skeleton

Sample code

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Application {

    public static void main(String[] args) throws Exception {
        mysql_cdc();
    }
    public static void mysql_cdc() throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.31.233")
                .port(3306)
                .databaseList("test_database") // set captured database
                .tableList("test_database.test_table") // set captured table
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointStorage(
                new FileSystemCheckpointStorage("file:///flink-ck/checkpoints"));

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }

}

Dependency reference. Watch out for version conflicts; they are a major pitfall.



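<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.3</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <!-- The names of the next two properties are assumed; nothing below references them. -->
        <debezium.version>1.5.4.Final</debezium.version>
        <flink-cdc.version>2.2.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.2-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Element name assumed; only the value "false" survived in the original. -->
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.demo.cdc.Application</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>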
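When a dependency conflict does bite, it often helps to check which JAR a class is actually being loaded from. The following is a minimal sketch that uses only the JDK. WhichJar and locate are names made up for this illustration, and the two default class names are simply the ones imported in the sample code; pass any other class names as arguments.

```java
import java.security.CodeSource;

/** Illustrative helper (not part of Flink or flink-cdc): reports where a class was loaded from. */
final class WhichJar {

    static String locate(String className) {
        try {
            // Load without running static initializers; only the origin matters here.
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes that come from the JDK's bootstrap loader report no code source.
            return source == null ? "(bootstrap class loader)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
            "com.ververica.cdc.connectors.mysql.source.MySqlSource"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run it with the same classpath as the job (in IDEA, the same run configuration). If a class resolves to an unexpected JAR, or to "(not on classpath)", that is usually where the conflict or the missing provided-scope dependency sits.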

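With that in place, run it.

The first run takes a full snapshot of the table. After that, changes are pulled from the binlog by offset, and Flink checkpoints persist that position so the job can resume where it left off.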
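Each record the job prints is a Debezium change event, and its "op" field tells which phase produced it: "r" for a row read during the snapshot, "c" for an insert, "u" for an update, "d" for a delete. The sketch below classifies a record by that field. ChangeEventOps and describe are hypothetical names for this illustration, and the regex is a shortcut that assumes the compact JSON layout the job prints; a real job would more likely use a JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative helper (not part of flink-cdc): classifies a change event by its "op" field. */
final class ChangeEventOps {

    // Matches the first "op":"x" pair in the JSON string.
    // Assumption: no captured column is itself named "op".
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    static String describe(String json) {
        Matcher m = OP.matcher(json);
        if (!m.find()) {
            return "unknown";
        }
        switch (m.group(1)) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Shortened events; real ones also carry "source", "ts_ms" and so on.
        String fromSnapshot = "{\"before\":null,\"after\":{\"id\":1,\"name\":\"zhangsan\"},\"op\":\"r\"}";
        String fromBinlog = "{\"before\":null,\"after\":{\"id\":5,\"name\":\"qianqi\"},\"op\":\"c\"}";
        System.out.println(describe(fromSnapshot));
        System.out.println(describe(fromBinlog));
    }
}
```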

ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2
{"before":null,"after":{"id":1,"name":"zhangsan"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288811,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288815,"transaction":null}
{"before":null,"after":{"id":3,"name":"wangwu"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288816,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288816,"transaction":null}
{"before":null,"after":{"id":2,"name":"lisi"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288816,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288816,"transaction":null}
十二月 28, 2021 3:41:30 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.31.233:3308 at mysql-bin.000003/2526 (sid:6257, cid:36)
{"before":null,"after":{"id":5,"name":"qianqi"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677322000,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":2724,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1640677323291,"transaction":null}
{"before":{"id":5,"name":"qianqi"},"after":{"id":5,"name":"钱七"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677338000,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":3001,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1640677338533,"transaction":null}
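In the output above, the binlog client connects at mysql-bin.000003/2526, and the events that follow carry later positions in the same file (2724, then 3001). To make the idea of an offset concrete, here is a simplified sketch of a binlog position as a (file, pos) pair with an ordering. BinlogPosition is a hypothetical class written for this post, not flink-cdc's own offset type, and it ignores GTIDs entirely.

```java
/** Illustrative only: a binlog position as a (file, pos) pair, ordered as the server writes it. */
final class BinlogPosition implements Comparable<BinlogPosition> {
    final String file; // for example "mysql-bin.000003"
    final long pos;    // byte offset within that file

    BinlogPosition(String file, long pos) {
        this.file = file;
        this.pos = pos;
    }

    // Binlog files carry an increasing numeric suffix after the last dot.
    private long fileIndex() {
        return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
    }

    @Override
    public int compareTo(BinlogPosition other) {
        // Compare by file first, then by offset inside the file.
        int byFile = Long.compare(fileIndex(), other.fileIndex());
        return byFile != 0 ? byFile : Long.compare(pos, other.pos);
    }

    public static void main(String[] args) {
        BinlogPosition connectedAt = new BinlogPosition("mysql-bin.000003", 2526);
        BinlogPosition insertEvent = new BinlogPosition("mysql-bin.000003", 2724);
        // A position later than the saved one is new work; anything else was already seen.
        System.out.println(insertEvent.compareTo(connectedAt) > 0 ? "new" : "already seen");
    }
}
```

A position like this is roughly what needs to survive in a checkpoint for the job to pick up from the binlog instead of repeating the snapshot.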

Feel free to share. When reposting, please credit the source: 内存溢出

Original article: https://outofmemory.cn/zaji/5683002.html
