Flink-CDC

Flink-CDC,第1张

Flink-CDC

目录
  • Flink-CDC简介
  • DataStream方式应用
  • FlinkSQL方式应用
  • 自定义反序列化器

Flink-CDC简介

官网地址https://ververica.github.io/flink-cdc-connectors/master/

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的 变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC主要分为基于查询和基于Binlog两种方式

基于查询的CDC基于Binlog的CDC开源产品Sqoop、Kafka JDBC SourceCanal、MAxwell、Debezium执行模式Batch批处理Streaming流处理是否可以捕获所有数据变化否是延迟性高延迟低延迟是否增加数据库压力是否 DataStream方式应用

引入依赖


    
        org.apache.flink
        flink-java
        1.12.0
    
    
        org.apache.flink
        flink-streaming-java_2.12
        1.12.0
    
    
        org.apache.flink
        flink-clients_2.12
        1.12.0
    
    
        org.apache.hadoop
        hadoop-client
        3.1.3
    
    
        mysql
        mysql-connector-java
        5.1.49
    
    
        com.alibaba.ververica
        flink-connector-mysql-cdc
        1.2.0
    
    
        com.alibaba
        fastjson
        1.2.75
    

打包方式:全量包


    
        
            org.apache.maven.plugins
            maven-assembly-plugin
            3.0.0
            
                
                    jar-with-dependencies
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
    

监控数据库变化

public static void main(String[] args) throws Exception {
    // 1. 获取执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Flink-cdc将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断电续传,需要从CheckPoint或者SavePoint启动程序

    // 开启CK,每隔5秒钟做一次CK,CK超时时间10s,最大并发数2,CK之间间隔1,访问HDFS的用户名root,重启策略延时2秒重启3次
    env.enableCheckpointing(5000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-210325/ck"));
    System.setProperty("HADOOP_USER_NAME", "root");
    env.getCheckpointConfig().setCheckpointTimeout(10000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1L);

    // 2. 通过FlinkCDC构建SourceFunction并读取数据
    DebeziumSourceFunction sourceFunction = MySQLSource.builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("gmall-210325-flink")
            .tableList("gmall-210325-flink.base_trademark") // 如果不添加该参数,则消费指定数据库中所有表的数据,如果指定,指定方式为db.table,必须带上库名
            .deserializer(new StringDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();
    DataStreamSource streamSource = env.addSource(sourceFunction);
    // 3. 打印数据
    streamSource.print();
    // 4. 启动任务
    env.execute("FlinkCDC");
}

启动任务

bin/flink run -m hadoop102:8081 -c com.atguigu.FlinkCDC ./gmall-flink-cdc.jar

保存savepoint

# bin/flink savepoint 任务id hdfs地址
bin/flink savepoint eaebb93839f0c66014b34a9bf21b4cfa hdfs://hadoop102:8020/gmall-210325/sv

从savepoint处启动

bin/flink run -m hadoop102:8081 -s hdfs://hadoop102:8020/gmall-210325/sv/savepoint-eaebb9-0759b72fba40 -c com.atguigu.FlinkCDC ./gmall-flink-cdc.jar

FlinkSQL方式应用

引入依赖


    org.apache.flink
    flink-table-planner-blink_2.12
    1.12.0

监控数据库变化

public static void main(String[] args) throws Exception {
    // 获取执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    // DDL方式建表
    tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
            " id STRING NOT NULL, " +
            " tm_name STRING, " +
            " logo_url STRING " +
            ") WITH ( " +
            " 'connector' = 'mysql-cdc', " +
            " 'hostname' = 'hadoop102', " +
            " 'port' = '3306', " +
            " 'username' = 'root', " +
            " 'password' = 'root', " +
            " 'database-name' = 'gmall-210325-flink', " +
            " 'table-name' = 'base_trademark' " +
            ")");
    // 查询数据
    Table table = tableEnv.sqlQuery("select * from mysql_binlog");
    // 将动态表转换为流
    DataStream> retractStream = tableEnv.toRetractStream(table, Row.class);
    retractStream.print();
    // 启动任务
    env.execute("FlinkCDCWithSQL");
}
自定义反序列化器
public class CustomerDeserialization implements DebeziumDeserializationSchema {

    // 封装的数据格式 json
    
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
        // 创建JSON对线用于存储最终数据
        JSONObject result = new JSONObject();
        // 获取库名和表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        // 获取before数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List beforeFields = beforeSchema.fields();
            for (int i = 0; i < beforeFields.size(); i++) {
                Field field = beforeFields.get(i);
                Object beforevalue = before.get(field);
                beforeJson.put(field.name(), beforevalue);
            }
        }
        // 获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        // 获取 *** 作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if("create".equals(type)){
            type="insert";
        }
        // 将字段写入JSON对线
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);
        // 输出数据
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

使用自定义序列化器

public static void main(String[] args) throws Exception {
    // 1. 获取执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Flink-cdc将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断电续传,需要从CheckPoint或者SavePoint启动程序

    // 开启CK,每隔5秒钟做一次CK,CK超时时间10s,最大并发数2,CK之间间隔1,访问HDFS的用户名root,重启策略延时2秒重启3次
    env.enableCheckpointing(5000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-210325/ck"));
    System.setProperty("HADOOP_USER_NAME", "root");
    env.getCheckpointConfig().setCheckpointTimeout(10000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1L);

    // 2. 通过FlinkCDC构建SourceFunction并读取数据
    DebeziumSourceFunction sourceFunction = MySQLSource.builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("gmall-210325-flink")
            .tableList("gmall-210325-flink.base_trademark") // 如果不添加该参数,则消费指定数据库中所有表的数据,如果指定,指定方式为db.table,必须带上库名
            .deserializer(new CustomerDeserialization())
            .startupOptions(StartupOptions.initial())
            .build();
    DataStreamSource streamSource = env.addSource(sourceFunction);
    // 3. 打印数据
    streamSource.print();
    // 4. 启动任务
    env.execute("FjavalinkCDCWithCustomerDeserialization");
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5695842.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存