Chapter 2 Data Collection Module: Real-Time Collection of MySQL Business Data with Flink CDC (source code & materials at the end)

1、MySQL Data Preparation

(1) Create the database for real-time synchronization

create database flink_gmall;

(2) Import the Mysql.sql file into MySQL

source /opt/data/Mysql.sql

(3) View the tables in the database

use flink_gmall;
show tables;

2、Enable the Database Binlog

(1) In MySQL, enable binlog for the database that needs real-time change capture

sudo vim /etc/my.cnf

#Enable binlog for the target database
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=flink_gmall


#Restart the MySQL service
sudo systemctl restart mysqld

(2) Verify that the binlog files are generated

cd /var/lib/mysql

After restarting MySQL, binlog files named mysql-bin.* should appear in this directory as changes are written to flink_gmall.

3、Write the Code

(1) Create the project

Project directory layout:

Directory   Purpose
app         Flink jobs for each layer
bean        data objects
common      shared constants
utils       utility classes
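
As a purely illustrative sketch (not part of the original project), the common package could hold the shared constants that the rest of this chapter otherwise hard-codes, such as the Kafka broker list and the ODS topic name:

package com.lhw.common;

// Hypothetical constants class for the "common" package.
// The original code in this chapter hard-codes these values;
// this sketch only illustrates what "shared constants" could look like.
public class GmallConstant {
    // Kafka broker list used when building the producer sink
    public static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    // ODS topic that the Flink CDC job writes to
    public static final String ODS_DB_TOPIC = "ods_behavior_db";
}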

(2) Write the ODS-layer code

Create the Kafka topic

bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 3 --partitions 6 --topic ods_behavior_db

Utility class MyKafkaUtil

package com.lhw.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class MyKafkaUtil {
    // Returns a Kafka producer sink that writes plain strings to the given topic
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic){
        return new FlinkKafkaProducer<>("hadoop102:9092,hadoop103:9092,hadoop104:9092", topic, new SimpleStringSchema());
    }
}
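
The three-argument FlinkKafkaProducer constructor above takes the broker list directly and uses the connector's default delivery semantics (at-least-once). As a minimal sketch, assuming the same broker list, a Properties-based variant could be used instead when extra producer settings are needed:

package com.lhw.utils;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Hypothetical alternative to MyKafkaUtil (not from the original project):
// the broker list is passed through a Properties object so that additional
// producer options can be configured in one place.
public class MyKafkaUtilWithProps {

    private static final String BROKER_LIST = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BROKER_LIST);
        // Further producer settings (e.g. transaction.timeout.ms) could be added here.
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }
}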

Custom deserialization schema CustomerDeserialization

package com.lhw.app.function;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1. Create a JSON object to hold the final record
        JSONObject result = new JSONObject();

        //2. Get the database name & table name from the record topic ("server.database.table")
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        //3. Get the "before" data
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforevalue = before.get(field);
                beforeJson.put(field.name(), beforevalue);
            }
        }

        //4. Get the "after" data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5. Get the operation type: CREATE / UPDATE / DELETE (CREATE is normalized to "insert")
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6. Write the fields into the JSON object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7. Emit the record
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
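
Each change event is flattened by this schema into a JSON string with the fields database, tableName, before, after, and type. For instance, the insert performed in the test step (4) below (id 18, "奢侈品", on base_category1) would be emitted roughly as follows (exact field order and formatting depend on fastjson):

{"database":"flink_gmall","tableName":"base_category1","before":{},"after":{"id":18,"name":"奢侈品"},"type":"insert"}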

Flink CDC main class

package com.lhw.app.ods;

import com.lhw.app.function.CustomerDeserialization;
import com.lhw.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class MysqlFlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1、Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

//        // 2、Enable checkpointing and use a filesystem (FS) state backend
//        // 2.1、Set the checkpoint storage path
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
//        // 2.2、Set the checkpoint interval
//        env.enableCheckpointing(5000);
//        // 2.3、Set the checkpointing mode
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        // 2.4、Set the checkpoint alignment timeout
//        env.getCheckpointConfig().setAlignmentTimeout(Duration.ofSeconds(1000));
//        // 2.5、Set the maximum number of concurrent checkpoints
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2000);

        // 3、Build the SourceFunction with Flink CDC
        DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .databaseList("flink_gmall")  // set the captured database
                .tableList("flink_gmall.base_category1") // if omitted, all tables in the listed databases are captured
                .username("root")
                .password("123456")
                .startupOptions(StartupOptions.initial())
                .deserializer(new CustomerDeserialization())
                .build();
        // 4、Read data from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        // 5、Print the data and write it to Kafka
        mysqlDS.print();
        String sinkTopic = "ods_behavior_db";
        mysqlDS.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        // 6、Execute the job
        env.execute("flinkcdcmysql3");
    }
}

(3) Run and test

Check the printed output in local IDEA & consume from the Kafka cluster:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_behavior_db

(4) Verify change capture

Perform insert, update, and delete operations in MySQL:

#Insert data
insert into flink_gmall.base_category1 values(18,"奢侈品");

#Update data
update flink_gmall.base_category1 set name="轻奢品" where id = 18;

#Delete data
delete from flink_gmall.base_category1 where id=18;

Observe the changes in IDEA & in the Kafka cluster

(5) Generate mock data (to verify changes across the whole database)

① Upload the files

#Upload the files
rz 
	application.properties
	gmall2020-mock-db-2020-11-27.jar

② Modify the configuration file

Change the database name, username & password in application.properties to match your environment

③ Run the mock data generator

java -jar gmall2020-mock-db-2020-11-27.jar

④ Observe the changes in IDEA & the changed data consumed from Kafka

4、Package and Deploy

(1) Modify the code

Add the checkpoint storage location and enable checkpointing:

package com.lhw.app.ods;

import com.lhw.app.function.CustomerDeserialization;
import com.lhw.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class MysqlFlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1、Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 2、Enable checkpointing and use a filesystem (FS) state backend
        // 2.1、Set the checkpoint storage path
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC/flinkcdc-gmall-db"));
        // 2.2、Set the checkpoint interval
        env.enableCheckpointing(5000);
        // 2.3、Set the checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.4、Set the checkpoint alignment timeout
        env.getCheckpointConfig().setAlignmentTimeout(Duration.ofSeconds(1000));
        // 2.5、Set the maximum number of concurrent checkpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2000);


        // 3、Build the SourceFunction with Flink CDC (no tableList: capture all tables in flink_gmall)
        DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .databaseList("flink_gmall")  // set the captured database
                .username("root")
                .password("123456")
                .startupOptions(StartupOptions.initial())
                .deserializer(new CustomerDeserialization())
                .build();
        // 4、Read data from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        // 5、Print the data and write it to Kafka
        mysqlDS.print();
        String sinkTopic = "ods_behavior_db";
        mysqlDS.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        // 6、Execute the job
        env.execute("flinkcdcmysql3");
    }
}

(2) Build the full (fat) jar

Maven -> package

(3) Upload the jar and run it

bin/flink run -m hadoop102:8081 -c com.lhw.app.ods.MysqlFlinkCDC /opt/data/gmall-flinkcdc-mysql.jar

Check the running result

Check the printed output in the Flink web UI

(4) Change the business data

Run the data-generator jar to change the business data:

java -jar gmall2020-mock-db-2020-11-27.jar

Check the result (job status in the web UI & messages consumed by the Kafka consumer)

Check the printed output in the Flink web UI

5、Steps to Run the Code

Code path: /opt/module/data/mysql_cdc

Startup steps

#1、Start the Flink cluster
bin/start-cluster.sh

#2、Start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_behavior_db


#3、Open the Flink web UI
http://hadoop102:8081/

#4、Submit the Flink job
bin/flink run -m hadoop102:8081 -c com.lhw.app.ods.MysqlFlinkCDC /opt/data/gmall-flinkcdc-mysql.jar

Running view of the job in the web UI

6、Source Code & Materials Download
Link: https://pan.baidu.com/s/1mVRzQ4G5wqKkCAW74IzDOA
Extraction code: eni1
