在上篇文章中我们介绍了使用TiDB Binlog将数据同步至下游的Mysql 中,本篇我们学习下使用TiDB Binlog工具将数据同步至Kafka中自定义业务逻辑,比如可以做TIDB和ES、MongoDB 或 Redis的数据同步,这功能就和Canal解析Mysql的binlog功能相差不大。如果还不了解TiDB Binlog工具的也可以参考我的上篇博客:
https://blog.csdn.net/qq_43692950/article/details/121597230
注意:在做实验前,请确保已经配置好Kafka环境:不了解的可以参考下面一篇我的博客:
二、TiDB Binlog 配置消息中间件KafKa集群搭建与使用: https://blog.csdn.net/qq_43692950/article/details/110648852
在上篇文章中,我们使用tiup 扩容出了一个pump 和 一个 drainer,我们先看下现在的集群架构:
但上篇文章我们讲解的是TIDB 到 Mysql之间的同步,如果换成Kafka只需修改下配置文件即可,但考虑到有些小伙伴可能没有看过前面我们的系列教程,这里我们还是通过扩容的方式扩容出pump 和 drainer,如果已经安装过pump 和 drainer,直接修改配置即可:
tiup cluster edit-config tidb-test
drainer_servers: - host: 192.168.40.162 ssh_port: 22 port: 8249 deploy_dir: /tidb-deploy/drainer-8249 data_dir: /tidb-data/drainer-8249 log_dir: /tidb-deploy/drainer-8249/log config: syncer.db-type: kafka syncer.to.kafka-addrs: 192.168.40.1:9092 syncer.to.kafka-version: 2.6.0 syncer.to.topic-name: tidb-test arch: amd64 os: linux
修改上述kafka的指向即可,如果是kafka集群,用英文逗号隔开即可。
下面我们再讲下扩容的方式,没有安装pump 和 drainer的就用看下面的方式:
编写扩容配置
vi scale-out-binlog.yaml
写入以下内容:
pump_servers: - host: 192.168.40.160 ssh_port: 22 port: 8250 deploy_dir: /tidb-deploy/pump-8250 data_dir: /tidb-data/pump-8250 log_dir: /tidb-deploy/pump-8250/log config: gc: 7 storage.stop-write-at-available-space: 200MB arch: amd64 os: linux drainer_servers: - host: 192.168.40.162 ssh_port: 22 port: 8249 deploy_dir: /tidb-deploy/drainer-8249 data_dir: /tidb-data/drainer-8249 log_dir: /tidb-deploy/drainer-8249/log config: syncer.db-type: kafka syncer.to.kafka-addrs: 192.168.40.1:9092 syncer.to.kafka-version: 2.6.0 syncer.to.topic-name: tidb-test arch: amd64 os: linux
注意 storage.stop-write-at-available-space 这个参数表示存储空间低于指定值时不再接收 binlog 写入请求,默认为10G ,如果硬盘没这么大,就调小一点。
开始扩容:
tiup cluster scale-out tidb-test scale-out-binlog.yaml -u root -p
等待一会就可以看到集群中已经有pump 和 drainer了:
下一步还要开启TIDB的binglog配制:
tiup cluster edit-config tidb-test
修改 server_configs 的配制:
server_configs: tidb: binlog.enable: true binlog.ignore-error: true
重新加载集群:
tiup cluster reload tidb-test
使用mysql 客户端连接tidb,查看bnlog是否已经开启:
show variables like "log_bin";
ON即为开启状态。
看下pump和drainer的状态:
show pump status;
show drainer status;
状态都为online在线状态。
下载官方demo
https://github.com/pingcap/tidb-tools/tree/master/tidb-binlog/driver/example/kafkaReader
官方demo是直接用的Java Kafka Api,本篇我们使用SpringBoot 的 spring-kafka 。
下载之后需要将三个文件复制到自己的SpringBoot项目中:
需要用这三个工具进行解析数据,不然解析出来的是乱码,这点可以去Tidb的社区看下:
POM文件引入的主要依赖:
org.springframework.kafka spring-kafkacom.google.protobuf protobuf-java3.9.0 com.google.protobuf protobuf-java-util3.9.1
application配制信息:
server: port: 8080 spring: kafka: # kafka服务器地址(可以多个) # bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092 bootstrap-servers: 192.168.40.1:9092 consumer: # 指定一个默认的组名 group-id: kafkaGroup # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: earliest # key/value的反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer producer: # key/value的序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 批量抓取 batch-size: 65536 # 缓存容量 buffer-memory: 524288 #失败重试次数 retries: 3 # 服务器地址 # bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
注意consumer.value-deserializer这个要使用ByteArrayDeserializer,主要发送端就是byte[],我们只能配合:
日志监听:
@Slf4j @Component public class TidbConsumer { @KafkaListener(topics = "tidb-test") public void receive3(ConsumerRecord四、测试consumer) throws Exception { System.out.println("tidb bing-log Listener >> "); //binglog对象 BinLogInfo.Binlog binlog = BinLogInfo.Binlog.parseFrom(consumer.value()); // *** 作类型 0 DML 1 DDL BinLogInfo.BinlogType type = binlog.getType(); log.info(binlog.toString()); log.info(" *** 作类型:{} ", type); //解析内容 if (BinLogInfo.BinlogType.DML == type) { BinLogInfo.DMLData dmlData = binlog.getDmlData(); if (dmlData.getTablesCount() == 0) { return; } dmlData.getTablesList().forEach(table -> { String db = table.getSchemaName(); log.info("更新数据库:{}", db); String tableName = table.getTableName(); log.info("更新数据表:{}", tableName); List columnInfoList = table.getColumnInfoList(); List MutationsList = table.getMutationsList(); MutationsList.forEach(mutation -> { BinLogInfo.MutationType mutationType = mutation.getType(); log.info(" *** 作类型:{}", mutationType); List columnsList = mutation.getRow().getColumnsList(); //解析更新后的数据 for (int i = 0; i < columnInfoList.size(); i++) { String filedName = columnInfoList.get(i).getName(); log.info("字段:{} ,更新后的值:{} ", filedName, columnsList.get(i)); } }); }); } else if (BinLogInfo.BinlogType.DDL == type) { BinLogInfo.DDLData ddlData = binlog.getDdlData(); String db = ddlData.getSchemaName(); String tableName = ddlData.getTableName(); String ddlSql = ddlData.getDdlQuery().toStringUtf8(); log.info("更新数据库:{}",db); log.info("更新数据表:{}", tableName); log.info("DDL :{}", ddlSql); } else { throw new Exception("analysis binglog err!"); } } }
测试表结构:
添加数据:
insert into user(name,age) values('bxc',10);
BinLogInfo.Binlog toString信息:
type: DML commit_ts: 429572910085570562 dml_data { tables { schema_name: "testdb" table_name: "user" column_info { name: "id" mysql_type: "int" is_primary_key: true } column_info { name: "name" mysql_type: "varchar" is_primary_key: false } column_info { name: "age" mysql_type: "int" is_primary_key: false } mutations { type: Insert row { columns { int64_value: 212247 } columns { string_value: "bxc" } columns { int64_value: 10 } } } 5: { 1: "PRIMARY" 2: "id" } } }
解析打印信息:
更新数据:
update user set age = 20 where name = 'bxc';
删除数据:
delete from user where name = 'bxc';
创建一个新的表:
CREATE TABLE `user_copy` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=242214;
BinLogInfo.Binlog toString信息:
type: DDL commit_ts: 429573182230102017 ddl_data { schema_name: "testdb" table_name: "user_copy" ddl_query: "CREATE TABLE `user_copy` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=242214" }
喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)