Installing Maxwell and Syncing MySQL Data to Kafka

First, download Maxwell from https://github.com/zendesk/maxwell/releases/tag/v1.28.2. Releases from 1.30.0 onward require JDK 11 or later, so a release below 1.30.0 is recommended. After downloading, follow the steps below to sync MySQL data to Kafka. The prerequisite is that MySQL has binlog enabled; see the Canal chapter for how to configure it. Maxwell does not support a high-availability setup, but it does support resuming from a checkpoint: if it fails, you can restart it and it will continue reading from the last recorded position.
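
Maxwell reads row-level change events, so the binlog must be enabled with binlog_format=ROW. As a quick check before continuing (a minimal sketch, assuming MySQL runs on node1, as configured later, and that you can log in as root):

#Verify that the binlog is enabled and uses ROW format
mysql -h node1 -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"
#Expected output: log_bin = ON, binlog_format = ROW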

Upload the downloaded package to node3 and extract it
[root@node3 ~]# cd /opt/apps
[root@node3 apps]# tar -zxvf ./maxwell-1.28.2.tar.gz -C /opt
Create the Maxwell user in MySQL and grant privileges

When Maxwell syncs MySQL data to Kafka, it stores metadata such as the binlog file it is reading and its position back in MySQL. Therefore, create a maxwell database, and also grant the maxwell user access to all other databases.

mysql> CREATE database maxwell;
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;
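
To confirm the privileges took effect, log in as the new user and list its grants (a quick check, assuming MySQL runs on node1 as configured below):

mysql -h node1 -umaxwell -pmaxwell -e "SHOW GRANTS;"
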
Edit the config.properties configuration file

Go to "/opt/maxwell-1.28.2", rename "config.properties.example" to "config.properties", and configure the following:

producer=kafka
kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092
kafka_topic=maxwell_topic
#Partition binlog records across Kafka partitions by table; other options: [database, table, primary_key, transaction_id, thread_id, column]
producer_partition_by=table

#MySQL host
host=node1
#MySQL connection user and password
user=maxwell
password=maxwell

Note: the parameters above can also be specified as command-line options when starting Maxwell.
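
For example, instead of editing config.properties, the same settings can be passed as startup flags; this is only a sketch, but the flag names mirror the config keys above:

/opt/maxwell-1.28.2/bin/maxwell --user='maxwell' --password='maxwell' --host='node1' --producer=kafka --kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092 --kafka_topic=maxwell_topic --producer_partition_by=table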

Start ZooKeeper and Kafka, and monitor the Kafka topic maxwell_topic
[root@node3 bin]# ./kafka-console-consumer.sh  --bootstrap-server node1:9092,node2:9092,node3:9092 --topic maxwell_topic
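
If maxwell_topic does not exist yet, create it first with several partitions so that producer_partition_by=table can actually spread tables across partitions (a sketch, assuming a Kafka version whose kafka-topics.sh accepts --bootstrap-server):

[root@node3 bin]# ./kafka-topics.sh --create --topic maxwell_topic --partitions 3 --replication-factor 3 --bootstrap-server node1:9092,node2:9092,node3:9092
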
Start Maxwell
[root@node3 bin]# ./maxwell --config ../config.properties

Note: the startup command above can also be wrapped in a script:

#startMaxwell.sh script contents:
/opt/maxwell-1.28.2/bin/maxwell --config /opt/maxwell-1.28.2/config.properties > ./log.txt 2>&1 &

Make the script executable:

chmod +x ./startMaxwell.sh
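
A matching stop script is also handy, since Maxwell is later stopped by killing its process. The following stopMaxwell.sh is only a sketch and assumes a single Maxwell process is running on the node:

#stopMaxwell.sh script contents:
#Find the Maxwell Java process and kill it
ps -ef | grep '[m]axwell' | awk '{print $2}' | xargs -r kill -9
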
Write data to MySQL (insert, update, delete)
mysql> create database mysqldb;
Query OK, 1 row affected (0.01 sec)

mysql> use mysqldb;
Database changed
mysql> create table info(id int,name varchar(255),age int);
Query OK, 0 rows affected (0.04 sec)

mysql> insert into info values (10,"xx",20);
Query OK, 1 row affected (0.01 sec)

mysql> insert into info values (12,"xx",20);
Query OK, 1 row affected (0.04 sec)

mysql> update info set age = 100 where id = 10;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> delete from info where id = 10;
Query OK, 1 row affected (0.00 sec)

The corresponding messages in Kafka are as follows:

[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic maxwell_topic
{"database":"mysqldb","table":"info","type":"insert","ts":1641292268,"xid":2568,"commit":true,"data":{"id":12,"name":"xx","age":20}}
{"database":"mysqldb","table":"info","type":"update","ts":1641292291,"xid":2631,"commit":true,"data":{"id":10,"name":"xx","age":100},"old":{"age":20}}
{"database":"mysqldb","table":"info","type":"delete","ts":1641292302,"xid":2662,"commit":true,"data":{"id":10,"name":"xx","age":100}}
Test Maxwell's resume-from-checkpoint behavior

Stop Maxwell, insert new data into MySQL, then restart Maxwell and check whether it continues consuming from the binlog position it last reached.

#Stop Maxwell on node3 with kill -9 xxx
#Insert more data into MySQL
mysql> insert into info values (200,"bbb",20);
mysql> update info set age = 30 where id = 200;
#Restart Maxwell on node3; Kafka receives the new data, continuing from the previous binlog position
{"database":"mysqldb","table":"info","type":"insert","ts":1619000378,"xid":4565,"commit":true,"data"
:{"id":200,"name":"bbb","age":20}}{"database":"mysqldb","table":"info","type":"update","ts":1619000391,"xid":4566,"commit":true,"data"
:{"id":200,"name":"bbb","age":30},"old":{"age":20}}
