相关代码段:
consumerGroup.setConsumer(`testKafka${new Date().getTime()}`, Config.KafkaInput, true, OFFSET.eariest, //latest和earliest的设置 (msg) => { console.log(JSON.stringify(msg)); }, (err) => { console.log("Kafka Error:" + err); } );
auto.offset.reset关乎kafka数据的读取,常用的二个值是latest和earliest
latest和earliest区别1、earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
2、latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。
如果kafka只接收数据,从来没来消费过,程序一开始不要用latest,不然以前的数据就接收不到了。应当先earliest,然后二者都可以
相关参数配置和解释(来源于网络)原文链接:https://blog.csdn.net/weixin_44874132/article/details/117625895
props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);//消费组id props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//latest,earliest props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
在auto.offset.reset标志位为earliest/或者latest,创建的第一个消费者都将在enable.auto.commit的基础之上进行继续消费(如果enable.auto.commit=TRUE,则同一个消费者重启之后不会重复消费之前消费过的消息;enable.auto.commit=FALSE,则消费者重启之后会消费到相同的消息)
auto.offset.reset是对新的消费者而言(不同的groupId对应着不同的消费者)
在auto.offset.reset=earliest情况下,新的消费者(消费者二)将会从头开始消费Topic下的消息,即从offset=0的位置开始消费。
在auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的offset处开始消费Topic下的消息,比如上一个消费者提交的offset是30,则另一个消费组会开始从offset为30的位置开始消费。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)