- Flink CDC overview
- DataStream API usage
- Flink SQL usage
- Custom deserializer
Official documentation: https://ververica.github.io/flink-cdc-connectors/master/
CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes in full and in the order they occur, and write them to message middleware for other services to subscribe to and consume.
CDC implementations fall into two main categories: query-based (periodically polling the source tables) and binlog-based (reading the database's change log).
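As a point of contrast with the binlog-based approach used by Flink CDC (which reads every change from the MySQL binlog via Debezium), the query-based approach polls the source table for rows changed since the last run, so it misses deletes and any intermediate states between polls. A minimal sketch of query-based polling with plain JDBC, assuming a hypothetical `update_time` change-tracking column on `base_trademark`:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class QueryBasedCdcSketch {
    public static void main(String[] args) throws Exception {
        // Normally persisted between runs; hard-coded here for illustration.
        Timestamp lastSyncTime = Timestamp.valueOf("2021-01-01 00:00:00");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop102:3306/gmall-210325-flink", "root", "root");
             PreparedStatement ps = conn.prepareStatement(
                     // update_time is a hypothetical change-tracking column.
                     "SELECT id, tm_name, logo_url FROM base_trademark WHERE update_time > ?")) {
            ps.setTimestamp(1, lastSyncTime);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // Each returned row is a change observed since the last poll.
                    System.out.println(rs.getString("id") + "," + rs.getString("tm_name"));
                }
            }
        }
    }
}
```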
Add the dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
```
Packaging: fat jar (with all dependencies)
```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
Monitoring database changes
```java
public static void main(String[] args) throws Exception {
    // 1. Get the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Flink CDC stores the binlog position it has read as state in checkpoints. To resume where it left off
    // after a failure, the job must be started from a checkpoint or savepoint.
    // Enable checkpointing every 5 s, with a 10 s timeout, at most 2 concurrent checkpoints, a 1 ms minimum
    // pause between checkpoints, HDFS user "root", and a restart strategy of 3 retries with a 2 s delay.
    env.enableCheckpointing(5000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-210325/ck"));
    System.setProperty("HADOOP_USER_NAME", "root");
    env.getCheckpointConfig().setCheckpointTimeout(10000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1L);

    // 2. Build the SourceFunction with Flink CDC and read the data
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("gmall-210325-flink")
            // If tableList is omitted, all tables in the listed databases are captured.
            // When specified, entries must be qualified with the database name, i.e. db.table.
            .tableList("gmall-210325-flink.base_trademark")
            .deserializer(new StringDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);

    // 3. Print the data
    streamSource.print();

    // 4. Start the job
    env.execute("FlinkCDC");
}
```
Submit the job
```bash
bin/flink run -m hadoop102:8081 -c com.atguigu.FlinkCDC ./gmall-flink-cdc.jar
```
Take a savepoint
```bash
# bin/flink savepoint <job id> <hdfs path>
bin/flink savepoint eaebb93839f0c66014b34a9bf21b4cfa hdfs://hadoop102:8020/gmall-210325/sv
```
Restart from the savepoint
```bash
bin/flink run -m hadoop102:8081 -s hdfs://hadoop102:8020/gmall-210325/sv/savepoint-eaebb9-0759b72fba40 -c com.atguigu.FlinkCDC ./gmall-flink-cdc.jar
```

Flink SQL usage
Add the dependencies
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
```
Monitoring database changes
```java
public static void main(String[] args) throws Exception {
    // Get the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Create the table with DDL
    tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
            " id STRING NOT NULL, " +
            " tm_name STRING, " +
            " logo_url STRING " +
            ") WITH ( " +
            " 'connector' = 'mysql-cdc', " +
            " 'hostname' = 'hadoop102', " +
            " 'port' = '3306', " +
            " 'username' = 'root', " +
            " 'password' = 'root', " +
            " 'database-name' = 'gmall-210325-flink', " +
            " 'table-name' = 'base_trademark' " +
            ")");

    // Query the data
    Table table = tableEnv.sqlQuery("select * from mysql_binlog");

    // Convert the dynamic table to a retract stream
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
    retractStream.print();

    // Start the job
    env.execute("FlinkCDCWithSQL");
}
```
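Each element of the retract stream is a Tuple2<Boolean, Row>: the flag is true for an add message and false for a retraction, so an UPDATE on base_trademark arrives as a retraction of the old row followed by an add of the new one. A minimal sketch of keeping only the add messages, reusing the retractStream variable from the snippet above (placed before env.execute):

```java
// Sketch only: keep add messages (flag == true) and drop retractions.
DataStream<Tuple2<Boolean, Row>> addsOnly = retractStream.filter(change -> change.f0);
addsOnly.print();
```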
Custom deserializer

```java
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    // Output format: one JSON string per change record
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Create a JSON object to hold the final record
        JSONObject result = new JSONObject();

        // Extract the database and table name from the topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();

        // Extract the "before" image
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (int i = 0; i < beforeFields.size(); i++) {
                Field field = beforeFields.get(i);
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        // Extract the "after" image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        // Extract the operation type and normalize "create" to "insert"
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        // Assemble the fields into the JSON object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        // Emit the record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
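With this deserializer, every change record is emitted as a JSON string carrying the database, table name, before/after images, and the normalized operation type. An update to base_trademark would produce a record shaped roughly like the following (field values are made up for illustration):

```json
{
  "database": "gmall-210325-flink",
  "tableName": "base_trademark",
  "before": {"id": "12", "tm_name": "atguigu", "logo_url": "/static/logo.png"},
  "after":  {"id": "12", "tm_name": "atguigu2021", "logo_url": "/static/logo.png"},
  "type": "update"
}
```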
Using the custom deserializer
```java
public static void main(String[] args) throws Exception {
    // 1. Get the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Flink CDC stores the binlog position it has read as state in checkpoints. To resume where it left off
    // after a failure, the job must be started from a checkpoint or savepoint.
    // Enable checkpointing every 5 s, with a 10 s timeout, at most 2 concurrent checkpoints, a 1 ms minimum
    // pause between checkpoints, HDFS user "root", and a restart strategy of 3 retries with a 2 s delay.
    env.enableCheckpointing(5000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-210325/ck"));
    System.setProperty("HADOOP_USER_NAME", "root");
    env.getCheckpointConfig().setCheckpointTimeout(10000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1L);

    // 2. Build the SourceFunction with Flink CDC and read the data
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("gmall-210325-flink")
            // If tableList is omitted, all tables in the listed databases are captured.
            // When specified, entries must be qualified with the database name, i.e. db.table.
            .tableList("gmall-210325-flink.base_trademark")
            .deserializer(new CustomerDeserialization())
            .startupOptions(StartupOptions.initial())
            .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);

    // 3. Print the data
    streamSource.print();

    // 4. Start the job
    env.execute("FlinkCDCWithCustomerDeserialization");
}
```