Canal 主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,工作原理如下:
Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 Canal )Canal 解析 binary log 对象(原始为 byte 流)
可以用于以下业务场景:
数据库镜像数据库实时备份索引构建和实时维护(拆分异构索引、倒排索引等)业务 cache 刷新带业务逻辑的增量数据处理
当前的 Canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
Canal 使用 数据库配置Tips: 以上内容来自官网,详细内容建议先看一遍官方 wiki 文档。
在正式安装 Canal 之前,首先需要对 MySQL(本文使用版本为 5.7.34) 的配置进行修改,同时我们可以新建一个测试用户和测试库来完成后续的步骤,首先是修改my.cnf配置文件,增加以下配置内容:
[mysqld] # 开启 binlog log-bin=mysql-bin # 选择 ROW 模式 binlog-format=ROW # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 server_id=1
然后通过root账号新建一个名为canal的用户和名为canal的库并授权给canal:
-- 新建 canal 用户, 密码为 canal CREATE USER canal IDENTIFIED BY 'canal'; -- 新建 canal 数据库并给 canal 用户授予权限 CREATE DATAbase canal CHARACTER SET utf8mb4; GRANT SELECt, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; GRANT ALL PRIVILEGES ON canal.* TO 'canal'@'%'; FLUSH PRIVILEGES;
然后创建一个user表并添加几条数据用于后续测试:
CREATE TABLE `user` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '用户 id', `username` varchar(50) DEFAULT NULL COMMENT '用户名', `password` varchar(50) DEFAULT NULL COMMENT '密码', `email` varchar(45) DEFAULT NULL COMMENT '邮箱', `phone` varchar(15) DEFAULT NULL COMMENT '手机号码', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4; INSERT INTO `canal`.`user` (`id`, `username`, `password`, `email`, `phone`) VALUES (1, '姜磊', 'k0VP$l@ru', 't.wazmcs@qxfvsstyo.uy', '18145206808'), (2, '丁洋', '8pig73*dW', 'h.wsecj@wmlp.li', '19832458514'), (3, '邱秀兰', '5G)c@7RyV', 'c.afkrfcr@rnhewu.org.cn', '18656022523'), (4, '孔洋', 'KjvLG*BP', 'r.tbnmdyh@pzzuo.jo', '18674498531'), (5, '董霞', '%fqmhybp3', 'o.hnlu@hhyvqxbv.eg', '18192674843');
安装 Canal 下载ps: 以上数据为 mock 生成,如有雷同,纯属巧合。
访问官网releases页面,下载并安装 1.1.5 版本:
mkdir /data/canal cd /data/canal wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz tar -zxvf canal.deployer-1.1.5.tar.gz rm -rf canal.deployer-1.1.5.tar.gz修改配置
修改conf/example/instance.properties的以下配置为自己的数据库连接信息:
# 不能和 mysql 的 server_id 重复 canal.instance.mysql.slaveId=2 # position info canal.instance.master.address=192.168.3.13:3306 # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal启动
sh bin/startup.sh防火墙设置
firewall-cmd --zone=public --add-port=11111/tcp --permanent firewall-cmd --reload测试
编写以下测试代码进行连接测试,完整代码已上传到GitHub。
@Slf4j @Component @RequiredArgsConstructor public class CanalListener { private final CanalConfig canalConfig; @PostConstruct private void init() { listen(); } private void listen() { new Thread(() -> { // 连接到 Canal CanalConnector connector = connect(); try { // 循环获取数据库变更信息 while (true) { Message message = connector.getWithoutAck(canalConfig.getBatchSize()); long batchId = message.getId(); ListentryList = message.getEntries(); if (batchId == -1 || entryList.isEmpty()) { Util.sleep(500); } else { // 存在变更信息则打印 entryList.forEach(this::printEntry); } connector.ack(batchId); } } finally { connector.disconnect(); } }).start(); } private CanalConnector connect() { CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()), canalConfig.getTopic(), canalConfig.getUsername(), canalConfig.getPassword() ); connector.connect(); connector.subscribe(); connector.rollback(); return connector; } @SneakyThrows private void printEntry(CanalEntry.Entry entry) { CanalEntry.EntryType entryType = entry.getEntryType(); if (entryType == CanalEntry.EntryType.TRANSACTIonBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) { return; } String lineSeparator = System.lineSeparator(); StringBuilder info = new StringBuilder(lineSeparator); info.append("==========数据变更信息==========").append(lineSeparator); CanalEntry.Header header = entry.getHeader(); info.append(String.format( "数据库.表名: %s.%s%n", header.getSchemaName(), header.getTableName())); info.append(String.format(" *** 作类型: %s%n", header.getEventType())); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStorevalue()); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { CanalEntry.EventType eventType = rowChange.getEventType(); if (eventType == CanalEntry.EventType.DELETE) { info.append(String.format( "delete: %s%n", getDataInfo(rowData.getBeforeColumnsList()))); } else if (eventType == CanalEntry.EventType.INSERT) { info.append(String.format( "insert: %s%n", getDataInfo(rowData.getAfterColumnsList()))); } else { info.append(String.format( "update: %s%n", getDataInfo(rowData.getAfterColumnsList()))); } } log.info(info.toString()); } private String getDataInfo(List columns) { return JSON.toJSONString( columns.stream() .collect(Collectors.toMap( CanalEntry.Column::getName, CanalEntry.Column::getValue)) ); } }
然后修改id为5的用户的密码由%fqmhybp3变为%fqmhybp4,控制台会打印以下信息:
可以发现我们顺利地获取到了数据的变更信息,通过变更信息,我们就可以执行数据的同步修改、刷新缓存等 *** 作了。
Dokcer 安装 Canal确保本机已安装 Docker 和 Dokcer Compose,然后创建docker-compose.yml文件:
version: '3' services: canal: container_name: canal_latest image: canal/canal-server restart: always ports: - 11111:11111 environment: - canal.instance.mysql.slaveId=2 - canal.instance.master.address=192.168.3.13:3306 - canal.instance.dbUsername=canal - canal.instance.dbPassword=canal volumes: - ./conf:/admin/canal-server/conf - ./logs:/admin/canal-server/logs
然后在当前目录执行docker-compose up -d即可,后续连接步骤参考上节即可。
RocketMQ 配置 安装 RocketMQ完成了 Canal 的基本安装及使用介绍后,下面再来简要介绍 RocketMQ 的安装,这里只展示使用 Docker 的安装方式,首先创建docker-compose.yml文件:
version: '3.5' services: rmqnamesrv: image: foxiswho/rocketmq:server container_name: rmqnamesrv ports: - 9876:9876 networks: rmq: aliases: - rmqnamesrv rmqbroker: image: foxiswho/rocketmq:broker container_name: rmqbroker ports: - 10909:10909 - 10911:10911 volumes: - ./conf/broker.conf:/etc/rocketmq/broker.conf environment: NAMESRV_ADDR: "rmqnamesrv:9876" JAVA_OPTS: " -Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" command: mqbroker -c /etc/rocketmq/broker.conf depends_on: - rmqnamesrv networks: rmq: aliases: - rmqbroker rmqconsole: image: styletang/rocketmq-console-ng container_name: rmqconsole ports: - 8076:8080 environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - rmqnamesrv networks: rmq: aliases: - rmqconsole networks: rmq: name: rmq driver: bridge
然后创建conf文件夹与docker-compose.yml文件同级,并在conf文件夹中新建broker.conf文件,内容如下:
brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 # 修改为主机 IP brokerIP1=192.168.3.13 defaultTopicQueueNums=4 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true listenPort=10911 deleteWhen=04 fileReservedTime=120 mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=300000 diskMaxUsedSpaceRatio=88 maxMessageSize=65536 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH
然后执行docker-compose up -d即可成功启动。
防火墙设置firewall-cmd --zone=public --add-port=8076/tcp --permanent firewall-cmd --zone=public --add-port=9876/tcp --permanent firewall-cmd --reload
本地访问192.168.3.13:8076测试:
Canal 配置 RocketMQ 修改配置修改conf/canal.properties中的以下配置:
# tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = rocketMQ rocketmq.producer.group = canal_group rocketmq.namesrv.addr = 192.168.3.13:9876
修改conf/example/instance.properties的以下配置:
# mq config canal.mq.topic=canal_topic
ps: 以上配置基于前文配置。
然后执行sh bin/restart.sh即可应用修改后的配置。
测试这里把id为5的用户名修改为canal,然后在 RocketMQ 的管理界面查看消息:
可以发现变更信息已经顺利发送到 RocketMQ 中了。
编码测试下面再通过实际编码进行测试,完整代码及配置访问GitHub即可:
@Slf4j @Component @RocketMQMessageListener( topic = "canal_topic", consumerGroup = "canal_group" ) public class UserCanalListener implements RocketMQListener> { @Override public void onMessage(CanalMessage message) { String lineSeparator = System.lineSeparator(); StringBuilder info = new StringBuilder(lineSeparator); info.append("==========数据变更信息==========").append(lineSeparator); info.append(String.format( "数据库.表名: %s.%s%n", message.getDatabase(), message.getTable())); info.append(String.format(" *** 作类型: %s%n", message.getType())); message.getData().forEach(user -> info.append(user).append(lineSeparator)); log.info(info.toString()); } }
将id为5的用户名恢复,然后查看控制台打印信息:
最后再来简要介绍一下 Canal Admin 的安装及使用,访问官网releases页面,下载并安装 1.1.5 版本:
mkdir /data/canal-admin cd /data/canal-admin wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz tar -zxvf canal.admin-1.1.5.tar.gz rm -rf canal.admin-1.1.5.tar.gz修改配置
执行vi conf/application.yml,修改其中的数据库配置:
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 192.168.3.13:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin
然后将conf/canal_manager.sql导入到数据库中(使用 root 用户导入并授权给 canal 用户),其中授权语句如下:
GRANT ALL PRIVILEGES ON canal_manager.* TO 'canal'@'%'; FLUSH PRIVILEGES;启动
执行以下命令启动控制台:
sh bin/start.sh防火墙设置
firewall-cmd --zone=public --add-port=8089/tcp --permanent firewall-cmd --reload修改 Canal Server 配置
然后修改之前下载的 Canal 中conf/canal.properties中的如下配置:
# canal admin config canal.admin.manager = 192.168.3.13:8089
然后重启 Canal:
sh bin/restart.sh
然后在本地登录访问192.168.3.13:8089(记得修改为自己的 ip)进行登录,注意这里的密码和 Canal Admin 配置中的密码不是一个东西,这里的密码为123456:
之后便可以进行 Canal Server 的管理了:
总结本文简单介绍了关于 Canal 的安装使用及结合 RocketMQ 的基本使用方式,在后续将会以一些实际应用案例(例如在数据库表的数据变更时,执行更新缓存相关 *** 作,就不再需要在代码有很多分散的缓存更新 *** 作了)再来介绍 Canal 的使用。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)