node2:/root/sbin/kafka#cat kafka_produce.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
class Kafka_producer():
    """Small wrapper around ``KafkaProducer`` that publishes JSON to one topic.

    The instance keeps the broker host, port and topic it was created with,
    plus the underlying ``KafkaProducer`` in ``self.producer``.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        """Create a producer connected to ``kafkahost:kafkaport``.

        Args:
            kafkahost: Broker host name or IP address.
            kafkaport: Broker port.
            kafkatopic: Topic that ``sendjsondata`` publishes to.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort
            )
        )

    def sendjsondata(self, params):
        """Serialize ``params`` to JSON and publish it to the configured topic.

        The JSON text is UTF-8 encoded, handed to the producer, and the
        producer is flushed before this method returns.

        Args:
            params: Any object ``json.dumps`` can serialize.

        A ``KafkaError`` raised while sending or flushing is printed, not
        propagated. Serialization errors from ``json.dumps`` are not caught.
        """
        # Serialization cannot raise KafkaError, so it stays outside the try.
        message = json.dumps(params)
        try:
            self.producer.send(self.kafkatopic, message.encode('utf-8'))
            self.producer.flush()
        except KafkaError as e:
            # print(...) with a single argument behaves the same on
            # Python 2 and Python 3.
            print(e)
# Demo run: connect to the broker at 192.168.137.3:9092 and publish a single
# JSON-encoded string to the topic "topic_20211214".
producer = Kafka_producer(
    kafkahost="192.168.137.3",
    kafkaport=9092,
    kafkatopic="topic_20211214"
)
# Alternative sample payload: producer.sendjsondata('a1111111111')
producer.sendjsondata('z9999999999999')
node2:/root/sbin/kafka#cat cosumer1.py
#!/usr/bin/env python
# coding=utf-8
from kafka import *
from kafka import KafkaConsumer
import datetime,time
import json
def get_kafka_reviews(bootstrap_servers, topics):
    """Subscribe to ``topics`` and print every message that arrives.

    The consumer joins group ``'con01'``, starts from the latest offset when
    the group has no committed position, and auto-commits offsets.

    Args:
        bootstrap_servers: Broker address string such as ``'host:port'``.
        topics: A single topic name, or a list/tuple of topic names.

    Returns:
        None. No ``consumer_timeout_ms`` is configured here, so the loop is
        presumably expected to block waiting for new messages until the
        process is interrupted -- confirm against the client documentation.
    """
    # The client expects a collection of topic names. Wrap a lone name so it
    # is never interpreted as a sequence of single characters.
    # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3.
    if isinstance(topics, (str, type(u''))):
        topics = [topics]

    consumer = KafkaConsumer(
        bootstrap_servers=[bootstrap_servers],
        group_id='con01',
        auto_offset_reset='latest',
        enable_auto_commit=True
    )
    try:
        # Subscribe to the topics to consume.
        consumer.subscribe(topics=topics)
        for message in consumer:
            # Each record is printed as-is. NOTE(review): earlier debugging
            # code treated message.timestamp as milliseconds and parsed
            # message.value with json.loads -- verify before relying on it.
            print(message)
    finally:
        # Release the connection even when the loop is interrupted.
        consumer.close()
# Start consuming from the broker at 192.168.137.3:9092. print(...) with one
# argument works on both Python 2 and Python 3; the printed value is the
# function's return value.
print(get_kafka_reviews('192.168.137.3:9092', 'topic_20211214'))
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)