FlinkCDC: first, a quick look at the concept of Change Data Capture (CDC):
CDC monitors and captures changes made to a database, records those changes in the order they occurred, and writes them to a message broker for other services to subscribe to and consume.
Kinds of CDC: there are two main approaches, query-based and binlog-based. The difference: query-based CDC polls tables with batch queries, so it adds load to the database and cannot see deletes or the intermediate states between two polls; binlog-based CDC (e.g. Debezium) tails the database's change log, so it captures every insert, update, and delete in real time with low overhead.
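To make the "ordered record of changes" idea concrete, here is a minimal sketch in plain Java (no Flink, all class and field names are hypothetical): a changelog modeled as an ordered list of change events, the shape a binlog-based CDC source emits. It also shows why a query-based poll can miss changes.

```java
import java.util.ArrayList;
import java.util.List;

public class ChangelogDemo {
    // Operation types a CDC source distinguishes.
    enum Op { CREATE, UPDATE, DELETE }

    // One captured change: position in the log plus the row image after the change.
    static class ChangeEvent {
        final long pos;
        final Op op;
        final String after; // null means the row was deleted

        ChangeEvent(long pos, Op op, String after) {
            this.pos = pos;
            this.op = op;
            this.after = after;
        }
    }

    public static void main(String[] args) {
        // Binlog-style CDC: every change is appended in the order it happened.
        List<ChangeEvent> changelog = new ArrayList<>();
        changelog.add(new ChangeEvent(1, Op.CREATE, "{id:1,name:'a'}"));
        changelog.add(new ChangeEvent(2, Op.UPDATE, "{id:1,name:'b'}"));
        changelog.add(new ChangeEvent(3, Op.DELETE, null));

        // Replaying the log in order reconstructs the current state of the row.
        String state = null;
        for (ChangeEvent e : changelog) {
            state = e.after;
        }

        // A query-based poll running only now would see no row at all and
        // would have missed both the insert and the update in between.
        System.out.println("events captured: " + changelog.size()); // 3
        System.out.println("final state: " + state);                // null
    }
}
```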
Flink, naturally, did not want to be left behind, and FlinkCDC was born: through the flink-cdc-connectors component, it provides a source that can read both the full snapshot and the incremental changes directly from databases such as MySQL.
2. Hands-on: learn FlinkCDC through a simple demo
2.1 The DataStream approach
Create a Maven project and declare the dependencies in the pom file:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
With the dependencies in place, we can start coding (happily opening IDEA):
```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.1 Enable checkpointing, one checkpoint every five seconds
        env.enableCheckpointing(5000L);
        //2.2 Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 Restart strategy used when recovering from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://master:8020/flinkCDC"));
        //2.6 Set the user name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "root");

        //3. Create the Flink-MySQL-CDC source
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("master")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("mall-flink")
                //Optional. If omitted, all tables of the databases listed above are read.
                //Note: tables must be given in "db.table" form.
                .tableList("mall-flink.z_user_info")
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        //4. Read from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //5. Print the data
        mysqlDS.print();

        //6. Execute the job
        env.execute();
    }
}
```
OK, the code part is done; now let's test it.
Package the code and upload it to the server: mvn clean package
(Make sure MySQL binlog is enabled; if it has just been enabled for the first time, restart MySQL.)
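For reference, enabling binlog typically means adding lines like the following to MySQL's my.cnf and restarting (a sample config; the server-id value and the binlog-do-db entry are illustrative, with mall-flink matching the database used in this demo; CDC requires row format):

```ini
# my.cnf -- sample values, adjust to your environment
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=mall-flink
```

You can then verify with `SHOW VARIABLES LIKE 'log_bin';` in a MySQL client.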
Start Flink and the HDFS cluster, then launch the program: java -jar FlinkCDC.jar
2.2 The FlinkSQL approach
Again, first add the dependency:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
```
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL_CDC {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Create the Flink-MySQL-CDC source as a table
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id INT," +
                " name STRING," +
                " phone_num STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'master'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '000000'," +
                " 'database-name' = 'mall-flink'," +
                " 'table-name' = 'z_user_info'" +
                ")");

        tableEnv.executeSql("select * from user_info").print();

        env.execute();
    }
}
```
2.3 Custom deserializer
```java
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class Flink_CDCWithCustomerSchema {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Create the Flink-MySQL-CDC source
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("master")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("mall-flink")
                .tableList("mall-flink.z_user_info")
                .startupOptions(StartupOptions.initial())
                .deserializer(new DebeziumDeserializationSchema<String>() {
                    //Custom deserializer
                    @Override
                    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
                        //The topic carries the database and table name,
                        //e.g. mysql_binlog_source.mall-flink.z_user_info
                        String topic = sourceRecord.topic();
                        String[] arr = topic.split("\\.");
                        String db = arr[1];
                        String tableName = arr[2];

                        //Operation type: READ DELETE UPDATE CREATE
                        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

                        //Get the value and cast it to a Struct
                        Struct value = (Struct) sourceRecord.value();
                        //Row image after the change (null for deletes)
                        Struct after = value.getStruct("after");

                        //JSON object holding the row data
                        JSONObject data = new JSONObject();
                        if (after != null) {
                            for (Field field : after.schema().fields()) {
                                data.put(field.name(), after.get(field));
                            }
                        }

                        //JSON object wrapping the final result
                        JSONObject result = new JSONObject();
                        result.put("operation", operation.toString().toLowerCase());
                        result.put("data", data);
                        result.put("database", db);
                        result.put("table", tableName);

                        //Emit downstream
                        collector.collect(result.toJSONString());
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();

        //3. Read from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //4. Print the data
        mysqlDS.print();

        //5. Execute the job
        env.execute();
    }
}
```
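One subtle point in the deserializer above is the `split` call: `String.split` takes a regular expression, so the dot separating server name, database, and table must be escaped. A standalone check (no Flink needed, the topic string is a sample value):

```java
public class TopicSplitDemo {
    public static void main(String[] args) {
        // Debezium topic format: <server-name>.<database>.<table>
        String topic = "mysql_binlog_source.mall-flink.z_user_info";

        // split() takes a regex, so the dot must be written "\\." --
        // a bare "." would match every character and leave nothing usable.
        String[] arr = topic.split("\\.");
        String db = arr[1];
        String tableName = arr[2];

        System.out.println(db);        // mall-flink
        System.out.println(tableName); // z_user_info
    }
}
```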