canal+Kafka实现mysql与redis数据同步

canal+Kafka实现mysql与redis数据同步,第1张

前言

上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更 *** 作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。

架构设计

通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。

Kafka&Zookeeper搭建

首先在 官网 下载Kafka:

下载后解压文件夹,可以看到以下几个文件:

Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。

通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:

Canal搭建

canal搭建具体可以参考上文,这里只讲解具体的参数配置:

找到/conf目录下的canal.properties配置文件:

然后配置instance,找到/conf/example/instance.properties配置文件:

经过上述配置后,就可以启动canal了。

测试

环境搭建完成后,就可以编写代码进行测试。

1、引入pom依赖

2、封装Redis工具类

在application.yml文件增加以下配置:

封装一个 *** 作Redis的工具类:

3、创建MQ消费者进行同步

创建一个CanalBean对象进行接收:

最后就可以创建一个消费者CanalConsumer进行消费:

测试Mysql与Redis同步

mysql对应的表结构如下:

启动项目后,新增一条数据:

可以在控制台看到以下输出:

如果更新呢?试一下Update语句:

同样可以在控制台看到以下输出:

经过测试完全么有问题。

总结

既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等; 尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。

使用阿里开源的 canal 作为数据同步工具。

总的来说有两种方案

本文把两种方式都实现下。如果公司有统一的平台接入binlog的话,canal+mq应该是比较好的解耦的方式。

pom依赖

CanalClientMysql2Redis

RocketmqMysql2Redis

官方文档有给出顺序性的说明 https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

在上面的配置中,我使用的是canal.mq.partition=0,是单分区(rocketmq中应该叫consume queue), 看下控制台可以看到全是发送到queue=0的消费队列

二者数据同步的关键在于mysql数据库中主键,方案是在redis启动时区mysql读取所有表键值存入redis中,往redis写数据是,对redis主键自增并进行读取,若mysql更新失败,则需要及时清除缓存及同步redis主键。

参考代码如下:

String tbname = "login"

//获取mysql表主键值--redis启动时

long id = MySQL.getID(tbname)

//设置redis主键值--redis启动时

redisService.set(tbname, String.valueOf(id))

System.out.println(id)

long l = redisService.incr(tbname)

System.out.println(l)

Login login = new Login()

login.setId(l)

login.setName("redis")

redisService.hmset(String.valueOf(login.getId()), login)

boolean b = MySQL.insert("insert into login(id,name) values(" + login.getId()

+ ",'" + login.getName() + "')")

/**

*

* 队列处理器更新mysql失败:

*

* 清除缓存数据,同时主键值自减

*/

if (!b)

{

redisService.delKeyAndDecr

(tbname, "Login:"+String.valueOf(login.getId()))

// redisService.delete("Login:"+String.valueOf(login.getId()))

//redisService.decr(tbname)

}

System.out.println(redisService.exists("Login:"+String.valueOf(login.getId())))

System.out.println(redisService.get(tbname))


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/7411649.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-05
下一篇 2023-04-05

发表评论

登录后才能评论

评论列表(0条)

保存