kafka 代码和总结

kafka 代码和总结,第1张

kafka 代码和总结

import time
import json
import requests


from kafka import KafkaProducer
from kafka import KafkaConsumer


class KafkaOp:
    def __init__(self, servers, topic):
        self.servers = servers
        self.topic = topic
        self.producer = KafkaProducer(bootstrap_servers=self.servers)
        print("init")

    def close(self):
        if self.producer is not None:
            self.producer.close()
            self.producer = None

    def produce(self, msg):
        if self.producer is not None:
            if not isinstance(msg, bytes):
                msg = msg.encode("utf-8")  # 将str类型转换为bytes类型
            self.producer.send(topic=self.topic, value=msg)
            self.producer.flush()
            print("{} send done".format(msg))

        else:
            print("no self.producer")

    def consume(self):
        consumer = KafkaConsumer(self.topic, bootstrap_servers=[self.servers, ],
                                 auto_offset_reset='earliest',
                                 # auto_offset_reset='latest',
                                 enable_auto_commit=True,
                                 auto_commit_interval_ms=1000,
                                 group_id="jgy2"
               )  # 同一group_id消费只能消费一次msg
        for record in consumer:

            start_time = time.time()
            try:
                if record is not None:
                    print("record", record)
                    valuestr = record.value.decode()   #从ibytes转为string类型
                    value = json.loads(valuestr)
            except Exception as e:
                print('have except {}'.format(e))
            finally:
                print("cost time{:.4f} s".format(time.time()-start_time))
        consumer.close()

ko = KafkaOp("xxxx:9092", "test_topic")
对kafka理解

kafka *** 作命令

起服务
# zookeeper
nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties >> zk.nohup.out &

# kafka
nohup bin/kafka-server-start.sh config/server.properties >> kafka.nohup.out &

# 查看zookeeper配置
vi config/server.properties
zookeeper.connect=localhost:2181

# 查看已有topic
kafka-topics.sh --list --bootstrap-server localhost:9092

删除topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic xx

# 增加topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic xx

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic xx

# kafka理解 
Kafka是分布式订阅-发布消息系统,
解释 分布式: 可以有多个broker组成
订阅-发布消息系统: 生产者消费者模式的系统

使用场景 : 削峰, 解耦, 去冗余(msg被消费了其实还在,换一个consumer又可以消费。 那么msg不删? 删除,到配置里的保存时间就删除)

重要概念:
topic 主题 订阅了什么主题就消费什么主题的msg, 同一group的消费者,只能消费一次某个主题的msg
partition 分区 把生产的同一主题的msg,按照策略存到各个分区 分区可以实现 高伸缩性,以及负载均衡 那消息就无序了?保存策略,默认轮询。
replication 副本 follower 从leader那里copy数据
ISR In-Sync Replicas 副本同步队列
OSR(Outof-Sync Replicas)
AR Assigned Replicas 所有副本
AR = ISR + OSR 

kafka-python

消费者参数理解
auto_offset_reset='earliest',
auto_offset_reset='latest',

1,如果消费者组有消费者提交过offset, 不管是earliest还是latest,都从offset开始消费。
2,如果消费者组没有消费者提交过offset, earliest从分区开始位置消费,latest则从当前添加的最后位置开始消费。
命令行创建生产者和消费者

kafka-console_producer.sh --bootstrap-server localhost:9092 --topic xxx
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xxx --from-beginning

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5715862.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存