- 1.前置条件
- 2.mysql环境准备
- 3.相关套件准备
- 4.canal-admin安装部署
- 5.canal-server安装部署
- 6.instance配置
- 7.测试
- 8.总结
软件
1.jdk_1.8.0_131
2.mysql_5.7.24
3.zookeeper_3.4.14
4.kafka_2.11-2.2.2
集群环境
相关安装包:
百度网盘:https://pan.baidu.com/s/1h67eVcvYh0a0opGnIytW9g
密码:hehe
开启Binlog写入功能
my.cnf配置文件中追加
log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
创建canal用户并授权
root用户进入mysql环境,输入以下命令
# 修改密码校验规则 set global validate_password_length=0; set global validate_password_policy=LOW; # 创建用户canal,密码为canal CREATE USER canal IDENTIFIED BY 'canal'; # canal用户授权 GRANT SELECT,UPDATE,INSERT,DELETE,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; # 刷新权限 FLUSH PRIVILEGES;
创建后续测试需要的库和表
进入mysql环境,输入以下命令
# 创建数据库 create database test_canal; # 切换数据库 use test_canal; # 创建表 CREATE TABLE `tb_commodity_info` ( `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `commodity_name` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '商品名称', `commodity_price` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '0' COMMENT '商品价格', `number` int(10) NULL DEFAULT 0 COMMENT '商品数量', `description` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT '商品描述', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '商品信息表' ROW_FORMAT = Dynamic;3.相关套件准备
启动zookeeper
关于zk的集群安装部署这里直接省略,有需要请上网查找
# 进入zk安装目录的bin目录下 # 执行启动命令(三台机子都要) ./zkServer.sh start
启动kafka
关于kafka的集群安装部署这里直接省略,有需要请上网查找
# 进入kafka安装目录的bin目录下 # 执行启动命令(三台机子都要) ./kafka-server-start.sh ../config/server.properties & # 选择其中一台kafka创建topic ./kafka-topics.sh --create --zookeeper 192.168.56.1:2181,192.168.56.2:2181,192.168.56.3:2181 --replication-factor 3 --partitions 1 --topic test_canal4.canal-admin安装部署
安装
# 下载安装包 wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz # 创建目录 mkdir -p /opt/program/canal-admin_1.1.4 # 解压 tar -zxvf canal.admin-1.1.4.tar.gz -C /opt/program/canal-admin_1.1.4
修改配置
# 打开配置文件 vi /opt/program/canal-admin_1.1.4/conf/application.yml # 修改为以下配置 server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: # mysql相关配置 address: 192.168.56.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin
执行初始化脚本
# 进入脚本所在目录 cd /opt/program/canal-admin_1.1.4/conf # 进入root用户mysql(我这里root用户密码为123456,根据实际情况写密码) mysql -u root -p123456 # 执行脚本 source ./canal_manager.sql
启动
# 进入bin目录 /opt/program/canal-admin_1.1.4/bin # 执行启动命令 ./startup.sh
浏览器访问
浏览器访问canal-admin,http://服务器地址:8090
创建集群
5.canal-server安装部署安装
# 下载安装包 wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz # 创建目录 mkdir -p /opt/program/canal-server_1.1.4 # 解压 tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/program/canal-server_1.1.4
修改配置
# 切换到目标目录 cd /opt/program/canal-server_1.1.4/conf # 修改配置 vi canal_local.properties # 修改为以下配置 # register ip(本机地址) canal.register.ip = 192.168.56.1 # canal admin config(运行canal-admin地址) canal.admin.manager = 192.168.56.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register(开启自动注册到canal-admin) canal.admin.register.auto = true # 注册集群名称,如果不填写则为单机节点,这里我们直接填写之前创建好的集群名称 canal.admin.register.cluster = hadoop
分发
# 192.168.56.2 scp -r /opt/program/canal-server_1.1.4 root@192.168.56.2:/opt/program/canal-server_1.1.4 # 192.168.56.3 scp -r /opt/program/canal-server_1.1.4 root@192.168.56.3:/opt/program/canal-server_1.1.4 # 分发完毕后记得修改canal_local.properties中的canal.register.ip为本机地址
启动
# 三台服务器执行以下命令 # 切换目录 cd /opt/program/canal-server_1.1.4/bin # 启动 ./startup.sh local
查看
回到canal-admin中查看Server管理
配置server配置
注意在canal-admin集群模式下,一个集群下的server共用一套配置
1、打开集群管理,对集群选择 *** 作,点击主配置
2、进入配置页面,点击载入模板
3、修改以下配置
# 按实际填写zk地址 canal.zkServers = 192.168.56.1:2181,192.168.56.2:2181,192.168.56.3:2181 # 选择serverMode为kafka canal.serverMode = kafka # 切换canal.instance.global.spring.xml为classpath:spring/default-instance.xml,上面两行注释掉 canal.instance.global.spring.xml = classpath:spring/default-instance.xml # 按实际填写kafka地址 canal.mq.servers = 192.168.56.1:9092,192.168.56.2:9092,192.168.56.3:9092 # 保存
4、配置项补充说明(这里可以暂时不看,后面demo跑通后再来扩展)
目前默认支持的instance.xml有以下几种:
- canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
- canal.instance.global.spring.xml = classpath:spring/file-instance.xml
- canal.instance.global.spring.xml = classpath:spring/default-instance.xml
- canal.instance.global.spring.xml = classpath:spring/group-instance.xml
在介绍instance配置之前,先了解一下canal如何维护一份增量订阅&消费的关系信息:
- 解析位点 (parse模块会记录,上一次解析binlog到了什么位置,对应组件为:CanalLogPositionManager)
- 消费位点(canal server在接收了客户端的ack后,就会记录客户端提交的最后位点,对应的组件为:CanalmetaManager)
对应的两个位点组件,目前都有几种实现:
- memory (memory-instance.xml中使用)
- zookeeper
- mixed
- period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)
下面是解析:
-
canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析
特点:速度最快,依赖最少(不需要zookeeper)
场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境
-
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
基于file的持久化模式。
特点:支持单机持久化
场景:生产环境,无HA需求,简单可用.
采用该模式的时候,如果关闭了canal,会在destination中生成一个meta.dat,用来记录关键信息。如果想要启动canal之后马上订阅最新的位点,需要把该文件删掉。
-
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
store选择了内存模式,其余的parser/sink依赖的位点管理选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.
特点:支持HA
场景:生产环境,集群化部署.
-
canal.instance.global.spring.xml = classpath:spring/group-instance.xml
主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。
场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.
新建
1、canal-admin选择Instance管理,点击新建Instance
2、进入配置页面,点击载入模板,输入Instance名称以及选择所属集群
3、修改以下配置
下方配置topic为文章上部介绍相关套件准备时创建,需要可往上翻阅
# 根据实际配置数据库地址 canal.instance.master.address=192.168.56.1:3306 # 配置topic canal.mq.topic=test_canal
4、创建成功后,点击对应Instance *** 作,选择启动
5、配置项补充说明(这里可以暂时不看,后面demo跑通后再来扩展)
-
canal.instance.filter.regex
该配置项可以配置mysql数据解析关注的表,匹配模式为正则匹配
常见例子: - 所有表:.* or .*\..* - canal数据库下所有表: canal\..* - canal数据库下的以canal打头的表:canal\.canal.* - canal数据库下的某张表:canal\.test1 - 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
-
canal.mq.dynamicTopic
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔
例子1:test\.test 指定匹配的单表,数据库test中的test表变更数据发送到名称为test.test的topic上 例子2:.\.. 匹配所有表,则每个表都会发送到各自表名的topic上 例子3:test 指定匹配对应的库,test库的所有表变更数据发送到名称为test的topic上 例子4:test\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上 例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1.test1 topic上,其余的表发送到默认的canal.mq.topic值
为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table
例子1: test:test\.test 指定匹配的单表,发送到以test为名字的topic上 例子2: test:.\.. 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下 例子4:testA:test\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下 例子5:test0:test,test1:test1\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值
-
canal.mq.partitionHash
该参数用于决定数据发往哪个分区
例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2 例子2:.\..:id 正则匹配,指定所有正则匹配的表对应的hash字段为id 例子3:.\..:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找) 例子4: 匹配规则啥都不写,则默认发到0这个partition上 例子5:.\.. ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题) 例子6: test\.test:id,.\..* , 针对test的表按照id散列,其余的表按照table散列
启动kafka消费者
进入kafka安装目录下bin目录,执行以下命令
./kafka-console-consumer.sh --bootstrap-server 192.168.56.1:9092,192.168.56.2:9092,192.168.56.3:9092 --topic test_canal
数据库插入数据
进入mysql环境,输入以下命令(该库表为文章上部介绍mysql环境准备时创建,需要可往上翻阅)
# 选择目标库 use test_canal; # 执行插入语句 insert into `tb_commodity_info`(id,commodity_name,commodity_price,number,description) values("3e71a81fd80711eaaed600163e046cc9","白糖糕","3.12",3,"软糯香甜白糖糕");
消费者结果输出
{ "data": [ { "id": "3e71a81fd80711eaaed600163e046cc9", "commodity_name": "白糖糕", "commodity_price": "3.12", "number": "3", "description": "软糯香甜白糖糕" } ], "database": "test_canal", "es": 1634571615000, "id": 8, "isDdl": false, "mysqlType": { "id": "varchar(32)", "commodity_name": "varchar(512)", "commodity_price": "varchar(36)", "number": "int(10)", "description": "varchar(2048)" }, "old": null, "pkNames": [ "id" ], "sql": "", "sqlType": { "id": 12, "commodity_name": 12, "commodity_price": 12, "number": 4, "description": 12 }, "table": "tb_commodity_info", "ts": 1634571615824, "type": "INSERT" }8.总结
以上就是搭建canal-admin+canal-server+kafka+mysql binlog日志实时同步的demo,其中各配置项基本全为最简配置,让我们的demo最快的跑起来。但是在实际使用中我们还会去配置mysql表和topic的过滤映射规则、kafka发送数据时的分渠策略等,详情可以看官方文档:https://github.com/alibaba/canal/wiki/AdminGuide,被墙了可看这个:https://www.wenjiangs.com/doc/admin-guide
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)