Kafka生产者生产消息过慢导致的并发问题

Kafka生产者生产消息过慢导致的并发问题,第1张

Kafka生产者生产消息过慢导致的并发问题 1、问题背景

我负责的服务A中,有一个监听系统B发送的MQ消息,在打平消息体的结构后存储到本地数据库,并写入缓存的 *** 作,其中消息体的结构是数据库以及缓存key结构的聚合状态。

例如:消息体为四个字段 1、[2,3,4]、[5,6]、7、8时,数据结构数据就根据[2,3,4]和[5,6],完全打平存储到数据中,而缓存为:1_2_5:[7],[8]、1_3_5:[7],[8]等等。

可能产生的问题点:服务A有两台机器,针对缓存key的维度,两条消息是可能存在同一维度的数据的,因此可能因为并发产生写入缓存时数据被覆盖的问题。

考虑到B系统发送消息的频率很低,低于1QPS,因此认为并发存在的可能性很低,只是增加了巡检数据库与缓存数据不一致的情况进行告警提醒功能。

2、问题描述

某日接到告警,检测到缓存数据与数据库不一致,查看MQ发生消息的记录,发现是有一条消息的数据被覆盖了。而且值得关注的是,这两条数据从Kafka发送到消费组的时间是完全一致的。通过查看B系统的发送记录,其实这两条消息并不是同一时刻被发送到kafka的。

3、问题原因

通过问题描述发现,应该是kafka将消息进行了聚合发送,导致了并发问题的产生,追根溯源,就要考虑到kafka消息发送的特性了。

在kafka的消息发送流程,是一个异步发送的过程。共涉及两个线程:main线程和Sender线程和一个线程共享变量RecordAccumulator(计数累加器)。而发送的过程是main线程发消息给RecordAccumulator,Sender从RecordAccumulator拉取消息发送到kafka的broker。

在main线程中的流程:拦截->序列化->分区

send(ProducerRecord方法)Interceptors(过滤器)Serializer(序列化器)Partitioner(分区器)

发送流程:

 而Serder线程发送消息并不是有消息就往broker的,为了提高效率,Serder会在RecordAccumulator中的数据到达一定数量后,才会发送数据,或者一直等待一段直接都没达到要求的数据量,才会发送消息。

相关参数:

batch.size:只有数据累积到batch.size之后,sender才会发送数据

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。

因此,本次问题的原因就是B系统发送消息的频率过低,导致Sender线程会等数据累积后一起发送,就导致了A系统的并发现象。

4、解决方案

常规对于并发问题的解决方案,一般都是使用同步锁,但是由于场景的特殊性,加锁就必须加在消息体结构下,而这时候的锁粒度对于业务来说是非常巨大的,相当于对于字段1加锁,会导致A系统基本上处于一个单线程的状态,因此采取了相对代价较小,但是并不那么常规的解法,即在巡检到数据异常后,直接使用数据库数据,覆盖缓存数据。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5709532.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存