centos7 阿里 canal 实现增量同步MySQL的数据到ES

centos7 阿里 canal 实现增量同步MySQL的数据到ES,第1张

centos7 阿里 canal 实现增量同步MySQL的数据到ES 下载canal

下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.5

准备工作

安装mysql5.7 , es7.x,安装过程略…
注意: mysql 需要开启 binlog

在my.cnf 加入如下:
[mysqld]
og-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
mysql中 配置canal数据库管理用户,配置相应权限(repication权限)(如果使用root用户忽略此步骤)
CREATE USER canal IDENTIFIED BY 'canal'; 
GRANT SELECt, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
解压文件

配置canal-deployer/conf/example/instance.properties

启动deployer

 ./startup.sh 
配置 /canal-adapter/conf/application.yml
USER: 
HOSTNAME%%.*: 
PWD/#$HOME/~: 
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
#    kafka.bootstrap.servers: 127.0.0.1:9092
#    kafka.enable.auto.commit: false
#    kafka.auto.commit.interval.ms: 1000
#    kafka.auto.offset.reset: latest
#    kafka.request.timeout.ms: 40000
#    kafka.session.timeout.ms: 30000
#    kafka.isolation.level: read_committed
#    kafka.max.poll.records: 1000
    # rocketMQ consumer
#    rocketmq.namespace:
#    rocketmq.namesrv.addr: 127.0.0.1:9876
#    rocketmq.batch.size: 1000
#    rocketmq.enable.message.trace: false
#    rocketmq.customized.trace.topic:
#    rocketmq.access.channel:
#    rocketmq.subscribe.filter:
    # rabbitMQ consumer
#    rabbitmq.host:
#    rabbitmq.virtual.host:
#    rabbitmq.username:
#    rabbitmq.password:
#    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf-8&autoReconnect=true
      username: root
      password: Yi123456789
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
#        key: es-test
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: my-application
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address
配置 es7 同步文件 /canal-adapter/conf/es7/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest_user
  _id: _id
  _type: _doc
  upsert: true
  pk: id
  sql: "select a.id as _id, a.name, a.role_id, b.role_name from m_user a
        left join m_role b on b.id=a.role_id"
#  objFields:
#    _labels: array:;
  etlCondition: "where a.id = {}"
  commitBatch: 3000

启动adapter

./startup.sh 
常见问题总结 问题一

DruidDataSource jar包依赖冲突问题
该问题会导致源数据变更了,但无法写入es中,这个需要下载canal 源码包,修改client-adapter escore模块下的pom文件,对应位置加上provided配置,如下图:

在编译过程中有可能出现

[Help 1][ERROR]" />

将 guava 版本改为 18.0

问题二
ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /127.0.0.1:3306 has an error, retrying. caused by

com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example

原因:meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;
解决方案:删除meta.dat删除,再重启canal,问题解决;
集群 *** 作:进入canal对应的zookeeper集群下,删除节点/otter/canal/destinations/xxxxx/1001/cursor ;重启canal即可恢复;

问题三
java.lang.OutOfMemoryError: Java heap space

canal消费端挂了太久,在zk对应conf下节点的

/otter/canal/destinations/test_db/1001/cursor 位点信息是很早以前,导致重启canal时,从很早以前的位点开始消费,导致canal服务器内存爆掉
监听数据库变更,只有TransactionBegin/TransactionEnd,没有拿到数据的EventType;
原因可能是canal.instance.filter.black.regex=.*…*导致,改canal.instance.filter.black.regex= 再重启试试

问题四
ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:fdyb_db[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table 
Caused by: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table 

Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by 
Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1, 

分析:mysql系统表权限较高,canal读该表的binlog失败,位点无法移动
解决:将配置项中黑名单加上mysql下的所有表:canal.instance.filter.black.regex = mysql…* ,修改后canal集群不需要重启即可恢复;
其它注意点:检查下CanalConnector是否调用subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是.,那么相当于你消费了所有的更新数据。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5673188.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存