Canal会把自己伪装成从库和mysql做主从同步,Mysql需要开启binlog日志,Canal会监听mysql发生的变化,并把发生变化的类型(INSERT, UPDATE, DELETE),以及对应表名,与变化后得那条数据输入到Kafka中。下面从0开始搭建canal环境,跟着一步一步的走,最终就可以实现监听多个Mysql实例。
第一步:下载Canal下载地址: https://github.com/alibaba/canal/releases
下载后解压的文件夹目录如下图所示
进入config文件夹,并打开canal.properties,大部分不用动,需要改如下图这几个地方即可。
进入example,打开instance.properties配置文件,把下面这段全量替换掉原内容(注意备份一份原内容)
## mysql serverId , v1.0.26+ will autoGen ## v1.0.26版本后会自动生成slaveId,所以可以不用配置 # canal.instance.mysql.slaveId=0 # 数据库地址 canal.instance.master.address=127.0.0.1:3306 # binlog日志名称 canal.instance.master.journal.name=mysql-bin.000008 # mysql主库链接时起始的binlog偏移量 canal.instance.master.position=3431 # mysql主库链接时起始的binlog的时间戳 canal.instance.master.timestamp= canal.instance.master.gtid= # username/password # 在MySQL服务器授权的账号密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal # 字符集 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false # table regex .*\..*表示监听所有表 也可以写具体的表名,用,隔开 # canal.instance.filter.regex=.*\..* # canal-test3.order.* 匹配canal-test3库下,以order开头的表 是转义字符 # canal-test3.order0 匹配canal-test3库下的order0表 # canal-test3.* 匹配canal-test3库下的所有表 canal.instance.filter.regex=canal.* # mysql 数据解析表的黑名单,多个表用,隔开 canal.instance.filter.black.regex= canal.mq.topic=canal-topic # 分区个数 canal.mq.partitionsNum=1 # 发送到哪个分区的规则 #.*\..*:id正则匹配,指定所有正则匹配的表对应的hash字段为id # .*\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找) canal.mq.partitionHash=.*\..*:id第三步:开启Mysql的binlog文件
-
创建用户,并配置所有ip可访问
CREATE USER ‘canal’@’%’ IDENTIFIED WITH mysql_native_password BY ‘canal’; -
赋予该用户权限,ALL=所有权限
GRANT ALL PRIVILEGES ON . TO ‘canal’@’%’ -
进入my.cnf文件。(docker部署的mysql进入容器后,cd etc/mysql 即可到达my,cnf路径下,然后vim my.cnf即可)
在my.cnf的[mysqld]下添加
log-bin = mysql-bin binlog-format = ROW server_id = 1
添加后的样子
4. 然后重启mysql容器即可
至此,canal监听单个mysql server完毕。
第四步:启动canalwindow启动:
linux启动:
把整个下载下来的canal.deployer-1.1.5放到linux目录,然后进入到bin目录下
输入 【./startup.sh】
- 启动canal
- 启动kafka消费者
- 修改被监听的mysql下的某条记录
- 看一下kafka消费者的变化
注意:修改完配置文件,要把example文件夹下的meta.dat删除
- 进入conf文件夹,把example多复制几个,改成不同的名
- 进入到canal.properties配置文件中
每个example配置各自的mysql和kafka的topic即可,当然也可以使用同一个mysql server 和 kafka, 这个看个人
下面这个博主写的不错,可以看下如何通过springboot整合
转载:https://blog.csdn.net/yehongzhi1994/article/details/107880162.
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)