上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更 *** 作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。
架构设计canal是一个伪装成slave订阅mysql的binlog,实现数据同步的中间件。上一篇文章 canal入门 中简单介绍了使用方式,即tcp模式;其实canal也是支持直接发送到MQ中,比如:Kafka、RocketMQ、RabbitMQ。本文采用Kafka讲解,实现mysql与redis之间的数据同步。
通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。
Kafka&Zookeeper搭建首先在官网下载Kafka:
下载后解压文件夹,可以看到以下几个文件:
Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。
通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:
# 命令常见一个canaltopic 队列kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopicCanal搭建
canal搭建具体可以参考上文,这里只讲解具体的参数配置:
找到/conf目录下的canal.properties配置文件:
# tcp, kafka, RocketMQ 这里选择kafka模式canal.serverMode = kafka# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况canal.instance.parser.parallelThreadSize = 16# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口canal.mq.servers = 127.0.0.1:9092# 配置instance,在conf目录下要有example同名的目录,可以配置多个canal.destinations = example
然后配置instance,找到/conf/example/instance.properties配置文件:
## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0 # position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canal.mq.topic=canaltopic#单队列模式的分区下标canal.mq.partition=0
经过上述配置后,就可以启动canal了。
测试环境搭建完成后,就可以编写代码进行测试。
1、引入pom依赖2、封装Redis工具类org.springframework.kafka spring-kafkaorg.springframework.boot spring-boot-starter-data-redis
在application.yml文件增加以下配置:
spring: redis: host: 127.0.0.1 port: 6379 database: 0 password: 123456
封装一个 *** 作Redis的工具类:
@Componentpublic class RedisClient { @Resource private StringRedisTemplate stringRedisTemplate; public void setString(String key, String value) { setString(key, value, null); } public void setString(String key, String value, Long timeOut) { stringRedisTemplate.opsForValue().set(key, value); if (timeOut != null) { stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS); } } public String getString(String key) { return stringRedisTemplate.opsForValue().get(key); } public Boolean deleteKey(String key) { return stringRedisTemplate.delete(key); }}3、创建MQ消费者进行同步
在application.yml配置文件加上kafka的配置信息:spring: kafka: # Kafka服务地址 bootstrap-servers: 127.0.0.1:9092 consumer: # 指定一个默认的组名 group-id: consumer-group1 #序列化反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringDeserializer value-serializer: org.apache.kafka.common.serialization.StringDeserializer # 批量抓取 batch-size: 65536 # 缓存容量 buffer-memory: 524288
创建一个CanalBean对象进行接收:
public class CanalBean { //数据 private Listdata; //数据库名称 private String database; private long es; //递增,从1开始 private int id; //是否是DDL语句 private boolean isDdl; //表结构的字段类型 private MysqlType mysqlType; //UPDATE语句,旧数据 private String old; //主键名称 private List pkNames; //sql语句 private String sql; private SqlType sqlType; //表名 private String table; private long ts; //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等 private String type; //getter、setter方法} public class MysqlType { private String id; private String commodity_name; private String commodity_price; private String number; private String description; //getter、setter方法} public class SqlType { private int id; private int commodity_name; private int commodity_price; private int number; private int description;}
最后就可以创建一个消费者CanalConsumer进行消费:
@Slf4j@Componentpublic class CanalConsumer {@Resource private RedisClient redisClient;@KafkaListener(topics = "canaltopic") public void receive(ConsumerRecord, ?> consumer) { String value = (String) consumer.value(); log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value); //转换为javaBean CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class); //获取是否是DDL语句 boolean isDdl = canalBean.hasDdl(); //获取类型 String type = canalBean.getType(); //不是DDL语句 if (!isDdl) { List测试Mysql与Redis同步tbCommodityInfos = canalBean.getData(); //过期时间 long TIME_OUT = 600L; if ("INSERT".equals(type)) { //新增语句 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { String id = tbCommodityInfo.getId(); log.info("新增数据到redis, id: {}, data: {}", id, JSONObject.toJSonString(tbCommodityInfo)); //新增到redis中,过期时间是10分钟 redisClient.setString(id, JSONObject.toJSonString(tbCommodityInfo), TIME_OUT); log.info("从redis获取数据 result: {}", JSONObject.toJSonString(redisClient.getString(id))); } } else if ("UPDATE".equals(type)) { //更新语句 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { String id = tbCommodityInfo.getId(); log.info("修改数据到redis, id: {}, data: {}", id, JSONObject.toJSonString(tbCommodityInfo)); //更新到redis中,过期时间是10分钟 redisClient.setString(id, JSONObject.toJSonString(tbCommodityInfo), TIME_OUT); } } else { //删除语句 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { String id = tbCommodityInfo.getId(); log.info("删除数据从redis, id: {}", id); //从redis中删除 redisClient.deleteKey(id); } } } }}
mysql对应的表结构如下:
CREATE TABLE `tb_commodity_info` ( `id` varchar(32) NOT NULL, `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称', `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格', `number` int(10) DEFAULT '0' COMMENT '商品数量', `description` varchar(2048) DEFAULT '' COMMENT '商品描述', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
启动项目后,新增一条数据:
INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉烧包', '3.99', '3', '又大又香的叉烧包,老人小孩都喜欢');
可以在控制台看到以下输出:
2022-01-02 18:12:51.317 INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer : 新增数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}2022-01-02 18:12:51.320 INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer : 从redis获取数据 result: "{"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}"
如果更新呢?试一下Update语句:
UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='很便宜的青菜包呀,不买也开看看了喂' WHERe `id`='3e71a81fd80711eaaed600163e046cc3';
同样可以在控制台看到以下输出:
2022-01-02 18:14:44.613 INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer : topic名称:canaltopic,key:null,分区位置:0,下标:6,value:{"data":[{"id":"3e71a81fd80711eaaed600163e046cc3","commodity_name":"青菜包","commodity_price":"3.99","number":"3","description":"很便宜的青菜包呀,不买也开看看了喂"}],"database":"study","es":1641118484000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"varchar(36)","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"叉烧包","description":"又大又香的叉烧包,老人小孩都喜欢"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":12,"number":4,"description":12},"table":"tb_commodity_info","ts":1641118484602,"type":"UPDATE"}2022-01-02 18:14:44.616 INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer : 修改数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"青菜包","commodity_price":"3.99","description":"很便宜的青菜包呀,不买也开看看了喂","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}
经过测试完全么有问题。
总结既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等;尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。
如果你觉得这篇文章对你有用,点个赞吧~ 你的点赞是我创作的最大动力~想第一时间看到我更新的文章,可以微信搜索公众号「CodingCode」。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)