kafka
的库有两种类型,一种是直接连接
kafka
的,存储
offset
的事情要自己在客户端完成。还有一种是先连接
zookeeper
然后再通过
zookeeper
获取
kafka
的
brokers
信息,
offset
存放在
zookeeper
上面,由
zookeeper
来协调。
我现在使用
samsa
这个
highlevel
库
Producer示例
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']topic.publish('msg')
**
Consumer示例
**
from
kazoo.client
import
KazooClientfrom
samsa.cluster
import
Clusterzookeeper
=
KazooClient()zookeeper.start()cluster
=
Cluster(zookeeper)topic
=
cluster.topics['topicname']consumer
=
topic.subscribe('groupname')for
msg
in
consumer:
msg
Tip
consumer
必需在
producer
向
kafka
的
topic
里面提交数据后才能连接,否则会出错。
在
Kafka
中一个
consumer
需要指定
groupname
,
groue
中保存着
offset
等信息,新开启一个
group
会从
offset
0
的位置重新开始获取日志。
kafka
的配置参数中有个
partition
,默认是
1
,这个会对数据进行分区,如果多个
consumer
想连接同个
group
就必需要增加
partition
,
partition
只能大于
consumer
的数量,否则多出来的
consumer
将无法获取到数据。
使用kafkapython读取实时数据小例子 使用kafkapython读取实时数据小例子 from kafka import KafkaConsumer from kafka.client import KafkaClient imp欢迎分享,转载请注明来源:内存溢出
评论列表(0条)