python中kafka生产者和消费者实现

python中kafka生产者和消费者实现,第1张

概述安装kafka-python: C:anaconda3Scripts>pip install kafka-python import datetimeimport jsonfrom kafk

安装kafka-python:

C:\anaconda3\Scripts>pip install kafka-python


import datetime
import Json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError

'''
使用kafka-python的生产模块
'''
class Kafka_producer():
def __init__(self,bootstrapServers,kafkatopic):
self.bootstrapServers = bootstrapServers
self.kafkatopic = kafkatopic
self.producer = KafkaProducer(bootstrap_servers=self.bootstrapServers)

def sendJsondata(self,params):
try:
parmas_message = Json.dumps(params)
producer = self.producer
future = producer.send(self.kafkatopic,parmas_message.encode('utf-8'))
producer.flush()
recordMetadata = future.get(timeout=10)
print(recordMetadata,datetime.datetime.Now().strftime('%Y%m%d%H%M%s'))
except KafkaError as e:
print(e)

'''
使用kafka-python的消费模块
'''
class Kafka_consumer():
def __init__(self,kafkatopic,groupID):
self.kafkatopic = kafkatopic
self.bootstrapServers = bootstrapServers
self.groupID = groupID
self.consumer = KafkaConsumer(self.kafkatopic,group_ID=self.groupID,bootstrap_servers=self.bootstrapServers)

def consume_data(self):
try:
for message in self.consumer:
yIEld message
except BaseException as e:
print(e)

if __name__ == '__main__':
bootstrapServers = ['ip1:port1','ip2:port2','ip3:port3']
topicStr = '主题'

print('-' * 20)
print('生产者')
print('-' * 20)

producer = Kafka_producer(bootstrapServers,topicStr)
for ID in range(5):
params = '{tst}:{null}---' + str(ID)
producer.sendJsondata(params)

print('-' * 20)
print('消费者')
print('-' * 20)

groupID = 'group名称'
consumer = Kafka_consumer(bootstrapServers,topicStr,groupID)
message = consumer.consume_data()
for i in message:
print(i.value)


 

总结

以上是内存溢出为你收集整理的python中kafka生产者和消费者实现全部内容,希望文章能够帮你解决python中kafka生产者和消费者实现所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1190403.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-03
下一篇 2022-06-03

发表评论

登录后才能评论

评论列表(0条)

保存