大数据高级开发工程师——大数据相关工具之三 Maxwell

大数据高级开发工程师——大数据相关工具之三 Maxwell,第1张

数据高级开发工程师——大数据相关工具之三 Maxwell

文章目录
  • 大数据相关工具
    • Maxwell数据实时同步工具
      • Maxwell 简介
      • MySQL Binlog 介绍
        • 1. Binlog简介
        • 2. Binlog的日志格式
      • Mysql 实时数据同步方案对比
      • 开启MySQL的Binlog
      • Maxwell 安装部署
      • Maxwell 实时采集案例

大数据相关工具 Maxwell数据实时同步工具 Maxwell 简介
  • Maxwell 是一个能实时读取 MySQL 二进制日志文件binlog,并生成 Json格式的消息,作为生产者发送给 Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的 DML 指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。
  • 官网地址:http://maxwells-daemon.io
  • GitHub地址:https://github.com/zendesk/maxwell
  • Maxwell 主要功能:
    • 支持 select * from table 方式进行全量数据初始化;
    • 支持在主库发生 failover 后,自动恢复 binlog 位置,实现断点续传;
    • 可以对数据进行分区,解决数据倾斜问题,发送到 Kafka 的数据支持库、表、列等级别的数据分区;
    • 工作方式是伪装为 slave 接收 binlog events,然后根据 schema 信息拼装,可以接受 ddl、xid、row 等event。
MySQL Binlog 介绍 1. Binlog简介
  • MySQL 中有以下几种日志
    • 错误日志:记录在启动,运行或停止mysqld时遇到的问题
    • 通用日志:记录建立的客户端连接和执行的语句
    • 二进制日志(bin log):记录更改数据的语句
    • 中继日志:从服务器 复制 主服务器接收的数据更改
    • 慢查询日志:记录所有执行时间超过 long_query_time 秒的所有查询或不使用索引的查询
    • DDL 日志(元数据日志):元数据 *** 作由 DDL 语句执行
  • 在默认情况下,系统仅仅打开错误日志,关闭了其他所有日志,以达到尽可能减少IO损耗提高系统性能的目的,但是在一般稍微重要一点的实际应用场景中,都至少需要打开二进制日志,因为这是MySQL很多存储引擎进行增量备份的基础,也是MySQL实现复制的基本条件。
  • MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDL 和 DML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复。
    • MySQL主从复制:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给slaves 来达到 master-slave 数据一致的目的。
    • 数据恢复:通过使用 mysqlbinlog 工具来使恢复数据。
2. Binlog的日志格式
  • 记录在二进制日志中的事件的格式取决于二进制记录格式,支持三种格式类型:
    • Statement:基于 SQL 语句的复制(statement-based replication, SBR)
    • Row:基于行的复制(row-based replication, RBR)
    • Mixed:混合模式复制(mixed-based replication, MBR)
  • Statement:
    • 每一条会修改数据的sql都会记录在binlog中。
    • 优点:不需要记录每一行的变化,减少了binlog日志量,节约了IO,提高了性能。
    • 缺点:在进行数据同步的过程中有可能出现数据不一致,比如 update tt set create_date=now(),如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
  • Row:
    • 它不记录sql语句上下文相关信息,仅保存哪条记录被修改。
    • 优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,它只记录执行后的效果。
    • 缺点:每行数据的修改都会记录,最明显的就是update语句,导致更新多少条数据就会产生多少事件,占用较大空间。
  • Mixed:
    • 从5.1.8版本开始,MySQL提供了Mixed格式,实际上就是Statement与Row的结合。在Mixed模式下,一般的复制使用Statement模式保存binlog,对于Statement模式无法复制的 *** 作使用Row模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式(因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为Row模式)。
    • 优点:节省空间,同时兼顾了一定的一致性。
    • 缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。
