MySQL的binlog
1、编辑MySQL配置2、重启MySQL3、检测配置是否成功4、MySQL数据准备5、Kafka准备 Canal
1、下载Canal2、解压3、修改配置4、启动 Maxwell
1、准备工作2、安装和配置maxwell 比较Maxwell和CanalAppendix
MySQL的binlog二进制日志以事件形式记录了所有的DDL和DML,还包含语句所执行的消耗的时间MySQL的binlog是事务安全型的开启binlog会有1%左右的性能损耗一般场景:MySQL主从复制
1、编辑MySQL配置MySQL主从复制原理图
sudo find / -name my.cnf
sudo vim /etc/my.cnf
添加如下内容
server-id=1 log-bin=mysql-bin binlog_format=row binlog-do-db=b1
sudo systemctl restart mysqld3、检测配置是否成功
mysql -uroot -p123456
show variables like'%log_bin%';
查看log_bin相关变量
sudo ls -l /var/lib/mysql | grep mysql-bin
4、MySQL数据准备查看binlog文件
被监控的MySQL数据
DROP DATAbase IF EXISTS b1; CREATE DATAbase b1; CREATE TABLE b1.t11 (f1 INT PRIMARY KEY,f2 INT); CREATE TABLE b1.t22 (f1 INT PRIMARY KEY,f2 INT); INSERT b1.t11 VALUES (1,10); INSERT b1.t22 VALUES (1,10);
插入数据(数据同步时才插入)
INSERT b1.t11 VALUES (2,20),(3,NULL),(4,40); INSERT b1.t22 VALUES (2,20),(3,NULL),(4,40);5、Kafka准备
创建Kafka的主题
kafka-topics.sh --delete --zookeeper hadoop102:2181/kafka --topic t1 kafka-topics.sh --create --zookeeper hadoop102:2181/kafka --replication-factor 1 --partitions 2 --topic t1
起一个终端消费者,用于检测MySQL数据是否同步到Kafka
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic t1Canal
Canal是Java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件使用场景:实时采集MySQL数据到Kafka工作原理:
1、Canal伪装自己为MySQL的Slave,向MySQL的Master发送dump协议
2、MySQL的Master收到dump请求,推送binary log给Canal
3、Canal解析binary log
1、下载Canal
下载地址:https://github.com/alibaba/canal/releases
wget http://github.com/alibaba/canal/releases/download/canal-1.1.6-alpha-1/canal.deployer-1.1.6-SNAPSHOT.tar.gz2、解压
mkdir $B_HOME/canal tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C $B_HOME/canal/ cd $B_HOME/canal3、修改配置
Canal配置有2种级别:server级别和instance级别
server级别:全局性的配置,一个sever可配置多个instance
instance级别:最小订阅MySQL
cd $B_HOME/canal
server配置
vim ./conf/canal.properties
# Canal服务器绑定ip地址和端口 canal.ip = hadoop102 # 服务模式:tcp、kafka、RocketMQ… canal.serverMode = kafka # Kafka地址 kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
instance配置
canal.destinations可配置多个实例,默认有个example
vim ./conf/example/instance.properties
# 要监控的MySQL地址 canal.instance.master.address=hadoop102:3306 # 监控哪些【库.表】(转义符需要双斜杠) canal.instance.filter.regex=b1.t1.* # 消息队列的主题 canal.mq.topic=t1 # 散列模式的分区数, 要和Kafka主题的分区数保持一致 canal.mq.partitionsNum=2
实例默认的数据库用户及其密码都是canal,所以创建一个同名用户
mysql -uroot -p123456
-- 创建用户canal,%表示所有地址可访问,密码canal,并授予所有库和表的指定权限 GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* to 'canal'@'%' identified by 'canal';4、启动
./bin/startup.sh
Maxwell启动后 用jps 可看到CanalLauncher
Java编写的MySQL数据实时抓取软件,其抓取原理基于binlog
1、准备工作mysql -uroot -p123456
- 在MySQL中创建一个用于存储maxwell元数据的数据库
CREATE DATAbase `maxwell` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
- 创建可以 *** 作maxwell库的用户,名为maxwell密码为maxwell
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell';
- 给用户maxwell分配 *** 作其他数据库的权限
GRANT SELECT,REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'maxwell'@'%'; FLUSH PRIVILEGES;2、安装和配置maxwell
下载
wget http://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
解压
tar -zxvf maxwell-1.27.1.tar.gz -C $B_HOME/
配置
cd $B_HOME/maxwell-1.27.1 vim ./config.properties
# 日志级别 log_level=info # 生产者 producer=kafka # Kafka服务地址 kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 # Kafka主题 kafka_topic=t1 # 生产者分区依据:按照主键的hash producer_partition_by=primary_key # MySQL登录信息 host=hadoop102 user=maxwell password=maxwell
启动maxwell
cd $B_HOME/maxwell-1.27.1 ./bin/maxwell --config config.properties --filter exclude:b1.t22
1、Canal功能更强大、技术更成熟;Maxwell更轻
2、Maxwell不能直接支持高可用,Canal支持
3、Maxwell只支持JSON格式,Canal可自定义格式
输出格式比较
4、Maxwell可导出完整的历史数据用于初始化,步骤如下:
·4.1、全局配置加入client_id
vim $B_HOME/maxwell-1.27.1/config.properties
# 用于初始化维度表数据 client_id=maxwell_1
·4.2、执行初始化脚本(先重启maxwell,再执行该命令)
cd $B_HOME/maxwell-1.27.1/ ./bin/maxwell-bootstrap --user maxwell --password maxwell --host hadoop102 --database b1 --table t11 --client_id maxwell_1Appendix
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)