spring boot+kafka+canal实现监听MySQL数据库
一、zookeeper安装
kafka依赖于zookeeper,安装kafka前先安装zookeeper
下载地址:Apache ZooKeeper
本次示例使用的是 3.5.9版本,下载后将压缩文件上传至linux环境并且解压
解压后bin目录下有zoo_sample.cfg文件,zookeeper使用的配置文件是zoo.cfg,所以复制一份zoo_sample.cfg重命名为zoo.cfg。配置里面有端口等信息,默认端口为2181
然后启动zookeeper,切换至bin目录下,执行命令: ./zkServer.sh start
停止:./zkServer.sh stop
二、kafka安装
下载地址:Apache Kafka
这里选择的版本为 kafka_2.12-2.8.1.tgz
上传至linux然后解压如下图
修改配置:
1、server.properties 主要修改kafka的端口,以及listeners 两个ip换成安装的服务器ip,zookeeper所在服务器的ip和端口,日志记录目录等。
broker.id=0 port=9092 listeners=PLAINTEXT://ip:9092 advertised.listeners=PLAINTEXT://ip:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/opt/mq/kafka/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0
2、consumer.properties 主要配置group.id,和springboot配置的群组id一致(我没试过不一致会不会失败)
bootstrap.servers=localhost:9080
group.id=default_consumer_group
3、zoopeeker.properties zookeeper的相关配置,不改zookeeper默认端口可以不用管。
启动kafka,,执行命令:bin/kafka-server-start.sh config/server.properties &
停止服务命令:bin/kafka-server-stop.sh
三、MySQL数据库支持canal监控配置
1、查看是否开启binlog模式
show binary logs; 如下为开启。
2、如果没有开启,需要更改mysql的my.cnf文件。文件所在目录不记得了,我的测试环境mysql是装在docker容器中,目录为etc/mysql下
配置如下:
[mysqld] lower_case_table_names=1 max_connections=1000 max_allowed_packet=512M skip-name-resolve 加入以下三行配置开启binlog
server_id=1 log-bin=mysql-bin binlog-format=ROW
四、安装canal
下载地址:Releases · alibaba/canal · GitHub
同样的解压到服务器,使用的版本为1.1.5
1、修改配置:conf目录下canal.properties 主要修改端口以及zookeeper的地址,消息中间件的类型配置为kafka
canal.port = 11111
canal.zkServers = ip:2181
canal.serverMode = kafka
2、conf/example目录下的instance.properties
#数据库地址
canal.instance.master.address=ip:3306
#日志名称,上面数据库语句查询出来的log_name
canal.instance.master.journal.name=mysql-bin.000001
#数据库账号
canal.instance.dbUsername=username
#数据库密码
canal.instance.dbPassword=password
#监听数据库表(可以配置多个、全数据库)
canal.instance.filter.regex=database.tablename
#kafka主题名称
canal.mq.topic=test
3、启动canal
sh startup.sh
五、springboot集成配置,监听来自kafka test主题的消息
添加依赖项: org.springframework.kafka spring-kafkaorg.springframework.kafka spring-kafka-testtest com.alibaba.otter canal.client1.1.3 yml文件配置: spring: kafka: listener: missing-topics-fatal: false bootstrap-servers: ip:9080 #指定kafka server的地址,集群配多个,中间,逗号隔开 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default_consumer_group #群组ID #enable-auto-commit: true #auto-commit-interval: 1000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer@Component public class ConsumerSms { /** * 定义此消费者接收topics = "demo"的消息 * @param record 变量代表消息本身,可以通过ConsumerRecord,?>类型的record变量来打印接收的消息的各种信息 */ @KafkaListener(topics = "test") public void listen (ConsumerRecord, ?> record){ System.out.printf(""+record.value()); } }
数据库中,给监听的表插入一条数据,可以看到监听程序输出:
{"data":[{"id":"4","bsm":"444"}],"database":"nmjf","es":1650967508000,"id":2316,"isDdl":false,"mysqlType":{"id":"bigint(0)","bsm":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"bsm":12},"table":"t_sms","ts":1650967508207,"type":"INSERT"}
到此集成成功
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)