Mysql 实时数据同步方案对比
  • mysql 数据实时同步可以通过解析mysql的 binlog 的方式来实现,解析binlog可以有多种方式,可以通过canal,或者maxwell等各种方式实现。以下是各种抽取方式的对比介绍:
对比项CanalMaxwellmysql_streamer开源方阿里巴巴zendeskYelp语言JavaJavaPython活跃度活跃活跃活跃HA支持定制支持数据落地定制Kafka等Kafka分区支持支持不支持bootstrap不支持支持支持数据格式格式自由json(固定)json(固定)文档较详细较详细粗略随机读支持支持支持
  • 其中canal由 Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费 canal 解析到的数据。
  • Maxwell相对于canal的优势是使用简单,Maxwell比Canal更加轻量级,它直接将数据变更输出为json字符串,不需要再编写客户端。对于缺乏基础建设,短时间内需要快速迭代的项目和公司比较合适。
  • 另外Maxwell 有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
开启MySQL的Binlog
  • mysql的版本尽量不要太低,也不要太高,最好使用5.6及以上版本。
  • 添加mysql普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户。
-- 校验级别最低,只校验密码长度
mysql> set global validate_password_policy=LOW;
mysql> set global validate_password_length=6;

-- 创建maxwell库(启动时候会自动创建,不需手动创建)和用户
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

-- 刷新权限
mysql> flush privileges;
  • 修改配置文件 sudo vim /etc/my.cnf,添加或修改以下三行配置:
# binlog日志名称前缀
log-bin=/var/lib/mysql/mysql-bin
# binlog日志格式
binlog-format=ROW
# 唯一标识,这个值的区间是:1到(2^32)-1
server_id=1
  • 重启 MySQL 服务
sudo service mysqld restart
  • 验证 binlog 是否配置成功
show variables like '%log_bin%';
  • 查看binlog日志文件生成:进入 /var/lib/mysql 目录,查看binlog日志文件

Maxwell 安装部署
  • 安装包下载&解压:
wget https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz
tar -zxvf maxwell-1.21.1.tar.gz -C /bigdata/install
  • 修改maxwell配置文件:
cd /bigdata/install/maxwell-1.21.1/
cp config.properties.example config.properties
vim config.properties
  • 配置文件config.properties 内容如下:
# choose where to produce data to
producer=kafka
# list of kafka brokers
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
# mysql login info
host=node03
port=3306
user=maxwell
password=123456
# kafka topic to write to
kafka_topic=maxwell
Maxwell 实时采集案例
  • 关于 kafka 的安装和使用,请参考我之前的博文:JavaEE 企业级分布式高级架构师(十二)Kafka学习笔记
  • 启动kafka集群和zookeeper集群
zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties
  • 创建 topic
# /usr/apps/kafka
bin/kafka-topics.sh --create --bootstrap-server 192.168.254.128:9092 --replication-factor 2 --partitions 3 --topic maxwell
# 查看topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.254.128:9092

  • node01 启动 kafka 自带控制台消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.254.128:9092 --topic maxwell --from-beginning
  • node03 启动maxwell服务
maxwell

  • 插入数据进行测试,向mysql表中插入一条数据,查看kafka是否能够接收到数据
CREATE DATAbase `test_db` ;

USE `test_db`;



DROp TABLE IF EXISTS `user`;

CREATE TABLE `user` (
  `id` varchar(10) NOT NULL,
  `name` varchar(10) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- 插入数据
insert  into `user`(`id`,`name`,`age`) values  ('1','xiaokai',20);
-- 修改数据
update `user` set age= 30 where id='1';
-- 删除数据
delete from `user` where id='1';

  • 输出 json 数据字段说明:
字段说明database数据库名称table表名称type *** 作类型,包括 insert、update、delete 等ts *** 作时间戳xid事务idcommit同一个 xid 代表同一个事务,事务的最后一条语句会有 commitdata最新的数据,修改后的数据old旧数据,修改前的数据

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5695827.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存