下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.5
准备工作安装mysql5.7 , es7.x,安装过程略…
注意: mysql 需要开启 binlog
[mysqld] og-bin=mysql-bin #添加这一行就ok binlog-format=ROW #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复mysql中 配置canal数据库管理用户,配置相应权限(repication权限)(如果使用root用户忽略此步骤)
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECt, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;解压文件 配置canal-deployer/conf/example/instance.properties 启动deployer
./startup.sh配置 /canal-adapter/conf/application.yml
USER: HOSTNAME%%.*: PWD/#$HOME/~: server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: consumerProperties: # canal tcp consumer canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: # kafka consumer # kafka.bootstrap.servers: 127.0.0.1:9092 # kafka.enable.auto.commit: false # kafka.auto.commit.interval.ms: 1000 # kafka.auto.offset.reset: latest # kafka.request.timeout.ms: 40000 # kafka.session.timeout.ms: 30000 # kafka.isolation.level: read_committed # kafka.max.poll.records: 1000 # rocketMQ consumer # rocketmq.namespace: # rocketmq.namesrv.addr: 127.0.0.1:9876 # rocketmq.batch.size: 1000 # rocketmq.enable.message.trace: false # rocketmq.customized.trace.topic: # rocketmq.access.channel: # rocketmq.subscribe.filter: # rabbitMQ consumer # rabbitmq.host: # rabbitmq.virtual.host: # rabbitmq.username: # rabbitmq.password: # rabbitmq.resource.ownerId: srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf-8&autoReconnect=true username: root password: Yi123456789 canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger # - name: rdb # key: mysql1 # properties: # jdbc.driverClassName: com.mysql.jdbc.Driver # jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true # jdbc.username: root # jdbc.password: 121212 # - name: rdb # key: oracle1 # properties: # jdbc.driverClassName: oracle.jdbc.OracleDriver # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE # jdbc.username: mytest # jdbc.password: m121212 # - name: rdb # key: postgres1 # properties: # jdbc.driverClassName: org.postgresql.Driver # jdbc.url: jdbc:postgresql://localhost:5432/postgres # jdbc.username: postgres # jdbc.password: 121212 # threads: 1 # commitSize: 3000 # - name: hbase # properties: # hbase.zookeeper.quorum: 127.0.0.1 # hbase.zookeeper.property.clientPort: 2181 # zookeeper.znode.parent: /hbase - name: es7 # key: es-test hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode properties: mode: rest # or rest # # security.auth: test:123456 # only used for rest mode cluster.name: my-application # - name: kudu # key: kudu # properties: # kudu.master.address: 127.0.0.1 # ',' split multi address配置 es7 同步文件 /canal-adapter/conf/es7/mytest_user.yml
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: mytest_user _id: _id _type: _doc upsert: true pk: id sql: "select a.id as _id, a.name, a.role_id, b.role_name from m_user a left join m_role b on b.id=a.role_id" # objFields: # _labels: array:; etlCondition: "where a.id = {}" commitBatch: 3000启动adapter
./startup.sh常见问题总结 问题一
DruidDataSource jar包依赖冲突问题
该问题会导致源数据变更了,但无法写入es中,这个需要下载canal 源码包,修改client-adapter escore模块下的pom文件,对应位置加上provided配置,如下图:
在编译过程中有可能出现
[Help 1][ERROR]" />
将 guava 版本改为 18.0
ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /127.0.0.1:3306 has an error, retrying. caused by com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
原因:meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作;
解决方案:删除meta.dat删除,再重启canal,问题解决;
集群 *** 作:进入canal对应的zookeeper集群下,删除节点/otter/canal/destinations/xxxxx/1001/cursor ;重启canal即可恢复;
java.lang.OutOfMemoryError: Java heap space
canal消费端挂了太久,在zk对应conf下节点的
/otter/canal/destinations/test_db/1001/cursor 位点信息是很早以前,导致重启canal时,从很早以前的位点开始消费,导致canal服务器内存爆掉
监听数据库变更,只有TransactionBegin/TransactionEnd,没有拿到数据的EventType;
原因可能是canal.instance.filter.black.regex=.*…*导致,改canal.instance.filter.black.regex= 再重启试试
ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:fdyb_db[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed. Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table Caused by: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1,
分析:mysql系统表权限较高,canal读该表的binlog失败,位点无法移动
解决:将配置项中黑名单加上mysql下的所有表:canal.instance.filter.black.regex = mysql…* ,修改后canal集群不需要重启即可恢复;
其它注意点:检查下CanalConnector是否调用subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是.…,那么相当于你消费了所有的更新数据。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)