Canal或maxwell实时采集MySQL数据到Kafka

Canal或maxwell实时采集MySQL数据到Kafka,第1张

Canal或maxwell实时采集MySQL数据到Kafka

文章目录

MySQL的binlog

1、编辑MySQL配置2、重启MySQL3、检测配置是否成功4、MySQL数据准备5、Kafka准备 Canal

1、下载Canal2、解压3、修改配置4、启动 Maxwell

1、准备工作2、安装和配置maxwell 比较Maxwell和CanalAppendix

MySQL的binlog

二进制日志以事件形式记录了所有的DDL和DML,还包含语句所执行的消耗的时间MySQL的binlog是事务安全型的开启binlog会有1%左右的性能损耗一般场景:MySQL主从复制

MySQL主从复制原理图

1、编辑MySQL配置
sudo find / -name my.cnf
sudo vim /etc/my.cnf

添加如下内容

server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=b1
参数说明server-idMySQL主从复制时,主从之间每个实例都有独一无二的IDlog-bin生成的日志文件的前缀binlog_formatbinlog格式binlog-do-db指定哪些库需要写到binlog;如果不配置,就会是所有库 binlog_format参数值参数值说明空间占用数据一致性statement语句级:记录每次一执行写 *** 作的语句小如果用binlog进行数据恢复,执行时间不同可能会导致数据不一致row行级:记录每次 *** 作后每行记录的变化大绝对支持mixedstatement的升级版小极端情况下仍会造成数据不一致 2、重启MySQL
sudo systemctl restart mysqld
3、检测配置是否成功
mysql -uroot -p123456
show variables like'%log_bin%';

查看log_bin相关变量

sudo ls -l /var/lib/mysql | grep mysql-bin

查看binlog文件

4、MySQL数据准备

被监控的MySQL数据

DROP DATAbase IF EXISTS b1;
CREATE DATAbase b1;
CREATE TABLE b1.t11 (f1 INT PRIMARY KEY,f2 INT);
CREATE TABLE b1.t22 (f1 INT PRIMARY KEY,f2 INT);
INSERT b1.t11 VALUES (1,10);
INSERT b1.t22 VALUES (1,10);

插入数据(数据同步时才插入)

INSERT b1.t11 VALUES (2,20),(3,NULL),(4,40);
INSERT b1.t22 VALUES (2,20),(3,NULL),(4,40);
5、Kafka准备

创建Kafka的主题

kafka-topics.sh 
--delete 
--zookeeper hadoop102:2181/kafka 
--topic t1

kafka-topics.sh 
--create 
--zookeeper hadoop102:2181/kafka 
--replication-factor 1 
--partitions 2 
--topic t1

起一个终端消费者,用于检测MySQL数据是否同步到Kafka

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic t1
Canal

Canal是Java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件使用场景:实时采集MySQL数据到Kafka工作原理:
1、Canal伪装自己为MySQL的Slave,向MySQL的Master发送dump协议
2、MySQL的Master收到dump请求,推送binary log给Canal
3、Canal解析binary log 1、下载Canal

下载地址:https://github.com/alibaba/canal/releases

wget http://github.com/alibaba/canal/releases/download/canal-1.1.6-alpha-1/canal.deployer-1.1.6-SNAPSHOT.tar.gz
2、解压
mkdir $B_HOME/canal
tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C $B_HOME/canal/
cd $B_HOME/canal
3、修改配置

Canal配置有2种级别:server级别和instance级别
server级别:全局性的配置,一个sever可配置多个instance
instance级别:最小订阅MySQL

cd $B_HOME/canal

server配置

vim ./conf/canal.properties
# Canal服务器绑定ip地址和端口
canal.ip = hadoop102
# 服务模式:tcp、kafka、RocketMQ…
canal.serverMode = kafka
# Kafka地址
kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

instance配置

canal.destinations可配置多个实例,默认有个example

vim ./conf/example/instance.properties
# 要监控的MySQL地址
canal.instance.master.address=hadoop102:3306
# 监控哪些【库.表】(转义符需要双斜杠)
canal.instance.filter.regex=b1.t1.*

# 消息队列的主题
canal.mq.topic=t1
# 散列模式的分区数, 要和Kafka主题的分区数保持一致
canal.mq.partitionsNum=2

实例默认的数据库用户及其密码都是canal,所以创建一个同名用户

mysql -uroot -p123456
-- 创建用户canal,%表示所有地址可访问,密码canal,并授予所有库和表的指定权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* to 'canal'@'%' identified by 'canal';
4、启动
./bin/startup.sh

启动后 用jps 可看到CanalLauncher

Maxwell

Java编写的MySQL数据实时抓取软件,其抓取原理基于binlog

1、准备工作
mysql -uroot -p123456
    在MySQL中创建一个用于存储maxwell元数据的数据库
CREATE DATAbase `maxwell` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
    创建可以 *** 作maxwell库的用户,名为maxwell密码为maxwell
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell';
    给用户maxwell分配 *** 作其他数据库的权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
2、安装和配置maxwell

下载

wget http://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz

解压

tar -zxvf maxwell-1.27.1.tar.gz -C $B_HOME/

配置

cd $B_HOME/maxwell-1.27.1
vim ./config.properties
# 日志级别
log_level=info
# 生产者
producer=kafka
# Kafka服务地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Kafka主题
kafka_topic=t1
# 生产者分区依据:按照主键的hash
producer_partition_by=primary_key

# MySQL登录信息
host=hadoop102
user=maxwell
password=maxwell

启动maxwell

cd $B_HOME/maxwell-1.27.1
./bin/maxwell --config config.properties --filter exclude:b1.t22
可选参数说明--config配置文件--filter指定监控的表,建议传参(有语法提示)--daemon后台运行 比较Maxwell和Canal

1、Canal功能更强大、技术更成熟;Maxwell更轻
2、Maxwell不能直接支持高可用,Canal支持
3、Maxwell只支持JSON格式,Canal可自定义格式

输出格式比较

4、Maxwell可导出完整的历史数据用于初始化,步骤如下:

·4.1、全局配置加入client_id

vim $B_HOME/maxwell-1.27.1/config.properties
# 用于初始化维度表数据
client_id=maxwell_1

·4.2、执行初始化脚本(先重启maxwell,再执行该命令)

cd $B_HOME/maxwell-1.27.1/
./bin/maxwell-bootstrap 
--user maxwell 
--password maxwell 
--host hadoop102 
--database b1 
--table t11 
--client_id maxwell_1
Appendix encncanalkəˈnæln. 运河;[地理] 水道;[建] 管道;灌溉水渠maxwellˈmæksˌwɛln. 麦克斯韦(磁通量单位)dumpdʌmpv. 丢弃;倾销;转存(计算机数据);n. 垃圾场;grantɡræntv. 授予;承认n. 补助金;bootstrapˈbuːtstræpn. (靴筒后的)靴襻;[计] 引导程序,辅助程序

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701742.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存