如何使用python 连接kafka 并获取数据

如何使用python 连接kafka 并获取数据,第1张

连接

kafka

的库有两种类型,一种是直接连接

kafka

的,存储

offset

的事情要自己在客户端完成。还有一种是先连接

zookeeper

然后再通过

zookeeper

获取

kafka

brokers

信息,

offset

存放在

zookeeper

上面,由

zookeeper

来协调。

我现在使用

samsa

这个

highlevel

Producer示例

from

kazoo.client

import

KazooClientfrom

samsa.cluster

import

Clusterzookeeper

=

KazooClient()zookeeper.start()cluster

=

Cluster(zookeeper)topic

=

cluster.topics['topicname']topic.publish('msg')

**

Consumer示例

**

from

kazoo.client

import

KazooClientfrom

samsa.cluster

import

Clusterzookeeper

=

KazooClient()zookeeper.start()cluster

=

Cluster(zookeeper)topic

=

cluster.topics['topicname']consumer

=

topic.subscribe('groupname')for

msg

in

consumer:

print

msg

Tip

consumer

必需在

producer

kafka

topic

里面提交数据后才能连接,否则会出错。

Kafka

中一个

consumer

需要指定

groupname

groue

中保存着

offset

等信息,新开启一个

group

会从

offset

0

的位置重新开始获取日志。

kafka

的配置参数中有个

partition

,默认是

1

,这个会对数据进行分区,如果多个

consumer

想连接同个

group

就必需要增加

partition

,

partition

只能大于

consumer

的数量,否则多出来的

consumer

将无法获取到数据。

使用kafkapython读取实时数据小例子 使用kafkapython读取实时数据小例子 from kafka import KafkaConsumer from kafka.client import KafkaClient imp


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/6777697.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-28
下一篇 2023-03-28

发表评论

登录后才能评论

评论列表(0条)

保存