Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖

Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据湖,第1张

Flink CDC 系列 - 同步 MySQL 分库分表,构建 Iceberg 实时数据

在 OLTP 系统中,为了解决单表数据量大的问题,通常采用分库分表的方式将单个大表进行拆分以提高系统的吞吐量。

但是为了方便数据分析,通常需要将分库分表拆分出的表在同步到数据仓库、数据湖时,再合并成一个大表。

这篇教程将展示如何使用 Flink CDC 构建实时数据湖来应对这种场景,本教程的演示基于 Docker,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE,你可以很方便地在自己的电脑上完成本教程的全部内容。

接下来将以数据从 MySQL 同步到 Iceberg[1] 为例展示整个流程,架构图如下所示:

一、准备阶段 1.2 准备数据

进入 MySQL 容器中:

创建两个不同的数据库,并在每个数据库中创建两个表,作为 user 表分库分表下拆分出的表。

 CREATE DATAbase db_1;
 USE db_1;
 CREATE TABLE user_1 (
   id INTEGER NOT NULL PRIMARY KEY,
   name VARCHAr(255) NOT NULL DEFAULT 'flink',
   address VARCHAr(1024),
   phone_number VARCHAr(512),
   email VARCHAr(255)
 );
 INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");

 CREATE TABLE user_2 (
   id INTEGER NOT NULL PRIMARY KEY,
   name VARCHAr(255) NOT NULL DEFAULT 'flink',
   address VARCHAr(1024),
   phone_number VARCHAr(512),
   email VARCHAr(255)
 );
INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
CREATE DATAbase db_2;
USE db_2;
CREATE TABLE user_1 (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAr(255) NOT NULL DEFAULT 'flink',
  address VARCHAr(1024),
  phone_number VARCHAr(512),
  email VARCHAr(255)
);
INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);

CREATE TABLE user_2 (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAr(255) NOT NULL DEFAULT 'flink',
  address VARCHAr(1024),
  phone_number VARCHAr(512),
  email VARCHAr(255)
);
INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");
二、在 Flink SQL CLI 中

使用 Flink DDL 创建表

1. 开启 checkpoint

        Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。并且,mysql-cdc 在 binlog 读取阶段开始前,需要等待一个完整的 checkpoint 来避免 binlog 记录乱序的情况。

-- Flink SQL-- 每隔 3 秒做一次 checkpointFlink SQL> SET execution.checkpointing.interval = 3s;

2. 创建 MySQL 分库分表 source 表

        创建 source 表 user_source 来捕获MySQL中所有 user 表的数据,在表的配置项 database-name , table-name 使用正则表达式来匹配这些表。并且,user_source 表也定义了 metadata 列来区分数据是来自哪个数据库和表。

Flink SQL> CREATE TABLE user_source (
    database_name STRING metaDATA VIRTUAL,
    table_name STRING metaDATA VIRTUAL,
    `id` DECIMAL(20, 0) NOT NULL,
    name STRING,
    address STRING,
    phone_number STRING,
    email STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'db_[0-9]+',
    'table-name' = 'user_[0-9]+'
  );

3. 创建 Iceberg sink 表

        创建 sink 表 all_users_sink,用来将数据加载至 Iceberg 中。在这个 sink 表,考虑到不同的 MySQL 数据库表的 id 字段的值可能相同,我们定义了复合主键 (database_name, table_name, id)。

Flink SQL> CREATE TABLE all_users_sink (
    database_name STRING,
    table_name    STRING,
    `id`          DECIMAL(20, 0) NOT NULL,
    name          STRING,
    address       STRING,
    phone_number  STRING,
    email         STRING,
    PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
  ) WITH (
    'connector'='iceberg',
    'catalog-name'='iceberg_catalog',
    'catalog-type'='hadoop',  
    'warehouse'='file:///tmp/iceberg/warehouse',
    'format-version'='2'
  );
三、流式写入 Iceberg  

1. 使用下面的 Flink SQL 语句将数据从 MySQL 写入 Iceberg 中:

 Flink SQL> INSERT INTO all_users_sink select * from user_source;

上述命令将会启动一个流式作业,源源不断将 MySQL 数据库中的全量和增量数据同步到 Iceberg 中。在 Flink UI (http://localhost:8081/#/job/running)上可以看到这个运行的作业: 

 然后我们就可以使用如下的命令看到 Iceberg 中的写入的文件:

docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/

如下所示:

. 使用下面的 Flink SQL 语句查询表 all_users_sink 中的数据:

 Flink SQL> SELECt * FROM all_users_sink;

在 Flink SQL CLI 中我们可以看到如下查询结果:

 修改 MySQL 中表的数据,Iceberg 中的表 all_users_sink 中的数据也将实时更新:

        在本文中,我们展示了如何使用 Flink CDC 同步 MySQL 分库分表的数据,快速构建 Icberg 实时数据湖。用户也可以同步其他数据库(Postgres/Oracle)的数据到 Hudi 等数据湖中。最后希望通过本文,能够帮助读者快速上手 Flink CDC 。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5707009.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存