Canal is typically used for real-time data synchronization, and for real-time scenarios high availability (HA) is especially important. Canal supports an HA setup with two parts: the canal server and the canal client each have their own HA implementation. In big data pipelines, Canal usually syncs data into Kafka, so Kafka effectively acts as the Canal client; since a Kafka cluster provides HA on its own, we only focus on Canal Server HA here. The main goal of Canal Server HA is to limit the number of MySQL dump requests: for the same instance deployed on different servers, only one may be running at any given time while the others stay in standby (standby is a state of the instance). The Canal Server HA mechanism works as follows:
Canal guarantees HA through the following steps:
1) Whenever a canal server wants to start a canal instance, it first makes a startup attempt against ZooKeeper by trying to create an ephemeral node for that instance.
2) The canal server that successfully creates the ZooKeeper node starts the corresponding canal instance; on the servers that failed to create it, the instance stays in standby.
3) As soon as ZooKeeper detects that the instance node created by canal server A has disappeared, it immediately notifies the other canal servers to repeat step 1, electing a new canal server to start the instance.
4) Each time a canal client connects, it first asks ZooKeeper which server currently runs the canal instance and then connects to that server; if the connection becomes unavailable, it retries the connect (see the client sketch below).
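To make step 4 concrete, here is a minimal sketch of a cluster-aware Java client. It is only meant to illustrate the mechanism, since in this article Kafka plays the client role. It assumes the com.alibaba.otter:canal.client dependency; the ZooKeeper addresses and the "example" destination match the setup below, and the empty username/password are assumptions.

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class ClusterClientDemo {
    public static void main(String[] args) throws InterruptedException {
        // newClusterConnector looks up the active server in ZooKeeper
        // (path /otter/canal/destinations/example/running) and reconnects
        // to the newly elected server on failover.
        CanalConnector connector = CanalConnectors.newClusterConnector(
                "node2:2181,node3:2181,node4:2181", // ZooKeeper ensemble
                "example",                          // instance (destination) name
                "", "");                            // username/password, assumed empty
        connector.connect();
        connector.subscribe(".*\\..*");             // all databases and tables
        while (true) {
            Message message = connector.getWithoutAck(100); // up to 100 entries
            long batchId = message.getId();
            if (batchId == -1 || message.getEntries().isEmpty()) {
                Thread.sleep(1000);                 // nothing new yet
            } else {
                message.getEntries().forEach(entry ->
                        System.out.println(entry.getHeader().getTableName()));
            }
            connector.ack(batchId);                 // confirm the batch
        }
    }
}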
1) Machine preparation
Machines running Canal: node3, node4
ZooKeeper ensemble: node2:2181,node3:2181,node4:2181
MySQL address: node2:3306
2) Deploy and configure Canal separately on node3 and node4
Upload the Canal package to node3 and node4 and extract it into the /opt/canal directory, then edit the canal.properties file under /opt/canal/conf and add the ZooKeeper configuration:
#Specify the ZooKeeper ensemble address
canal.zkServers = node2:2181,node3:2181,node4:2181
#Configure the Spring XML configuration file
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
#Canal writes data to Kafka; options: tcp, kafka, RocketMQ (tcp means receiving with canal client code)
canal.serverMode = kafka
#Kafka broker addresses Canal writes to
canal.mq.servers = node1:9092,node2:9092,node3:9092
Go into the /opt/canal/conf/example directory and edit the instance.properties file:
#On the other machine use 123457; the slaveId just has to be unique
canal.instance.mysql.slaveId=123456
#MySQL master host and port
canal.instance.master.address=node2:3306
#Username and password for connecting to MySQL, i.e. the replication account created earlier
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#Kafka topic that Canal writes data into
canal.mq.topic=canal_topic
Note: the instance directory name must be exactly the same on both machines, because HA mode manages instances by instance name. Both servers must also use the default-instance.xml configuration, since only this configuration contains the ZooKeeper settings.
3) Start Canal on both machines
#Start Canal on node3
[root@node3 ~]# cd /opt/canal/bin
[root@node3 bin]# ./startup.sh
#Start Canal on node4
[root@node4 ~]# cd /opt/canal/bin
[root@node4 bin]# ./startup.sh
After startup, you can check the corresponding path information in ZooKeeper:
[zk: localhost:2181(CONNECTED) 2] ls /otter/canal/cluster
[192.168.134.103:11111, 192.168.134.104:11111]
[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.134.103:11111"}
Canal HA Testing
1) Write data into the testdb.person table in MySQL
mysql> insert into person values (4,"s1",21),(5,"s2",22),(6,"s3",23);
You can observe the captured data in the Kafka topic canal_topic:
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic canal_topic
{"data":[{"id":"4","name":"s1","age":"21"},{"id":"5","name":"s2","age":"22"},{"id":"6","name":"s3","age":"23"}],"database":"testdb","es":1641283997000,"id":2,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(255)","age":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"person","ts":1641283997225,"type":"INSERT"}
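To consume these flat-message JSON records programmatically rather than with the console consumer, a minimal Java consumer could look like the sketch below. It assumes the org.apache.kafka:kafka-clients dependency; the group id canal-demo is made up.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CanalTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", "canal-demo"); // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("canal_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // "type" is INSERT/UPDATE/DELETE, "data" holds the changed rows
                    System.out.println(record.value());
                }
            }
        }
    }
}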
2) Shut down the active Canal Server node and keep writing data into the MySQL table
Shut down the Canal Server on node3:
[root@node3 bin]# ./stop.sh
Check which Canal node is now active under the ZooKeeper path /otter/canal/destinations/example/running:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.134.104:11111"}
Continue writing data into the testdb.person table in MySQL:
mysql> insert into person values (4,"s1",21),(5,"s2",22),(6,"s3",23);
You can observe that the data written into the Kafka topic canal_topic is now:
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic canal_topic
{"data":[{"id":"4","name":"s1","age":"21"},{"id":"5","name":"s2","age":"22"},{"id":"6","name":"s3","age":"23"}],"database":"testdb","es":1641283997000,"id":2,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(255)","age":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"person","ts":1641283997225,"type":"INSERT"}
{"data":[{"id":"4","name":"s1","age":"21"},{"id":"5","name":"s2","age":"22"},{"id":"6","name":"s3","age":"23"}],"database":"testdb","es":1641284121000,"id":2,"isDdl":false,"mysqlType":{"id":"int","name":"varchar(255)","age":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"person","ts":1641284121920,"type":"INSERT"}
This test confirms that Canal HA is working.
Note: testing shows that when Canal HA stores the binlog position in ZooKeeper, each time a Canal Server restarts and takes over as the active node it re-reads the last record, producing a duplicate. This problem does not occur in non-HA mode, where the binlog position is stored locally.
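Because of this replay behavior, downstream consumers should be made idempotent. Below is a minimal in-memory deduplication sketch; it uses the entire flat-message JSON string as the key, which works here because the replayed message is byte-identical. A production pipeline would more likely key on primary key plus binlog position and persist that state.

import java.util.LinkedHashMap;
import java.util.Map;

public class RecentDedup {
    private static final int CAPACITY = 1000;

    // Access-ordered LRU map; the eldest entry is evicted once CAPACITY is exceeded.
    private final Map<String, Boolean> seen =
            new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > CAPACITY;
                }
            };

    // Returns true the first time a message is seen, false for a replayed duplicate.
    public synchronized boolean firstTime(String flatMessageJson) {
        return seen.put(flatMessageJson, Boolean.TRUE) == null;
    }
}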