使用canal保持mysql与kafka数据同步

使用canal保持mysql与kafka数据同步,第1张

使用canal保持mysql与kafka数据同步

1.下载canel

https://github.com/alibaba/canal/releases
2. 开启 MySQL 的 binlog 配置 如果你忘记了my.cnf的路径 find / -name my.cnf
cd /etc
vi my.cnf
#打开my.cnf(window my.ini) 在【mysqld】块中添加 
server-id=1
log-bin=mysql-bin 
binlog_format=row 
binlog-do-db=你数据库的名字 多个用逗号隔开(这里是需要监控的数据库的名字)
这里我的数据库名称是mydemo

打开官网 下载

canal.deployer-1.1.4.tar.gz
# 创建1个文件夹
mkdir -p /opt/soft/canal 
# 将tar.gz移动到canal下 
mv canal.deployer-1.1.4.tar.gz /opt/soft/canal 
# 解压 
tar -zxf canal.deployer-1.1.4.tar.g

进入解压后的conf 目录

cd /opt/soft/canal/conf
vi canal.properties
# 可选项: tcp(默认), kafka, RocketMQ# 更改模式,直接把数据扔进 Kafka
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置:
canal.mq.servers = 192.168.xx.xx:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
#canal.mq.flatMessage = true # 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进
制数据,虽然不影响,但是看起来不方便
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false

 修改一下几处

cd example
vi instance.properties
# 修改 conf/example/instance.properties
# 按需修改成自己的数据库信息
#################################################
#需要同步的mysql地址,端口
canal.instance.master.address=192.168.x.x:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = root
canal.instance.dbPassword = xx #需要同步的具体表,也可以使用正则表达式监听多张表或者库
#canal.instance.filter.regex=realtime.tpp_vehicle_through_points_history
...
# mq config# topic的名称
canal.mq.topic=mytest
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# 库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id

 不多说

vi /etc/fprofile

#canal
export CANAL_HOME=/opt/soft/canal
export PATH=$PATH:$CANAL_HOME/bin
添加环境变量
source /etc/profile

启动canal

#配置好/etc/profile
startup.sh

6.开启kafka监控查看数据的录入

kafka-console-consumer.sh --bootstrap-server 192.168.80.181:9092 --topic mykafkademo

 消息队列中的名称是在这设置的

然后启动你的mysql 进入mydemo数据库中插入数据 kafka就能实时读取数据了

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5709326.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存