canal-adapter: syncing MySQL to ES [TCP mode]


Contents

canal environment setup
canal-server
canal-admin
canal-adapter

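canal environment setup
    Download canal from the official releases page: https://github.com/alibaba/canal/tags
    Extract each of the three tar.gz packages (adapter, admin, deployer) into its own directory.
canal-server
    canal-server poses as a MySQL slave node in order to subscribe to MySQL binlog changes.
    Configure MySQL and enable the binlog: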
[mysqld]
log-bin=mysql-bin # enable the binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
    Configure canal.properties
canal.id = 101
canal.register.ip = 127.0.0.1
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# Instances deployed on this server. Each name needs a matching directory under conf/; copy the example directory and modify its files.
canal.destinations = example,testusers
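The canal.admin.passwd value above is not a plain-text password. It appears to be the MySQL PASSWORD()-style hash (upper-case hex of SHA1 applied twice, without the leading "*") of the adminPasswd that canal-admin is configured with further down (123456). A minimal sketch for producing such a value, assuming openssl and awk are installed; the function name canal_passwd_hash is made up for illustration:

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the upper-case hex of SHA1(SHA1(password)),
# i.e. the MySQL PASSWORD() format without the leading "*".
canal_passwd_hash() {
  printf '%s' "$1" \
    | openssl dgst -sha1 -binary \
    | openssl dgst -sha1 -hex \
    | awk '{ print toupper($NF) }'   # keep only the hex digest, upper-cased
}
```

For example, canal_passwd_hash 123456 should print the 40-character value used for canal.admin.passwd in this article.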
    Configure the example instance (conf/example/instance.properties)
# must not be the same as MySQL's server_id
canal.instance.mysql.slaveId=103
canal.instance.master.address=<mysql-host>:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=xxxxxx
canal.instance.connectionCharset = UTF-8
# databases and tables to include
canal.instance.filter.regex=canal_manager\\..*
# databases and tables to exclude
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=xxx
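Because the instance's slaveId must differ from MySQL's server_id, a small pre-flight check can catch a collision before canal starts. The sketch below is only an illustration: the helper names conf_value and check_slave_id are hypothetical, the file paths are whatever you pass in, and it assumes simple key=value lines, optionally followed by a "# comment".

```shell
#!/usr/bin/env bash
# Print the value of "key=value" from a config file, ignoring spaces
# around "=" and anything from the first "#" or space after the value.
conf_value() {
  local file="$1" key="$2" pat
  pat="$(printf '%s' "$key" | sed 's/\./\\./g')"   # treat dots in the key literally
  sed -n -E "s/^[[:space:]]*${pat}[[:space:]]*=[[:space:]]*([^#[:space:]]*).*/\1/p" "$file" | head -n 1
}

# Compare server_id (my.cnf) with canal.instance.mysql.slaveId (instance.properties).
# Returns 0 if they differ, 1 if they collide, 2 if either one is missing.
check_slave_id() {
  local server_id slave_id
  server_id="$(conf_value "$1" server_id)"
  slave_id="$(conf_value "$2" canal.instance.mysql.slaveId)"
  if [ -z "$server_id" ] || [ -z "$slave_id" ]; then
    echo "missing server_id or slaveId" >&2
    return 2
  fi
  if [ "$server_id" = "$slave_id" ]; then
    echo "conflict: server_id and slaveId are both $server_id" >&2
    return 1
  fi
  echo "ok: server_id=$server_id slaveId=$slave_id"
}
```

A possible invocation, with paths adjusted to your own installation: check_slave_id /etc/my.cnf conf/example/instance.properties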

    Notes on the filter regular expressions
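canal's filter values are comma-separated regular expressions that are matched against the full "schema.table" name, and inside the properties file the escape character has to be written as a double backslash (so a literal dot is \\.). The sketch below approximates that matching with grep -E so the effect of a pattern can be tried out on the command line. It is an approximation only: canal itself uses its own regex engine, and the function name matches_filter is hypothetical. Patterns are passed here with a single backslash, as they look after the properties file has been read.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if "schema.table" ($2) fully matches any of
# the comma-separated patterns in $1.
matches_filter() {
  local alternation
  # Turn "p1,p2" into "(p1)|(p2)" so a single grep call covers every pattern.
  alternation="($(printf '%s' "$1" | sed 's/,/)|(/g'))"
  printf '%s\n' "$2" | grep -Eqx -- "$alternation"
}

# Print "yes" or "no" for a pattern list and a table name.
show_match() {
  if matches_filter "$1" "$2"; then echo yes; else echo no; fi
}
```

With these helpers, show_match 'canal_manager\..*' 'canal_manager.users' reports yes and show_match 'canal_manager\..*' 'other_db.users' reports no. Leaving the dot unescaped ('canal_manager..*') also accepts a name such as canal_managerXusers, which is why the escape matters.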

    Start canal

bin/startup.sh
Check canal.log: cat logs/canal/canal.log
Check example.log: cat logs/example/example.log
canal-admin
    Provides a web UI for configuring clusters, nodes, and instances.
    Initialize the database: conf/canal_manager.sql
    Configure application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: 123456
    Start
bin/startup.sh
View the log: cat logs/admin.log
    Web UI
Open http://localhost:8089/ in a browser. Username/password: admin/123456
canal-adapter
    Can sync database changes to MQ, ES, DB, or LOGGER targets. Edit conf/application.yml to suit your actual business needs.
    Create the table
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (
  `id` varchar(64) NOT NULL,
  `username` varchar(20) NOT NULL COMMENT 'user name',
  `phone` varchar(64) NOT NULL COMMENT 'phone number',
  `nickname` varchar(20) NOT NULL COMMENT 'nickname',
  `last_modified` timestamp NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'last modified time',
  PRIMARY KEY (`id`),
  UNIQUE KEY `id` (`id`),
  UNIQUE KEY `username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

BEGIN;
INSERT INTO `users` VALUES ('1', '郑强', '13181838112', '爸爸', '2022-01-24 17:13:00');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
    Create the index
PUT /mytest_user
{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "username": {
        "type": "text"
      },
      "phone": {
        "type": "text"
      },
      "nickname": {
        "type": "text"
      },
      "last_modified": {
        "type": "date"
      }
    }
  }
}
    Modify the canal-adapter configuration (conf/application.yml)
server:
  port: 8088
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.22.77.76:3306/canal_manager?useUnicode=true
      username: root
      password: rc2021!
  canalAdapters:
  - instance: testusers # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: es-01
    Edit conf/es7/mytest_user.yml
dataSourceKey: defaultDS # key of the source data source; matches an entry under srcDataSources above
destination: testusers # canal instance name or MQ topic
groupId: g1 # groupId in MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user # index name
  _type: _doc
  _id: _id # ES _id; if this is not set, the pk option below must be set and ES assigns the _id automatically. **_id must match the "as _id" alias in the SQL below**
  upsert: true
  #pk: id
  sql: "SELECT id as _id,username as username,phone as phone,nickname as nickname,last_modified as last_modified FROM users"
#  objFields:
#  #    _labels: array:;
#  # etlCondition: "where a.c_time>={}"
#    commitBatch: 3000 # commit batch size
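    Start
bin/startup.sh
    Full sync
Create a directory under the adapter root and enter it: mkdir sh && cd sh
Create users_all.sh (vim users_all.sh) with the following content:
curl http://localhost:8088/etl/es7/mytest_user.yml -X POST
chmod 777 users_all.sh
./users_all.sh
Response (the message says that 4 rows were imported into ES):
{"succeeded":true,"resultMessage":"导入ES 数据:4 条"}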
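If the full sync is run from a script or a scheduled job, it helps to check the response instead of reading it by eye. The sketch below wraps the same POST to /etl/es7/<mapping file> and tests the "succeeded" field with grep. The names ADAPTER_URL, run_full_sync, and etl_succeeded are made up for illustration, the default URL assumes the adapter port 8088 configured in this article, and a real script might prefer a JSON parser over grep.

```shell
#!/usr/bin/env bash
# Base URL of canal-adapter; 8088 is the server.port set in conf/application.yml above.
ADAPTER_URL="${ADAPTER_URL:-http://localhost:8088}"

# Trigger a full import for one es7 mapping file and print the raw JSON response.
run_full_sync() {
  curl -sS -X POST "$ADAPTER_URL/etl/es7/$1"
}

# Succeed only if the given response text contains "succeeded":true.
etl_succeeded() {
  printf '%s' "$1" | grep -Eq '"succeeded"[[:space:]]*:[[:space:]]*true'
}
```

A possible use: resp="$(run_full_sync mytest_user.yml)" followed by etl_succeeded "$resp" || echo "full sync failed: $resp" >&2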

    Incremental sync
    Incremental sync is already running once the service has started. Tail the log to watch it: tail -f logs/adapter/adapter.log

    Troubleshooting
    (1) druid type conversion error

   1> Download the source package: https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz
   2> In the pom.xml of the client-adapter.escore module, change the druid dependency to:

   	<dependency>
   	    <groupId>com.alibaba</groupId>
   	    <artifactId>druid</artifactId>
   	    <scope>provided</scope>
   	</dependency>

   3> mvn clean package
   4> Go to canal-canal-1.1.5/client-adapter/es7x/target and use the built client-adapter.es7x-1.1.5-jar-with-dependencies.jar to replace the original jar under canal-adapter/plugin
   5> Restart the adapter
   

Feel free to share; when reposting, please credit the source: 内存溢出

Original URL: https://outofmemory.cn/zaji/5717675.html
