canal+Kafka实现mysql与redis数据同步

canal+Kafka实现mysql与redis数据同步,第1张

前言

上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更 *** 作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现mysql与redis之间的数据同步。

架构设计

通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。

Kafka&Zookeeper搭建

首先在 官网 下载Kafka:

下载后解压文件夹,可以看到以下几个文件:

Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。

通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:

Canal搭建

canal搭建具体可以参考上文,这里只讲解具体的参数配置:

找到/conf目录下的canal.properties配置文件:

然后配置instance,找到/conf/example/instance.properties配置文件:

经过上述配置后,就可以启动canal了。

测试

环境搭建完成后,就可以编写代码进行测试。

1、引入pom依赖

2、封装Redis工具类

在application.yml文件增加以下配置:

封装一个 *** 作Redis的工具类:

3、创建MQ消费者进行同步

创建一个CanalBean对象进行接收:

最后就可以创建一个消费者CanalConsumer进行消费:

测试Mysql与Redis同步

mysql对应的表结构如下:

启动项目后,新增一条数据:

可以在控制台看到以下输出:

如果更新呢?试一下Update语句:

同样可以在控制台看到以下输出:

经过测试完全么有问题。

总结

既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等; 尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。

Flink 任务 failover 之后,可能会重复写出数据到 Sink 中,你们公司是怎么做到端对端 exactly-once 的?

端对端 exactly-once 有 3 个条件:

⭐ Source 引擎可以重新消费,比如 Kafka 可以重置 offset 进行重新消费

⭐ Flink 任务配置 exactly-once,保证 Flink 任务 State 的 exactly-once

⭐ Sink 算子支持两阶段或者可重入,保证产出结果的 exactly-once

其中前两项一般大多数引擎都支持,我们需要关注的就是第 3 项,目前有两种常用方法:

⭐ Sink 两阶段:由于两阶段提交是随着 Checkpoint 进行的,假设 Checkpoint 是 5min 做一次,那么数据对下游消费方的可见性延迟至少也是 5min,所以会有数据延迟等问题,目前用的比较少。

⭐ Sink 支持可重入:举例:

⭐ Sink 为 MySQL:可以按照 key update 数据

⭐ Sink 为 Druid:聚合类型可以选用 longMax

⭐ Sink 为 ClickHouse:查询时使用 longMax 或者使用 ReplacingMergeTree 表引擎将重复写入的数据去重,这里有小伙伴会担心 ReplacingMergeTree 会有性能问题,但是博主认为其实性能影响不会很大,因为 failover 导致的数据重复其实一般情况下是小概率事件,并且重复的数据量也不会很大,也只是一个 Checkpoint 周期内的数据重复,所以使用 ReplacingMergeTree 是可以接受的)

⭐ Sink 为 Redis:按照 key 更新数据

其他解答:Flink状态一致性、端到端的精确一次保证

状态一致性:当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?       对于流处理内部来说,所谓的状态一致性,其实就是我们所说的计算结果保证准确。在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。    一条数据不应该丢失,也不应该重复计算

一致性可以分为 3 个级别: at-most-once(最多一次):计数结果可能丢失

at-least-once (至少一次):计数程序在发生故障后可能多算,但是绝不会少算。

exactly-once (精确一次):系统保证在发生故障后得到的计数结果与正确值一致。

数据流(DataStream)内部保证exactly-once (精确一次)的方法:Flink 使用了一种轻量级快照机制 ---- 检查点(checkpoint)来保证 exactly-once 语义

有状态应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

端到端保证一致性:

内部保证  —— 依赖 checkpoint

source 端  —— 需要外部源可重设数据的读取位置

sink 端  —— 需要保证从故障恢复时,数据不会重复写入外部系统

而对于 sink 端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。

幂等 *** 作:是说一个 *** 作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。 例如Hashmap 的写入插入 *** 作是幂等的 *** 作,重复写入,写入的结果还一样。

事务写入:构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中

对于事务性写入,具体又有两种实现方式: 预写日志(WAL)和两阶段提交(2PC) 。DataStream API 提供了 GenericWriteAheadSink 模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。其中 预写日志(WAL)只能保证至少一次精确。

Flink+Kafka 端到端状态一致性的保证

内部 —— 利用 checkpoint 机制,把状态存盘,发生故障的时候可以恢复, 保证内部的状态一致性

source —— kafka consumer 作为 source,可以将偏移量保存下来,如果后 续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据, 保证一致性

sink —— kafka producer 作为 sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction

由于端到端保证一致性需要用到两阶段提交(2PC)TwoPhaseCommitSinkFunction,我们来了解一下两阶段提交的方式:

第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”

jobmanager 触发 checkpoint *** 作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager

sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据

jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成

sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据

外部 kafka 关闭事务,提交的数据可以正常消费了。

我们也可以看到,如果宕机需要通过 StateBackend 进行恢复,只能恢复所有确认提交的 *** 作,之于有关后端状态的选择,后面再单独聊聊


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/8375487.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-15
下一篇 2023-04-15

发表评论

登录后才能评论

评论列表(0条)

保存