python对接kafka有两个常用库:kafka-python,pykafka,前者github star较多,所以选用了前者。
生产者:
from kafka import KafkaProducer import json import datetime topic = 'test' producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda m: json.dumps(m).encode("utf-8")) # 参数bootstrap_servers:指定kafka连接地址 # 参数value_serializer:指定序列化的方式,我们定义json来序列化数据,当字典传入kafka时自动转换成bytes # 用户密码登入参数 # security_protocol="SASL_PLAINTEXT" # sasl_mechanism="PLAIN" # sasl_plain_username="maple" # sasl_plain_password="maple" for i in range(2): data = {"num": i, "ts": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} producer.send(topic, data) producer.close()
消费者:
import time from kafka import KafkaConsumer from multiprocessing import Process def run(process, group_id, *topics): consumer = KafkaConsumer(*topics, bootstrap_servers=['127.0.0.1:9092'], group_id=group_id, auto_offset_reset="earliest") # 参数bootstrap_servers:指定kafka连接地址 # 参数group_id:如果2个程序的topic和group_id相同,那么他们读取的数据不会重复,2个程序的topic相同,group_id不同,那么他们各自消费相同的数据,互不影响 # 参数auto_offset_reset:默认为latest表示offset设置为当前程序启动时的数据位置,earliest表示offset设置为0,在你的group_id第一次运行时,还没有offset的时候,给你设定初始offset。一旦group_id有了offset,那么此参数就不起作用了 for msg in consumer: print(msg) recv = "%s:%d:%d: key=%s value=%s process=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value, process) print(recv) time.sleep(1) # consumer.close() if __name__ == '__main__': p1 = Process(target=run, args=('进程1', 'group3', 'test')) # p2 = Process(target=run, args=('进程2', 'group1', 'test')) p1.start() # p2.start() print('主进程')
消费者在消费时,如果程序意外退出(如ctrl+c),offset未能及时保存,再次重启时会导致重复消费问题。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)