import json

from kafka import KafkaConsumer
from kafka import TopicPartition

# Replay a fixed offset range of one partition, printing every message both
# raw and as parsed JSON.
consumer = KafkaConsumer(bootstrap_servers='xxx:9092', group_id="group_test")
partition = TopicPartition('topic_name', 4)
offsetStart = 30570  # first offset to read (inclusive)
end = 34387  # last offset to process (inclusive)

try:
    # Manual assignment followed by seek() makes the next fetch start at
    # offsetStart.
    consumer.assign([partition])
    consumer.seek(partition, offsetStart)

    i = 0  # number of messages processed so far
    for msg in consumer:
        # Check the bound first so a message past `end` is neither printed
        # nor processed. This guard also covers the case where offset `end`
        # itself is absent from the log and a later offset arrives instead.
        if msg.offset > end:
            break

        print(f"msg.offset: {msg.offset} mgs:{msg.value} i: {i}")
        i += 1

        # The payload is expected to be UTF-8 encoded JSON; a malformed
        # message raises here and stops the replay.
        task_info = msg.value.decode('utf-8')
        task_info = json.loads(task_info)
        print(task_info)

        # Stop as soon as the last wanted offset has been handled instead of
        # waiting for a later message, which may never arrive when `end` is
        # the newest offset in the partition.
        if msg.offset >= end:
            break
finally:
    # Always release the broker connection, including when decoding or JSON
    # parsing raises.
    consumer.close()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)