- 前言
- 代码实现
我们都知道 kafka 可以根据制定的分区和偏移量来消费。但是最近碰到一个需求,需要把之前一周的消息都拉出来做分析,那么就要根据时间戳来进行消费。
代码实现public void seekBeforeTimestamp() { // 初始化 kafka KafkaConsumerconsumer = init(); Set assignment = new HashSet<>(); // 在poll()方法内部执行分区分配逻辑,该循环确保分区已被分配。 // 当分区消息为0时进入此循环,如果不为0,则说明已经成功分配到了分区。 while (assignment.size() == 0) { consumer.poll(Duration.ofMillis(1000)); // assignment()方法是用来获取消费者所分配到的分区消息的 assignment = consumer.assignment(); } System.out.println("assignment.size() = " + assignment.size()); Map timestampToSearch = new HashMap<>(); for (TopicPartition tp : assignment) { // 设置查询分区时间戳的条件:获取当前时间前周之后的消息 timestampToSearch.put(tp, System.currentTimeMillis() - 7 * 24 * 3600 * 1000); } Map offsets = consumer.offsetsForTimes(timestampToSearch); for (TopicPartition tp : assignment) { // 获取该分区的offset以及timestamp OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp); // 如果offsetAndTimestamp不为null,则证明当前分区有符合时间戳条件的消息 if (offsetAndTimestamp != null) { consumer.seek(tp, offsetAndTimestamp.offset()); } } }
上面代码中的 init() 方法为初始化 kafka 配置的方法,此处略,大家根据自己的需求配置初始化参数即可。这里就说明一下,如果要订阅 Topic 中的全部分区的实现方法。
// 订阅全部分区 Listpartitions = Lists.newArrayList(); List partitionInfos = consumer.partitionsFor(KAFKA_TOPIC); System.out.println("partitionInfos.size() = " + partitionInfos.size()); if (partitionInfos.size() > 0) { for (PartitionInfo info : partitionInfos) { partitions.add(new TopicPartition(info.topic(), info.partition())); } } consumer.assign(partitions);
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)