4.7节中说到,在调用subcribe()方法时传进去一个ConsumerRebalanceListener实例,可以在为消费者分配新分区或移除分区时,可以调用执行一些代码。
- onPartitionsRevoked 方法会在再均衡开始之前和消费者读取消息之后被调用
- onParitionsAssigned 方法会在重新分配分区之后和消费者开始读取消息之前被调用
4.8节中,使用seek()方法,并在消费者启动或分配到新分区时,可以使用seek()方法查找保存在数据库里的偏移量。
示例:
public class SaveOffsetsOnrebalnce implements ConsumerRebalanceListener{ public void onPartitionsRevoked(Collectionpartitions){ commitDBTransaction(); // 1 } public void onParitionsAssigned(Collection partitions){ for (TopicPartition partition : partitions) consumer.seek(partition, getOffsetFromDB(partition)); // 2 } consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer)); consumer.poll(0); for (TopicPartition partition : consumer.assignment()) consumer.seek(partition, getOffsetFromDB(partition)); // 3 while (true){ ConsumerRecords records = consumer.poll(100); ... } }
书上说
2处,从数据库获取偏移量,在分配到新分区的时候,使用seek()方法定位到那些记录。
3处,订阅主题后,开始启动消费者,我们马上调用一次poll()方法,让消费者加入到消费者群组里,并获取分配到的分区,然后马上调用seek()方法定位分区的偏移量。
问题
- 为什么在有 onParitionsAssigned 中调用seek()的情况下(消费者加入后应该会调用),还要在3处seek()第二遍?
- 为什么需要poll(0)?不是很理解 <让消费者加入到消费者群组里>的解释。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)