《Kafka权威指南》——问题1——onParitionsAssigned

《Kafka权威指南》——问题1——onParitionsAssigned,第1张

《Kafka权威指南》——问题1——onParitionsAssigned 四、Kafka消费者——从Kafka读取数据 4.8 从特定偏移量处开始处理数据

4.7节中说到,在调用subcribe()方法时传进去一个ConsumerRebalanceListener实例,可以在为消费者分配新分区或移除分区时,可以调用执行一些代码。

  1. onPartitionsRevoked 方法会在再均衡开始之前和消费者读取消息之后被调用
  2. onParitionsAssigned 方法会在重新分配分区之后和消费者开始读取消息之前被调用

4.8节中,使用seek()方法,并在消费者启动或分配到新分区时,可以使用seek()方法查找保存在数据库里的偏移量。
示例:

public class SaveOffsetsOnrebalnce implements ConsumerRebalanceListener{
	public void onPartitionsRevoked(Collection partitions){
		commitDBTransaction(); // 1
	}
	public void onParitionsAssigned(Collection partitions){
		for (TopicPartition partition : partitions)
			consumer.seek(partition, getOffsetFromDB(partition)); // 2
	}
	consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
	consumer.poll(0);
	for (TopicPartition partition : consumer.assignment())
		consumer.seek(partition, getOffsetFromDB(partition)); // 3

	while (true){
		ConsumerRecords records = consumer.poll(100);
		...
	}
}

书上说
2处,从数据库获取偏移量,在分配到新分区的时候,使用seek()方法定位到那些记录。
3处,订阅主题后,开始启动消费者,我们马上调用一次poll()方法,让消费者加入到消费者群组里,并获取分配到的分区,然后马上调用seek()方法定位分区的偏移量。

问题

  1. 为什么在有 onParitionsAssigned 中调用seek()的情况下(消费者加入后应该会调用),还要在3处seek()第二遍?
  2. 为什么需要poll(0)?不是很理解 <让消费者加入到消费者群组里>的解释。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5699826.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存