python使用kafka生产和消费案例

python使用kafka生产和消费案例,第1张

// confluent_kafka 使用案例
import json
from confluent-kafka import Producer

topic_name = ""
conf = {
// 集群,或者服务器名
"bootstrap.servers":"",
// 安全隧道
"security.protocol":"sasl_plaintext",
//加密方式
"sasl.mechanism":"SCRAM-SHA-256"
// 账号密码
"sasl.username":"",
"sasl.password":"",
# >> [详细配置见](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
....
}

produce = Producer(**conf)

a= {"name":"12344","age":18,"sex":"man"}
produce.produce(topic_name,json.dumps(a,ensure_ascii=False).encoding("utf-8"))

produce.pull()
produce.flush()

详解 confluent_kafka python版本的生产使用案例。
1.需要注意的是confluent_kafka 安装时候可能会出现报错,建议使用conda 安装,主要出现的问题在sasl上
2.confluent_kafka,我是用的情况来看,必须有账号和密码
3.confluent_kafka 没有常见的batch方式,你可以通过produce.poll(timeout=0.2)来对每条数据强制等待返回结果,可以使用flush(),批量等待结果(内部调用poll()实现具体任务)
4.如果想要提高效率,可以取消掉produce.flush() 和 produce.pull()
5.每一个produce(),传入字符串有个长度限制和容量显示,可以在详细配置见找message.max.bytes 来提高传入字符串的长度。

import json
from confluent-kafka import Consumer

topic_name = ""
conf = {
// 集群,或者服务器名
"bootstrap.servers":"",
// 安全隧道
"security.protocol":"sasl_plaintext",
//加密方式
"sasl.mechanism":"SCRAM-SHA-256"
// 账号密码
"sasl.username":"",
"sasl.password":"",
"group.id":["xxx","xxx1"],
// 需要增加offset相关的配置和消费策略

# >> [详细配置见](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
....
}

consumer= Consumer(**conf)
batch_size = 200
while True:
	// 消费数据
	data = consumer.consume(batch_size)
	keep_datas = []
	// 判断是否消费失败
	if data.error() is not None:
		for dat in data:
			if dat.error() is not None:
				keep_datas.append(dat.value())
			else:
				print("当前此条message消费失败......")
	else:
		print("当前此批消费失败~~")
	# json 解析判断
	use_items = []
	for keep_data in keep_datas:
		try:
			use_items.append(json.loads(keep_data))
		except Exception as e
			print("当前此条消息,json解析失败.....")	

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/798600.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存