kafka topic重新消费

kafka topic重新消费,第1张

kafka topic重新消费
import json
from kafka import KafkaConsumer
from kafka import TopicPartition
consumer = KafkaConsumer(bootstrap_servers='xxx:9092', group_id="group_test")
partition = TopicPartition('topic_name', 4)
offsetStart = 30570
end = 34387
consumer.assign([partition])
consumer.seek(partition, offsetStart)
i = 0
for msg in consumer:
    print(f"msg.offset: {msg.offset} mgs:{msg.value} i: {i}")
    if msg.offset > end:
        break
    i += 1
    task_info = msg.value.decode('utf-8')
    task_info = json.loads(task_info)
    print(task_info)

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5699501.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存