import time
import json
import requests
from kafka import KafkaProducer
from kafka import KafkaConsumer
class KafkaOp:
def __init__(self, servers, topic):
self.servers = servers
self.topic = topic
self.producer = KafkaProducer(bootstrap_servers=self.servers)
print("init")
def close(self):
if self.producer is not None:
self.producer.close()
self.producer = None
def produce(self, msg):
if self.producer is not None:
if not isinstance(msg, bytes):
msg = msg.encode("utf-8") # 将str类型转换为bytes类型
self.producer.send(topic=self.topic, value=msg)
self.producer.flush()
print("{} send done".format(msg))
else:
print("no self.producer")
def consume(self):
consumer = KafkaConsumer(self.topic, bootstrap_servers=[self.servers, ],
auto_offset_reset='earliest',
# auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms=1000,
group_id="jgy2"
) # 同一group_id消费者只能消费一次msg
for record in consumer:
start_time = time.time()
try:
if record is not None:
print("record", record)
valuestr = record.value.decode() #从ibytes转为string类型
value = json.loads(valuestr)
except Exception as e:
print('have except {}'.format(e))
finally:
print("cost time{:.4f} s".format(time.time()-start_time))
consumer.close()
ko = KafkaOp("xxxx:9092", "test_topic")
对kafka理解
kafka *** 作命令
起服务
# zookeeper
nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties >> zk.nohup.out &
# kafka
nohup bin/kafka-server-start.sh config/server.properties >> kafka.nohup.out &
# 查看zookeeper配置
vi config/server.properties
zookeeper.connect=localhost:2181
# 查看已有topic
kafka-topics.sh --list --bootstrap-server localhost:9092
删除topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic xx
# 增加topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic xx
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic xx
# kafka理解
Kafka是分布式订阅-发布消息系统,
解释 分布式: 可以有多个broker组成
订阅-发布消息系统: 生产者消费者模式的系统
使用场景 : 削峰, 解耦, 去冗余(msg被消费了其实还在,换一个consumer又可以消费。 那么msg不删? 删除,到配置里的保存时间就删除)
重要概念:
topic 主题 订阅了什么主题就消费什么主题的msg, 同一group的消费者,只能消费一次某个主题的msg
partition 分区 把生产的同一主题的msg,按照策略存到各个分区 分区可以实现 高伸缩性,以及负载均衡 那消息就无序了?保存策略,默认轮询。
replication 副本 follower 从leader那里copy数据
ISR In-Sync Replicas 副本同步队列
OSR(Outof-Sync Replicas)
AR Assigned Replicas 所有副本
AR = ISR + OSR
kafka-python
消费者参数理解
auto_offset_reset='earliest',
auto_offset_reset='latest',
1,如果消费者组有消费者提交过offset, 不管是earliest还是latest,都从offset开始消费。
2,如果消费者组没有消费者提交过offset, earliest从分区开始位置消费,latest则从当前添加的最后位置开始消费。
命令行创建生产者和消费者
kafka-console_producer.sh --bootstrap-server localhost:9092 --topic xxx
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxx --from-beginning
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)