1、安装kafka的python包
pip install kafka-python
2、生产kafka数据
import json from kafka import KafkaProducer, KafkaConsumer producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:xxxx']) # 此处传入kafka的地址和端口 msg = json.dumps(data).encode() # 必须要编码为字节类型的数据,不可以用utf-8 producer.send(topic_name, value=msg) # 此处传入kafka的topic
3、消费kafka
import json from kafka import KafkaProducer, KafkaConsumer consumer = KafkaConsumer(topic_name, bootstrap_servers=['xx.xx.xx.xx:xxxx'], auto_offset_reset='earliest', # 设置偏移方式,当参数值为earliest时从上次未消费的地方开始消费;当参数值为latest时从最新数据开始消费 group_id='dev', consumer_timeout_ms=1000 # 该参数表示1000ms内如果没有新的数据产生则停止消费,否则会一直循环等待) res = [] for msg in consumer: msg = json.loads(msg.value) res.append(msg)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)