前言
上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更 *** 作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。
架构设计
通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。
Kafka&Zookeeper搭建
首先在 官网 下载Kafka:
下载后解压文件夹,可以看到以下几个文件:
Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。
通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:
Canal搭建
canal搭建具体可以参考上文,这里只讲解具体的参数配置:
找到/conf目录下的canal.properties配置文件:
然后配置instance,找到/conf/example/instance.properties配置文件:
经过上述配置后,就可以启动canal了。
测试
环境搭建完成后,就可以编写代码进行测试。
1、引入pom依赖
2、封装Redis工具类
在application.yml文件增加以下配置:
封装一个 *** 作Redis的工具类:
3、创建MQ消费者进行同步
创建一个CanalBean对象进行接收:
最后就可以创建一个消费者CanalConsumer进行消费:
测试Mysql与Redis同步
mysql对应的表结构如下:
启动项目后,新增一条数据:
可以在控制台看到以下输出:
如果更新呢?试一下Update语句:
同样可以在控制台看到以下输出:
经过测试完全么有问题。
总结
既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等; 尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。
使用阿里开源的 canal 作为数据同步工具。
总的来说有两种方案
本文把两种方式都实现下。如果公司有统一的平台接入binlog的话,canal+mq应该是比较好的解耦的方式。
pom依赖
CanalClientMysql2Redis
RocketmqMysql2Redis
官方文档有给出顺序性的说明 https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
在上面的配置中,我使用的是canal.mq.partition=0,是单分区(rocketmq中应该叫consume queue), 看下控制台可以看到全是发送到queue=0的消费队列
二者数据同步的关键在于mysql数据库中主键,方案是在redis启动时区mysql读取所有表键值存入redis中,往redis写数据是,对redis主键自增并进行读取,若mysql更新失败,则需要及时清除缓存及同步redis主键。参考代码如下:
String tbname = "login"
//获取mysql表主键值--redis启动时
long id = MySQL.getID(tbname)
//设置redis主键值--redis启动时
redisService.set(tbname, String.valueOf(id))
System.out.println(id)
long l = redisService.incr(tbname)
System.out.println(l)
Login login = new Login()
login.setId(l)
login.setName("redis")
redisService.hmset(String.valueOf(login.getId()), login)
boolean b = MySQL.insert("insert into login(id,name) values(" + login.getId()
+ ",'" + login.getName() + "')")
/**
*
* 队列处理器更新mysql失败:
*
* 清除缓存数据,同时主键值自减
*/
if (!b)
{
redisService.delKeyAndDecr
(tbname, "Login:"+String.valueOf(login.getId()))
// redisService.delete("Login:"+String.valueOf(login.getId()))
//redisService.decr(tbname)
}
System.out.println(redisService.exists("Login:"+String.valueOf(login.getId())))
System.out.println(redisService.get(tbname))
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)