MaxWell的使用

MaxWell的使用,第1张

Maxwell的使用 1、Maxwell的概述 1.1 官网简介

​ This is Maxwell’s daemon, an application that reads MySQL binlogs and writes row updates as JSON to Kafka, Kinesis, or other streaming platforms. Maxwell has low operational overhead, requiring nothing but mysql and a place to write to. Its common use cases include ETL, cache building/expiring, metrics collection, search indexing and inter-service communication. Maxwell gives you some of the benefits of event sourcing without having to re-architect your entire platform. (官网简介)

官网地址:Maxwell’s Daemon (maxwells-daemon.io)

Maxwell 主要用于实时数据同步

1.2 运行原理 1.2.1 MySQL的主从复制

​ 当数据量和并发量特别高时,单个节点的MySQL就难以承受并发量。因此我们就需要对单个节点MySQL进行扩展,形成MySQL集群。MySQL集群中主节点为Master,从节点为slave。

​ 这里进行主从复制后,我们就可以将读和写进行分离,master主库用于写,slave从库用于读,有效解决高并发。并且进行数据的多个备份,保障了数据的安全。

相应细节:

① Master主库将改变记录,写到二进制日志(binary log)中;

② Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);

③ Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

1.2.2 Maxwell工作原理

​ Maxwell的工作原理很简单,就是把自己伪装成MySQL的一个slave,然后以slave的身份假装从MySQL(master)复制数据。

1.3 Maxwell和Canal的对比
对比CanalMaxwell
语言javajava
数据格式格式自由json
采集数据模式增量全量/增量
数据落地定制支持kafka等多种平台
2、Maxwell的使用 2.1 Maxwell下载安装 2.1.1 Maxwell下载

注意:1.30版本以后支持JDK11,不支持JDK1.8

这里我们下载1.29.2

2.1.2 安装部署

1)上传到Linux系统

2)解压到指定目录

[hadoop@hadoop101 software]$ tar -zxvf maxwell-1.29.2.tar.gz -C ../module/

[hadoop@hadoop101 module]$ ll | grep max
drwxrwxr-x.  4 hadoop hadoop  200 Jan 28 11:42 maxwell-1.29.2
2.1.3 MySql环境准备

MySql需要开启binlog设置

(1)修改mysql的配置文件,开启MySQL Binlog设置

[hadoop@hadoop101 module]$ sudo vim /etc/my.cnf
[sudo] password for hadoop: 

# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
# 添加下面内容
server_id=1
log-bin=mysql-bin
binlog_format=row
#binlog-do-db=test_maxwell

重启MySQL

[hadoop@hadoop101 module]$ systemctl restart mysqld

登录MySQL,查看:

(2)进入/var/lib/mysql目录,查看MySQL生成的binlog文件

注:MySQL生成的binlog文件初始大小一定是154字节,然后前缀是log-bin参数配置的,后缀是默认从.000001,然后依次递增。除了binlog文件文件以外,MySQL还会额外生产一个.index索引文件用来记录当前使用的binlog文件。

2.1.4 初始化元数据库

(1)在MySQL中建立一个maxwell库用于存储Maxwell的元数据

mysql> CREATE DATABASE maxwell;

(2)设置mysql用户密码安全级别

mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;

(3)分配账号 *** 作该数据库

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';

(4)分配这个账号可以监控其他数据库的权限

mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';

(5)刷新mysql权限

mysql> flush privileges;
2.1.5 启动Maxwell

方式一:使用命令行参数

[hadoop@hadoop101 maxwell-1.29.2]$ bin/maxwell \
> --user='maxwell' \
> --password='123456' \
> --host='localhost' \
> --producer=stdout
# --user 数据库的用户名
# --password 登录密码
# --host 数据库主机
# --producer 生产者模式:stdout:控制台   kafka:kafka集群
# 完整指令如下:
bin/maxwell --user='maxwell' --password='123456' --host='localhost' --producer=stdout

方式二:通过配置文件启动

[hadoop@hadoop101 maxwell-1.29.2]$ cp config.properties.example config.properties
[hadoop@hadoop101 maxwell-1.29.2]$ vim config.properties

启动maxwell

[hadoop@hadoop101 maxwell-1.29.2]$ bin/maxwell --config ./config.propertie
## --config 指定启动的配置文件

这里启动hive并做了插入 *** 作,可以看出,数据已经监控到了。

2.2 常用案例 2.2.1 监控MySQL数据,并输出控制台

编写启动命令:

[hadoop@hadoop101 maxwell-1.29.2]$ bin/maxwell \
> --user='maxwell' \
> --password='123456' \
> --host='localhost' \
> --producer=stdout

向MySQL中插入数据:

mysql> create database testmaxwell;
Query OK, 1 row affected (0.00 sec)
mysql> use testmaxwell;
Database changed
mysql> create table emplyee(id int,name varchar(255));
Query OK, 0 rows affected (0.10 sec)
mysql> insert into emplyee values(1,'tom');
Query OK, 1 row affected (0.00 sec)

mysql> insert into emplyee values(2,'jerry');
Query OK, 1 row affected (0.00 sec)

mysql> insert into emplyee values(3,'tony');
Query OK, 1 row affected (0.00 sec)

可以看到Maxwell控制台:

相应数据以json格式发送到控制台来了。

2.2.2 监控MySQL数据到Kafka中

1)、启动kafka集群

kafka集群启动完成。

2)、创建kafka主题

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 2 --partitions 2 --topic maxwell
Created topic "maxwell".

3)、为方便展示,开始kafka消费者

[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic maxwell

4)、编写maxwell启动命令

[hadoop@hadoop101 maxwell-1.29.2]$ bin/maxwell \
> --user='maxwell' \
> --password='123456' \
> --host='hadoop101' \
> --producer=kafka \
> --kafka.bootstrap.servers=hadoop101:9092 \
> --kafka_topic=maxwell

我们向mysql的表中添加数据

kafka消费者控制台输出数据:

注:
​ ① 若需要指定监控某些表,可使用–filter参数,过滤表。Filtering - Maxwell’s Daemon (maxwells-daemon.io)

​ ② 对数据库进行初始全量导入数据:

​ 需要修改Maxwell的元数据,触发数据初始化机制,在mysql的maxwell库中bootstrap表中插入一条数据,写明需要全量数据的库名和表名,可在官网参考配置。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/sjk/991785.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存