kafka消费者java版本读取不到消息怎么办

kafka消费者java版本读取不到消息怎么办,第1张

3 启动服务

31 启动zookeeper

启动zk有两种方式,第一种是使用kafka自己带的一个zk。

bin/zookeeper-server-startsh config/zookeeperproperties&

另一种是使用其它的zookeeper,可以位于本机也可以位于其它地址。这种情况需要修改config下面的sercerproperties里面的zookeeper地址

。例如zookeeperconnect=102024179:2181

32 启动 kafka

bin/kafka-server-startsh config/serverproperties

4创建topic

bin/kafka-topicssh --create --zookeeper 102024179:2181 --replication-factor 1 --partitions 1 --topic test

创建一个名为test的topic,只有一个副本,一个分区。

通过list命令查看刚刚创建的topic

bin/kafka-topicssh -list -zookeeper 102024179:2181

5启动producer并发送消息启动producer

bin/kafka-console-producersh --broker-list localhost:9092 --topic test

启动之后就可以发送消息了

比如

test

hello boy

按Ctrl+C退出发送消息

6启动consumer

bin/kafka-console-consumersh --zookeeper 102024179:2181 --topic test --from-beginning

启动consumer之后就可以在console中看到producer发送的消息了

可以开启两个终端,一个发送消息,一个接受消息。

如果这样都不行的话,查看zookeeper进程和kafka的topic,一步步排查原因吧。

failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member This means that the time between subsequent calls to poll() was longer than the configured maxpollintervalms, which typically implies that the poll loop is spending too much time message processing You can address this either by increasing maxpollintervalms or by reducing the maximum size of batches returned in poll() with maxpollrecords

1、有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者已经下线。

修改参数

2、 kafkaConsumerassign() 点对点消费方式 和 subscribe()订阅消费方式 ,使用了相同的消费组,也就是他们group id 相同时,group coordinator无法识别具有相同消费组group id的consumer,直接抛异常 CommitFailedException

如果是这种情况,提示的解决方法 maxpollintervalms,maxpollrecord 都没用,无法解决,只能修改消费组id

我们搭建好kafka集群后,对其进行性能测试。遇到这种场景:我搭建好了三台kakfa集群,然后停掉其中一台kafka,然后集群是否能正常工作呢?

kafka集群:

19216818353 节点1

19216818355 节点2

19216818362 节点3

创建的主题是hw_data:

三个分区、三个副本

三个节点启动以后,集群正常工作,正常生产、正常消费。

但是当我们停掉其中一个节点后,发现集群不能正常工作了。

我的这个问题的凶手是:__consumer_offsets

首先查看系统的_offsets副本是几个?

我们看到 副本数为1,这就是导致我们当一个节点宕机后集群无法正常工作的原因!

__consumer_offsets这个topic是由kafka自动创建的,默认50个,但是都存在一台kafka服务器上,这是不是就存在很明显的单点故障?

经测试,如果将存储consumer_offsets的这台机器kill掉,所有的消费者都停止消费了。

__consumer_offsets是一个非常重要的topic,我们怎么能允许它只有一个副本呢?这样就存在单点故障,也就是如果该分区所在的集群宕机了的话,我们的消费者就无法正常消费数据了。

1修改系统_offsets副本数为3

修改kafka的核心配置文件serverproperties

将numpartitions参数(默认为1)修改为3,

另外需要添加autocreatetopicsenable=true ,如果没有对应的topic可以主动创建topic。

由于__consumer_offsets是kafka默认的主题,无法删除,我们可以删除zookeeper中的__consumer_offsets。

进入zookeeper/bin目录执行/zkClish

先将集群停掉

在重新启动zookeeper和kafka

再次查看__consumer_offsets。发现副本数已经是3

在节点1修改__consumer_offsets后,在节点2和节点3查看__consumer_offsets发现副本数已经变为3,不需要再进行修改。

我的问题到这里就解决了:

启动kafka集群三个节点,然后停掉其中任意一个节点,集群是可以正常工作的。

查看创建的topic的副本数是否为1

进入zookeeper的bin目录,执行/zkClish

删除brokers下的ids

重启kafka,应该就可以了。

我首先使用rmr /brokers/topics/topicname删除了zookeeper中的topic ,这样查看kafka中的topic时,显示已经没有了,

但是我又创建我刚刚删除的主题时,发现他已经显示被标记删除。

我的配置文件已经设置了删除topic为true,但是我删除的主题还是显示被标记删除,没有真正的删除

解决办法:

进入zookeeper的bin目录,执行/zkClish

删除config、brokers、admin下的对应主题

数据丢失是一件非常严重的事情事,针对数据丢失的问题我们需要有明确的思路来确定问题所在,针对这段时间的总结,我个人面对kafka 数据丢失问题的解决思路如下:

1、是否真正的存在数据丢失问题,比如有很多时候可能是其他同事 *** 作了测试环境,所以首先确保数据没有第三方干扰。

2、理清你的业务流程,数据流向,数据到底是在什么地方丢失的数据,在kafka 之前的环节或者kafka之后的流程丢失?比如kafka的数据是由flume提供的,也许是flume丢失了数据,kafka 自然就没有这一部分数据。

3、如何发现有数据丢失,又是如何验证的。从业务角度考虑,例如:教育行业,每年高考后数据量巨大,但是却反常的比高考前还少,或者源端数据量和目的端数据量不符

4、 定位数据是否在kafka之前就已经丢失还事消费端丢失数据的

