大数据流处理框架之Flink-CDC

大数据流处理框架之Flink-CDC,第1张

数据流处理框架之Flink-CDC 1. CDC简介 1.1 CDC种类

FlinkCDC,简单了解下Change Data Capture(变更数据获取)的概念:
监控并捕获数据库的变更,将这些变更按照发生的顺序进行记录,写入消息中间件供其他服务订阅及消费。
CDC的种类:主要分为基于查询和基于Binlog两种方式,区别:

1.2 FlinkCDC

Flink自然也不甘示弱,FlinkCDC应运而生,通过flink-cdc-connectors 组件,可以直接从MySQL等数据库直接读取全量数据和增量变更数据的source组件

2. 实战Coding

通过一个简单的Demo学会使用FlinkCDC

2.1 DataStream方式

通过创建maven项目,通过pom文件注入相关依赖:

 
 org.apache.flink
 flink-java
 1.12.0
 
 
 org.apache.flink
 flink-streaming-java_2.12
 1.12.0
 
 
 org.apache.flink
 flink-clients_2.12
 1.12.0
 
 
 org.apache.hadoop
 hadoop-client
 3.1.3
 
 
 mysql
 mysql-connector-java
 5.1.49
 
  
 com.alibaba.ververica
 flink-connector-mysql-cdc
 1.2.0
 
 
 com.alibaba
 fastjson
 1.2.75
 


 
 
 org.apache.maven.plugins
 maven-assembly-plugin
 3.0.0
 
 
 jar-with-dependencies
 
 
 
 
 make-assembly
 package
 
 single
 
 
 
 
 

依赖注入后就可以开始Coding…(愉快的打开IDEA)

public class FlinkCDC {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env =
 StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 
 //2.1开启CheckPoint,每五秒做一次CheckPoint
env.enableCheckpointing(5);
 //2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(
CheckpointingMode.EXACTLY_ONCE);
 //2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);
 //2.4 指定从 CK 自动重启策略
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
 //2.5 设置状态后端
 env.setStateBackend(new FsStateBackend("hdfs://master:8020/flinkCDC"));
 //2.6 设置访问 HDFS 的用户名
 System.setProperty("HADOOP_USER_NAME", "root");
//3.创建 Flink-MySQL-CDC 的 Source
 DebeziumSourceFunction mysqlSource = MySQLSource.builder()
 .hostname("master")
 .port(3306)
 .username("root")
 .password("000000")
 .databaseList("mall-flink")
 .tableList("mall-flink.z_user_info") 
 //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
 .startupOptions(StartupOptions.initial())
 .deserializer(new StringDebeziumDeserializationSchema())
 .build();
 //4.使用 CDC Source 从 MySQL 读取数据
 DataStreamSource mysqlDS = env.addSource(mysqlSource);
 //5.打印数据
 mysqlDS.print();
 //6.执行任务
 env.execute();
 }
}

ok,到这里代码部分已经完成,接下来开始测试
将代码打包上传至服务器 mvn clean package
(确保MySQL Binlog开启状态,若是首次开始,则需重启MySQL)
启动Flink,HDFS集群,最后启动程序(java -jar FlinkCDC.jar)

2.2 FlinkSQL方式

同样首先注入依赖


 org.apache.flink
 flink-table-planner-blink_2.12
 1.12.0

public class FlinkSQL_CDC {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 //2.创建 Flink-MySQL-CDC 的 Source
 tableEnv.executeSql("CREATE TABLE user_info (" +
  " id INT," +
 " name STRING," +
 " phone_num STRING" +
 ") WITH (" +
 " 'connector' = 'mysql-cdc'," +
 " 'hostname' = 'master'," +
 " 'port' = '3306'," +
 " 'username' = 'root'," +
 " 'password' = '000000'," +
 " 'database-name' = 'mall-flink'," +
 " 'table-name' = 'z_user_info'" +
 ")");
 tableEnv.executeSql("select * from user_info").print();
 env.execute();
 }
}
2.3 自定义反序列化器
public class Flink_CDCWithCustomerSchema {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 //2.创建 Flink-MySQL-CDC 的 Source
 Properties properties = new Properties();
 DebeziumSourceFunction mysqlSource = MySQLSource.builder()
 .hostname("master")
 .port(3306)
 .username("root")
 .password("000000")
 .databaseList("mall-flink")
 .tableList("mall-flink.z_user_info") 
.startupOptions(StartupOptions.initial())
 .deserializer(new DebeziumDeserializationSchema() { //自定义数据解析器
 @Override
 public void deserialize(SourceRecord sourceRecord, Collector 
collector) throws Exception {
 //获取主题信息,包含着数据库和表名 
mysql_binlog_source.gmall-flink.z_user_info
 String topic = sourceRecord.topic();
String[] arr = topic.split("\.");
 String db = arr[1];
String tableName = arr[2];
 //获取 *** 作类型 READ DELETE UPDATE CREATE
 Envelope.Operation operation = 
Envelope.operationFor(sourceRecord);
 //获取值信息并转换为 Struct 类型
 Struct value = (Struct) sourceRecord.value();
 //获取变化后的数据
Struct after = value.getStruct("after");
 //创建 JSON 对象用于存储数据信息
 JSONObject data = new JSONObject();
for (Field field : after.schema().fields()) {
 Object o = after.get(field);
data.put(field.name(), o);
 }
 //创建 JSON 对象用于封装最终返回值数据信息
 JSONObject result = new JSONObject();
result.put("operation", operation.toString().toLowerCase());
result.put("data", data);
result.put("database", db);
result.put("table", tableName);
 //发送数据至下游
collector.collect(result.toJSONString());
 }
 @Override
 public TypeInformation getProducedType() {
 return TypeInformation.of(String.class);
 }
 })
 .build();
 //3.使用 CDC Source 从 MySQL 读取数据
 DataStreamSource mysqlDS = env.addSource(mysqlSource);
 //4.打印数据
 mysqlDS.print();
 //5.执行任务
 env.execute();
 	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5712430.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存