canal admin+canal server安装部署教程(内附kafka推送demo)

canal admin+canal server安装部署教程(内附kafka推送demo),第1张

canal admin+canal server安装部署教程(内附kafka推送demo)

文章目录
    • 1.前置条件
    • 2.mysql环境准备
    • 3.相关套件准备
    • 4.canal-admin安装部署
    • 5.canal-server安装部署
    • 6.instance配置
    • 7.测试
    • 8.总结

1.前置条件

软件

1.jdk_1.8.0_131

2.mysql_5.7.24

3.zookeeper_3.4.14

4.kafka_2.11-2.2.2

集群环境

hadoop1:192.168.56.1hadoop2:192.168.56.2hadoop3:192.168.56.3mysql✔❌❌zookeeper✔✔✔kafka✔✔✔canal-admin✔❌❌canal-server✔✔✔

相关安装包:
百度网盘:https://pan.baidu.com/s/1h67eVcvYh0a0opGnIytW9g
密码:hehe

2.mysql环境准备

开启Binlog写入功能

my.cnf配置文件中追加

log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

创建canal用户并授权

root用户进入mysql环境,输入以下命令

# 修改密码校验规则
set global validate_password_length=0;
set global validate_password_policy=LOW;
# 创建用户canal,密码为canal
CREATE USER canal IDENTIFIED BY 'canal';  
# canal用户授权
GRANT SELECT,UPDATE,INSERT,DELETE,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新权限
FLUSH PRIVILEGES;

创建后续测试需要的库和表

进入mysql环境,输入以下命令

