Kafka 消费特定时间戳之后的消息

Kafka 消费特定时间戳之后的消息,第1张

Kafka 消费特定时间戳之后的消息

文章目录
    • 前言
    • 代码实现

前言

我们都知道 kafka 可以根据制定的分区和偏移量来消费。但是最近碰到一个需求,需要把之前一周的消息都拉出来做分析,那么就要根据时间戳来进行消费。

代码实现
    public void seekBeforeTimestamp() {
    	// 初始化 kafka
        KafkaConsumer consumer = init();
        
        Set assignment = new HashSet<>();
        // 在poll()方法内部执行分区分配逻辑,该循环确保分区已被分配。
        // 当分区消息为0时进入此循环,如果不为0,则说明已经成功分配到了分区。
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofMillis(1000));
            // assignment()方法是用来获取消费者所分配到的分区消息的
            assignment = consumer.assignment();
        }

        System.out.println("assignment.size() = " + assignment.size());

        Map timestampToSearch = new HashMap<>();
        for (TopicPartition tp : assignment) {
            // 设置查询分区时间戳的条件:获取当前时间前周之后的消息
            timestampToSearch.put(tp, System.currentTimeMillis() - 7 * 24 * 3600 * 1000);
        }
        
        Map offsets = consumer.offsetsForTimes(timestampToSearch);

        for (TopicPartition tp : assignment) {
            // 获取该分区的offset以及timestamp
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
            // 如果offsetAndTimestamp不为null,则证明当前分区有符合时间戳条件的消息
            if (offsetAndTimestamp != null) {
                consumer.seek(tp, offsetAndTimestamp.offset());
            }
        }
    }

上面代码中的 init() 方法为初始化 kafka 配置的方法,此处略,大家根据自己的需求配置初始化参数即可。这里就说明一下,如果要订阅 Topic 中的全部分区的实现方法。

        // 订阅全部分区
        List partitions = Lists.newArrayList();
        List partitionInfos = consumer.partitionsFor(KAFKA_TOPIC);
        System.out.println("partitionInfos.size() = " + partitionInfos.size());
        if (partitionInfos.size() > 0) {
            for (PartitionInfo info : partitionInfos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        consumer.assign(partitions);

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5671927.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存