1.下载canel
https://github.com/alibaba/canal/releases2. 开启 MySQL 的 binlog 配置 如果你忘记了my.cnf的路径 find / -name my.cnf
cd /etc vi my.cnf #打开my.cnf(window my.ini) 在【mysqld】块中添加 server-id=1 log-bin=mysql-bin binlog_format=row binlog-do-db=你数据库的名字 多个用逗号隔开(这里是需要监控的数据库的名字) 这里我的数据库名称是mydemo
打开官网 下载
canal.deployer-1.1.4.tar.gz# 创建1个文件夹 mkdir -p /opt/soft/canal # 将tar.gz移动到canal下 mv canal.deployer-1.1.4.tar.gz /opt/soft/canal # 解压 tar -zxf canal.deployer-1.1.4.tar.g
进入解压后的conf 目录
cd /opt/soft/canal/conf vi canal.properties # 可选项: tcp(默认), kafka, RocketMQ# 更改模式,直接把数据扔进 Kafka canal.serverMode = kafka # ... # kafka/rocketmq 集群配置: canal.mq.servers = 192.168.xx.xx:9092 canal.mq.retries = 0 # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下请将该值改大, 建议50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 #canal.mq.flatMessage = true # 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进 制数据,虽然不影响,但是看起来不方便 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all # kafka消息投递是否使用事务 canal.mq.transaction = false
修改一下几处
cd example vi instance.properties # 修改 conf/example/instance.properties # 按需修改成自己的数据库信息 ################################################# #需要同步的mysql地址,端口 canal.instance.master.address=192.168.x.x:3306 # username/password,数据库的用户名和密码 ... canal.instance.dbUsername = root canal.instance.dbPassword = xx #需要同步的具体表,也可以使用正则表达式监听多张表或者库 #canal.instance.filter.regex=realtime.tpp_vehicle_through_points_history ... # mq config# topic的名称 canal.mq.topic=mytest # 针对库名或者表名发送动态topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 # 库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id
不多说
vi /etc/fprofile
#canal export CANAL_HOME=/opt/soft/canal export PATH=$PATH:$CANAL_HOME/bin 添加环境变量 source /etc/profile
启动canal
#配置好/etc/profile startup.sh
6.开启kafka监控查看数据的录入
kafka-console-consumer.sh --bootstrap-server 192.168.80.181:9092 --topic mykafkademo
消息队列中的名称是在这设置的
然后启动你的mysql 进入mydemo数据库中插入数据 kafka就能实时读取数据了
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)