# 创建数据库
create database test_canal;
# 切换数据库
use test_canal;
# 创建表
CREATE TABLE `tb_commodity_info`  (
  `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `commodity_name` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '商品名称',
  `commodity_price` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '0' COMMENT '商品价格',
  `number` int(10) NULL DEFAULT 0 COMMENT '商品数量',
  `description` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT '商品描述',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '商品信息表' ROW_FORMAT = Dynamic;
3.相关套件准备

启动zookeeper

关于zk的集群安装部署这里直接省略,有需要请上网查找

# 进入zk安装目录的bin目录下
# 执行启动命令(三台机子都要)
./zkServer.sh start

启动kafka

关于kafka的集群安装部署这里直接省略,有需要请上网查找

# 进入kafka安装目录的bin目录下
# 执行启动命令(三台机子都要)
 ./kafka-server-start.sh ../config/server.properties &
# 选择其中一台kafka创建topic
./kafka-topics.sh --create --zookeeper 192.168.56.1:2181,192.168.56.2:2181,192.168.56.3:2181 --replication-factor 3 --partitions 1 --topic test_canal
4.canal-admin安装部署

安装

# 下载安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
# 创建目录
mkdir -p /opt/program/canal-admin_1.1.4
# 解压
tar -zxvf canal.admin-1.1.4.tar.gz -C /opt/program/canal-admin_1.1.4

修改配置

# 打开配置文件
vi /opt/program/canal-admin_1.1.4/conf/application.yml
# 修改为以下配置
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  # mysql相关配置
  address: 192.168.56.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

执行初始化脚本

# 进入脚本所在目录
cd /opt/program/canal-admin_1.1.4/conf
# 进入root用户mysql(我这里root用户密码为123456,根据实际情况写密码)
mysql -u root -p123456
# 执行脚本
source ./canal_manager.sql

启动

# 进入bin目录
/opt/program/canal-admin_1.1.4/bin
# 执行启动命令
./startup.sh

浏览器访问

浏览器访问canal-admin,http://服务器地址:8090

创建集群

5.canal-server安装部署

安装

# 下载安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
# 创建目录
mkdir -p /opt/program/canal-server_1.1.4
# 解压
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/program/canal-server_1.1.4

修改配置

# 切换到目标目录
cd /opt/program/canal-server_1.1.4/conf
# 修改配置
vi canal_local.properties
# 修改为以下配置
# register ip(本机地址)
canal.register.ip = 192.168.56.1

# canal admin config(运行canal-admin地址)
canal.admin.manager = 192.168.56.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register(开启自动注册到canal-admin)
canal.admin.register.auto = true
# 注册集群名称,如果不填写则为单机节点,这里我们直接填写之前创建好的集群名称
canal.admin.register.cluster = hadoop

分发

# 192.168.56.2
scp -r /opt/program/canal-server_1.1.4 root@192.168.56.2:/opt/program/canal-server_1.1.4
# 192.168.56.3
scp -r /opt/program/canal-server_1.1.4 root@192.168.56.3:/opt/program/canal-server_1.1.4
# 分发完毕后记得修改canal_local.properties中的canal.register.ip为本机地址

启动

# 三台服务器执行以下命令
# 切换目录
cd /opt/program/canal-server_1.1.4/bin
# 启动
./startup.sh local

查看

回到canal-admin中查看Server管理

配置server配置

注意在canal-admin集群模式下,一个集群下的server共用一套配置

1、打开集群管理,对集群选择 *** 作,点击主配置

2、进入配置页面,点击载入模板

3、修改以下配置

# 按实际填写zk地址
canal.zkServers = 192.168.56.1:2181,192.168.56.2:2181,192.168.56.3:2181
# 选择serverMode为kafka
canal.serverMode = kafka
# 切换canal.instance.global.spring.xml为classpath:spring/default-instance.xml,上面两行注释掉
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# 按实际填写kafka地址
canal.mq.servers = 192.168.56.1:9092,192.168.56.2:9092,192.168.56.3:9092
# 保存

4、配置项补充说明(这里可以暂时不看,后面demo跑通后再来扩展)

目前默认支持的instance.xml有以下几种:

  1. canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
  2. canal.instance.global.spring.xml = classpath:spring/file-instance.xml
  3. canal.instance.global.spring.xml = classpath:spring/default-instance.xml
  4. canal.instance.global.spring.xml = classpath:spring/group-instance.xml

在介绍instance配置之前,先了解一下canal如何维护一份增量订阅&消费的关系信息:

  • 解析位点 (parse模块会记录,上一次解析binlog到了什么位置,对应组件为:CanalLogPositionManager)
  • 消费位点(canal server在接收了客户端的ack后,就会记录客户端提交的最后位点,对应的组件为:CanalmetaManager)

对应的两个位点组件,目前都有几种实现:

  • memory (memory-instance.xml中使用)
  • zookeeper
  • mixed
  • period (default-instance.xml中使用,集合了zookeeper+memory模式,先写内存,定时刷新数据到zookeeper上)

下面是解析:

  1. canal.instance.global.spring.xml = classpath:spring/memory-instance.xml

    所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析

    特点:速度最快,依赖最少(不需要zookeeper)

    场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境

  2. canal.instance.global.spring.xml = classpath:spring/file-instance.xml

    基于file的持久化模式。

    特点:支持单机持久化

    场景:生产环境,无HA需求,简单可用.

    采用该模式的时候,如果关闭了canal,会在destination中生成一个meta.dat,用来记录关键信息。如果想要启动canal之后马上订阅最新的位点,需要把该文件删掉。

  3. canal.instance.global.spring.xml = classpath:spring/default-instance.xml

    store选择了内存模式,其余的parser/sink依赖的位点管理选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.

    特点:支持HA

    场景:生产环境,集群化部署.

  4. canal.instance.global.spring.xml = classpath:spring/group-instance.xml

    主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。

    场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.

6.instance配置

新建

1、canal-admin选择Instance管理,点击新建Instance

2、进入配置页面,点击载入模板,输入Instance名称以及选择所属集群

3、修改以下配置

下方配置topic为文章上部介绍相关套件准备时创建,需要可往上翻阅

# 根据实际配置数据库地址
canal.instance.master.address=192.168.56.1:3306
# 配置topic
canal.mq.topic=test_canal

4、创建成功后,点击对应Instance *** 作,选择启动

5、配置项补充说明(这里可以暂时不看,后面demo跑通后再来扩展)

  1. canal.instance.filter.regex

    该配置项可以配置mysql数据解析关注的表,匹配模式为正则匹配

    常见例子:
    - 所有表:.* or .*\..*
    - canal数据库下所有表: canal\..*
    - canal数据库下的以canal打头的表:canal\.canal.*
    - canal数据库下的某张表:canal\.test1
    - 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
    
  2. canal.mq.dynamicTopic

    canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

    例子1:test\.test 指定匹配的单表,数据库test中的test表变更数据发送到名称为test.test的topic上
    例子2:.\.. 匹配所有表,则每个表都会发送到各自表名的topic上
    例子3:test 指定匹配对应的库,test库的所有表变更数据发送到名称为test的topic上
    例子4:test\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
    例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1.test1 topic上,其余的表发送到默认的canal.mq.topic值
    

    为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

    例子1: test:test\.test 指定匹配的单表,发送到以test为名字的topic上
    例子2: test:.\.. 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
    例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
    例子4:testA:test\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
    例子5:test0:test,test1:test1\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值
    
  3. canal.mq.partitionHash

    该参数用于决定数据发往哪个分区

    例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
    例子2:.\..:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
    例子3:.\..:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
    例子4: 匹配规则啥都不写,则默认发到0这个partition上
    例子5:.\.. ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
    例子6: test\.test:id,.\..* , 针对test的表按照id散列,其余的表按照table散列
    
7.测试

启动kafka消费者

进入kafka安装目录下bin目录,执行以下命令

./kafka-console-consumer.sh --bootstrap-server 192.168.56.1:9092,192.168.56.2:9092,192.168.56.3:9092 --topic test_canal

数据库插入数据

进入mysql环境,输入以下命令(该库表为文章上部介绍mysql环境准备时创建,需要可往上翻阅)

# 选择目标库
use test_canal;
# 执行插入语句
insert into `tb_commodity_info`(id,commodity_name,commodity_price,number,description) 
values("3e71a81fd80711eaaed600163e046cc9","白糖糕","3.12",3,"软糯香甜白糖糕");

消费者结果输出

{
  "data": [
    {
      "id": "3e71a81fd80711eaaed600163e046cc9",
      "commodity_name": "白糖糕",
      "commodity_price": "3.12",
      "number": "3",
      "description": "软糯香甜白糖糕"
    }
  ],
  "database": "test_canal",
  "es": 1634571615000,
  "id": 8,
  "isDdl": false,
  "mysqlType": {
    "id": "varchar(32)",
    "commodity_name": "varchar(512)",
    "commodity_price": "varchar(36)",
    "number": "int(10)",
    "description": "varchar(2048)"
  },
  "old": null,
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 12,
    "commodity_name": 12,
    "commodity_price": 12,
    "number": 4,
    "description": 12
  },
  "table": "tb_commodity_info",
  "ts": 1634571615824,
  "type": "INSERT"
}
8.总结

以上就是搭建canal-admin+canal-server+kafka+mysql binlog日志实时同步的demo,其中各配置项基本全为最简配置,让我们的demo最快的跑起来。但是在实际使用中我们还会去配置mysql表和topic的过滤映射规则、kafka发送数据时的分渠策略等,详情可以看官方文档:https://github.com/alibaba/canal/wiki/AdminGuide,被墙了可看这个:https://www.wenjiangs.com/doc/admin-guide

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4016040.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-22
下一篇 2022-10-22

发表评论

登录后才能评论

评论列表(0条)

保存