- canal的搭建及使用
- 1、开启mysql binlog
- 2、上传解压配置环境变量
- 3、修改配置文件conf/example/instance.properties
- 4、修改配置文件conf/canal.properties
- 5、测试
- 6、可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据:
cp /usr/share/mysql/my-medium.cnf /etc/my.cnf
修改my.cnf
vim /etc/my.cnf
[mysqld] # 打开binlog log-bin=mysql-bin # 选择ROW(行)模式 binlog-format=ROW # 配置MySQL replaction需要定义,不要和canal的slaveId重复 server_id=1
改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
service mysqld restart
进入mysql查看,后面为on开启
show variables like 'log_bin';2、上传解压配置环境变量
mkdir canal tar -zxvf canal.deployer-1.1.4.tar.gz -C canal3、修改配置文件conf/example/instance.properties
# mysql 地址 canal.instance.master.address=master:3306 # mysql用户名 canal.instance.dbUsername=root # mysql密码 canal.instance.dbPassword=123456 # 数据写入kafka 的topic canal.mq.topic=example # 监控gma数据库,不同的表发送到表名的topic上, tast mysql的库名 canal.mq.dynamicTopic=student\..*4、修改配置文件conf/canal.properties
# zk地址 canal.zkServers = master:2181,node1:2181,node2:2181 # 数据保存到kafka canal.serverMode = kafka # kafka集群地址 canal.mq.servers = master:9092,node1:9092,node2:9092
启动
cd /usr/local/soft/canal/bin ./startup.sh #配了环境变量其实可以直接startup.sh启动5、测试
use `student`; # 创建表 CREATE TABLE `order` ( id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键', order_id VARCHAR(64) COMMENT '订单ID', amount DECIMAL(10, 2) COMMENT '订单金额', create_time DATETIME COMMENT '创建时间', UNIQUE uniq_order_id (`order_id`) ) COMMENT '订单表'; # 插入数据 INSERT INTO `order`(order_id, amount) VALUES ('10086', 999); UPDATe `order` SET amount = 10087 WHERe order_id = '10086'; DELETE FROM `order` WHERe order_id = '10086';6、可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据:
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic example
mysql插入更新删除后,kafka消费的状况
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)