canal环境搭建canal-servercanal-admincanal-adapter
canal环境搭建- canal官网下载 https://github.com/alibaba/canal/tags分别将三个tar.gz包解压到指定的包下(adapter|admin|deployer)
- 将自己伪装成mysql的slave节点,来订阅mysql binlog的变更配置mysql,开启binlog
log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 配置canal.properties
canal.id = 101 canal.register.ip = 127.0.0.1 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # 当前server上部署的instance列表,对应conf目录下创建对应的文件夹,文件copy example中做修改即可 canal.destinations = example,testusers
- 配置example.properties
canal.instance.mysql.slaveId=103 # 不能和mysql的server_id重复 canal.instance.master.address=mysql地址:3306 canal.instance.dbUsername=root canal.instance.dbPassword=xxxxxx canal.instance.connectionCharset = UTF-8 canal.instance.filter.regex=canal_manager..* # 配置包含数据库和表 canal.instance.filter.black.regex=mysql\.slave_.* # 配置不包含数据库和表 canal.mq.topic=xxx
配置过滤正则表达式说明
启动canal
/bin/startup.sh 查看canal.log,cat logs/canal/canal.log 查看example.log,cat logs/example/example.logcanal-admin
- 提供了WebUI *** 作界面用来配置集群、节点、实例初始化数据库:conf/canal_manager.sql配置application.yml
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: 123456
- 启动
bin/startup.sh 查看日志:cat logs/admin.log
- WebUI
浏览器访问:http://localhost:8089/ 用户名/密码:admin/123456canal-adapter
- 可以将数据库变更同步给MQ、ES、DB、LOGGER,需要我们根据实际业务需要去修改配置文件conf/application.yml创建表
DROp TABLE IF EXISTS `users`; CREATE TABLE `users` ( `id` varchar(64) NOT NULL, `username` varchar(20) NOT NULL COMMENT '用户名', `phone` varchar(64) NOT NULL COMMENT '手机号', `nickname` varchar(20) NOT NULL COMMENT '昵称', `last_modified` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间', PRIMARY KEY (`id`), UNIQUE KEY `id` (`id`), UNIQUE KEY `username` (`username`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; BEGIN; INSERT INTO `users` VALUES ('1', '郑强', '13181838112', '爸爸', '2022-01-24 17:13:00'); COMMIT; SET FOREIGN_KEY_CHECKS = 1;
- 创建索引
PUT /mytest_user/ { "mappings": { "_doc":{ "properties": { "id": { "type": "keyword" }, "username": { "type": "text" }, "phone": { "type": "text" }, "nickname": { "type": "text" }, "last_modified":{ "type": "date" } } } } }
- 修改canal-adapter配置
server: port: 8088 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: consumerProperties: # canal tcp consumer canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: srcDataSources: defaultDS: url: jdbc:mysql://172.22.77.76:3306/canal_manager?useUnicode=true username: root password: rc2021! canalAdapters: - instance: testusers # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es7 hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode properties: mode: transport # or rest # security.auth: test:123456 # only used for rest mode cluster.name: es-01
- 修改conf/es7/mytest_user.yml
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 destination: testusers # cannal的instance或者MQ的topic groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据 esMapping: _index: mytest_user # 索引名称 _type: _doc _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配, **_id要与下面sql语句字段 as _id一样** upsert: true #pk: id sql: "SELECT id as _id,username as username,phone as phone,nickname as nickname,last_modified as last_modified FROM users" # objFields: # # _labels: array:; # # etlCondition: "where a.c_time>={}" # commitBatch: 3000 # 提交批大小
- 启动
bin/startup.sh
- 全量同步
adapter根目录下创建目录 mkdir sh vim users_all.sh curl http://localhost:8088/etl/es7/mytest_user.yml -X POST chmod vim users_all.sh 777 ./vim users_all.sh {"succeeded":true,"resultMessage":"导入ES 数据:4 条"}
增量同步
启动服务之后就已经增量同步,打开日志即可看到tail -f logs/adapter/adapter.log
解决问题
(1) druid类型转换错误
1> 下载源码包:[https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz](https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz) 2> 定位到 client-adapter.escore 模块的 pom.xml 的 druid 更新为3> mvn clean package 4> 到canal-canal-1.1.5/client-adapter/es7x/target 下 将打包好的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar 替换掉 canal-adapter/plugin 下原来的 5> 重启adapter com.alibaba druidprovided
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)