前言
上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更 *** 作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。
架构设计
通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。
Kafka&Zookeeper搭建
首先在 官网 下载Kafka:
下载后解压文件夹,可以看到以下几个文件:
Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。
通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:
Canal搭建
canal搭建具体可以参考上文,这里只讲解具体的参数配置:
找到/conf目录下的canal.properties配置文件:
然后配置instance,找到/conf/example/instance.properties配置文件:
经过上述配置后,就可以启动canal了。
测试
环境搭建完成后,就可以编写代码进行测试。
1、引入pom依赖
2、封装Redis工具类
在application.yml文件增加以下配置:
封装一个 *** 作Redis的工具类:
3、创建MQ消费者进行同步
创建一个CanalBean对象进行接收:
最后就可以创建一个消费者CanalConsumer进行消费:
测试Mysql与Redis同步
mysql对应的表结构如下:
启动项目后,新增一条数据:
可以在控制台看到以下输出:
如果更新呢?试一下Update语句:
同样可以在控制台看到以下输出:
经过测试完全么有问题。
总结
既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等; 尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。
因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧
根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。
mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。
经 canal 解析过的对象,我们使用起来就非常的方便了。
再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。
根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.4.tar.gz 进行解析 MySQL 的 binlog 日志。
下载后,复制 canal.deployer-1.1.4.tar.gz 到 MySQL 主机上,比如放在 /usr/local/soft/目录下。然后依次执行下面的命令:
然后修改 canal 的配置文件 vim conf/example/instance.properties
这三项改成你自己的,比如我的配置如下:
然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)
接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。
canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。
执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。
执行 vim /etc/my.cnf 命令。添加下面 3 个配置。
然后保存并退出。
接着执行 sudo service mysqld restart 重启 MySQL。
需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate *** 作的权利。
如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。
MySQL 启动后,就可以开启 canal 服务了。
开启后,观察 canal 服务的日志,确保服务正常。
查看 canal 的日志
确定没有问题后,开始编写我们的测试程序。
pom.xml 中导入下面的依赖。
使用JAVA进行测试
然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。上面是使用的Java代码进行运行,如果想用canal.adapter来进行运行可以下载
放入服务器中,依次执行下面命令
然后修改配置文件 :
然后将需要运行存储到es的的yml文件放入到
目录下。例如:
然后开启canal-adapter服务
/usr/local/soft/canal-adapter/bin/startup.sh
查看 canal-adapter 的日志,确定没有问题后修改数据 就可以同步到es了
注意:
1、canal-adapter自带mysql连接使用的5.x的,如果自己安装的是高版本的mysql需要自己去/usr/local/soft/canal-adapter/lib增加对应的jar包
2、因项目中同步es使用的sql中有数据库中没有的字段,导致原生程序一直报异常,后修改源码中
加了一个判断后才可以
3、es中使用的date字段类型和数据库中不一致,所以这里又修改了部分源码兼容我们项目中的类型
可以根据各自情况修改。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)