kafka支持数据的重新回放功能(换个消费group),清空目的端所有数据,重新消费。如果是在消费端丢失数据,那么多次消费结果完全一模一样的几率很低。如果是在写入端丢失数据,那么每次结果应该完全一样(在写入端没有问题的前提下)。

5、kafka环节丢失数据,常见的kafka环节丢失数据的原因有:

如果autocommitenable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset *** 作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。网络负载很高或者磁盘很忙写入失败的情况下,没有自动重试重发消息。没有做限速处理,超出了网络带宽限速。kafka一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认1秒钟并不符合生产环境(网络中断时间有可能超过1秒)。如果磁盘坏了,会丢失已经落盘的数据

单批数据的长度超过限制会丢失数据,报kafkacommonMessageSizeTooLargeException异常解决:

6、partition leader在未完成副本数follows的备份时就宕机的情况,即使选举出了新的leader但是已经push的数据因为未备份就丢失了!kafka是多副本的,当你配置了同步复制之后。多个副本的数据都在PageCache里面,出现多个副本同时挂掉的概率比1个副本挂掉的概率就很小了。(官方推荐是通过副本来保证数据的完整性的)

7、kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache上的数据就丢失了。可以通过logflushintervalmessages和logflushintervalms来配置flush间隔,interval大丢的数据多些,小会影响性能但在08版本,可以通过replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,kafka当前支持GZip和Snappy压缩,来缓解这个问题 是否使用replica取决于在可靠性和资源代价之间的balance。

同时kafka也提供了相关的配置参数,来让你在性能与可靠性之间权衡(一般默认):

当达到下面的消息数量时,会将数据flush到日志文件中。默认10000

当达到下面的时间(ms)时,执行一次强制的flush *** 作。intervalms和intervalmessages无论哪个达到,都会flush。默认3000ms

检查是否需要将日志flush的时间间隔

high-level版本已经封装了对partition和offset的管理,默认是会定期自动commit offset,这样可能会丢数据的low-level版本自己管理spout线程和partition之间的对应关系和每个partition上的已消费的offset(定期写到zk)并且只有当这个offset被ack后,即成功处理后,才会被更新到zk,所以基本是可以保证数据不丢的即使spout线程crash(崩溃),重启后还是可以从zk中读到对应的offset

不能让内存的缓冲池太满,如果满了内存溢出,也就是说数据写入过快,kafka的缓冲池数据落盘速度太慢,这时肯定会造成数据丢失。尽量保证生产者端数据一直处于线程阻塞状态,这样一边写内存一边落盘。异步写入的话还可以设置类似flume回滚类型的batch数,即按照累计的消息数量,累计的时间间隔,累计的数据大小设置batch大小。

不过异步写入丢失数据的情况还是难以控制还是得稳定整体集群架构的运行,特别是zookeeper,当然正对异步数据丢失的情况尽量保证broker端的稳定运作吧

kafka不像hadoop更致力于处理大量级数据,kafka的消息队列更擅长于处理小数据。针对具体业务而言,若是源源不断的push大量的数据(eg:网络爬虫),可以考虑消息压缩。但是这也一定程度上对CPU造成了压力,还是得结合业务数据进行测试选择

topic设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的broker中,分区数要大于broker数。分区是kafka进行并行读写的单位,是提升kafka速度的关键。

关闭自动更新offset,等到数据被处理后再手动跟新offset。

在消费前做验证前拿取的数据是否是接着上回消费的数据,不正确则return先行处理排错。

一般来说zookeeper只要稳定的情况下记录的offset是没有问题,除非是多个consumer group 同时消费一个分区的数据,其中一个先提交了,另一个就丢失了。

kafka的数据一开始就是存储在PageCache上的,定期flush到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache上的数据就丢失了。这个是总结出的到目前为止没有发生丢失数据的情况

强行kill线程,导致消费后的数据,offset没有提交,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(010x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

如果在close之前调用了consumerunsubscribe()则有可能部分offset没提交,下次重启会重复消费。

kafka数据重复 kafka设计的时候是设计了(at-least once)至少一次的逻辑,这样就决定了数据可能是重复的,kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。

kafka的数据重复一般情况下应该在消费者端,这时logcleanuppolicy = delete使用定期删除机制。

遇到了需要暂停消费的场景,使用pause()方法暂停消费,resume()方法恢复消费,基于springboot的demo如下:

assign 和 subscribe 的区别 :assign方法由用户直接手动consumer实例消费哪些具体分区,assign的consumer不会拥有kafka的group management机制,也就是当group内消费者数量变化的时候不会有reblance行为发生。assign的方法不能和subscribe方法同时使用。

通过switchOn变量来手动的控制暂停跟恢复

earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

简单来说,如果partition里已经有数据,但还没有消费,earliest就会从没消费的起始点来消费,反观latest就不会去消费;如果partition已经有已消费的数据,再放新的数据进去,那么它们都会从新的数据开始消费。

offset会保存在kafka内部,一开始发送数据到kafka的时候就有offset,只是有没有提交而已。而使用spring-kafka时,客户端在监听topic的时候,它有2种提交offset的方式:

1、自动提交,设置enableautocommit=true,更新的频率根据参数autocommitintervalms来定。这种方式也被称为at most once,fetch到消息后就可以更新offset,无论是否消费成功。

2、手动提交,设置enableautocommit=false,这种方式称为at least once。fetch到消息后,等消费完成再调用方法consumercommitSync(),手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。

spring-kafka版本255,官网 >

以上就是关于kafka消费者java版本读取不到消息怎么办全部的内容,包括:kafka消费者java版本读取不到消息怎么办、kafka消费相同消费组问题、当kafka集群其中一台宕机后,会怎么样等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/10163173.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存