生产者:
# Kafka producer demo: every 2 seconds, publish one randomly chosen sample
# record to the "filterAnalysisResultIncident" topic until interrupted.
import json
import random
import time

from kafka import KafkaProducer

# NOTE(review): the broker list looks truncated/redacted (the first entry has
# no host part) -- replace with real "host:port" pairs before running.
producer = KafkaProducer(
    bootstrap_servers=':9092,10.9:9092,10.180:9092',
    # Each record value is serialized as UTF-8 encoded JSON.
    value_serializer=lambda m: json.dumps(m).encode('utf-8'),
)

# Sample payloads; one is picked at random for every send.
# NOTE(review): the "AccidentId " key carries a trailing space -- confirm the
# downstream consumers really expect it.
dlp_log = [
    {"AccidentId ": "11111-3ce5"},
    {"AccidentId ": "22222-3ce5"},
]

# NOTE(review): the consumer example on this page subscribes to 'filcident';
# producer and consumer must use the same topic name to exchange messages.
try:
    # Runs until the process is interrupted (Ctrl-C).
    while True:
        data = random.choice(dlp_log)
        print("i:", data)
        producer.send(topic="filterAnalysisResultIncident", value=data)
        time.sleep(2)
except KeyboardInterrupt as e:
    print("KeyboardInterrupt:", e)
finally:
    # send() only queues the record; flush() waits for queued records to be
    # delivered and close() releases the client. Previously these calls sat
    # after an infinite loop and could never run.
    producer.flush()
    producer.close()
消费者:
# Kafka consumer demo: read messages from the 'filcident' topic, decode each
# value as UTF-8 JSON and print it, until interrupted.
import json

from kafka import KafkaConsumer

# NOTE(review): the broker list looks redacted ('10.1:9092' appears twice) --
# replace with real "host:port" pairs before running.
# NOTE(review): the producer example on this page publishes to
# "filterAnalysisResultIncident"; the topic names must match.
# NOTE(review): auto-commit is disabled and this script never commits offsets,
# so with auto_offset_reset="latest" every restart begins at the newest
# message -- confirm that skipping messages sent while offline is intended.
consumer = KafkaConsumer(
    'filcident',
    group_id='f_consumers1',
    bootstrap_servers=['10.1:9092', '10.1:9092', '10.19:9092'],
    enable_auto_commit=False,
    auto_offset_reset="latest",
)

try:
    for msg in consumer:
        try:
            # Decode here rather than via value_deserializer: a deserializer
            # runs inside the consumer iterator, so a malformed payload would
            # raise from the `for` statement, bypass this handler and stop
            # the script. Decoding in the loop reports and skips bad messages.
            value = json.loads(msg.value.decode('utf-8'))
            # msg also exposes topic, partition, offset and key if more
            # detailed output is needed.
            print("value:", type(value), value)
        except Exception as e:
            # Per-message boundary: report the failure and keep consuming.
            print("msg.value error:", e)
except KeyboardInterrupt as e:
    print("KeyboardInterrupt:", e)
finally:
    # Leave the consumer group and release network resources on exit.
    consumer.close()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)