- 前言
- 一、pulsar-thread是什么?
- 二、安装pulsar-thread
- 三、使用说明
- 1. 连接 (client)
- 2. 生产者(producer)
- 3. 消费者(consumer)
- 四、当使用 pulsar-client(pulsar.Client)连接时
- 1. 生产者(producer)
- 2. 消费者(consumer)
- 五、关于schema模式
- 1. schema支持的模式
- 2. schema参数用法和pulsar-client相同
pulsar是一个云原生的分布式消息和流平台。
本文主要介绍使用pulsar-thread包进行生产和消费。
一个连接pulsar消息队列的包,优点是:支持多线程生产和消费。
- 本包是以pulsar-client为基础创建的
- pulsar-client使用链接:https://pulsar.apache.org/docs/en/client-libraries-python/
- 默认 schema=pulsar.schema.StringSchema()
- 若想使用其他的schema, 使用方法与pulsar-client相同, 详情可看上面pulsar-client使用链接
- 默认的多线程最大数thread_count为5个
pip install pulsar-thread
三、使用说明 1. 连接 (client)import pulsar_thread as pt client = pt.client('pulsar://0.0.0.0:6655') #请将 0.0.0.0:6655 换成你的pulsar地址2. 生产者(producer)
import json import pulsar_thread as pt # 1. 连接client client = pt.client('pulsar://0.0.0.0:6655') # 模拟要发送的数据 data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']} data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']} data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']} data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']} # 2. 将要发送的数据和topic组合成字典 # {'topic_1': msg_1, ... , 'topic_n': msg_n} data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 'test2':[json.dumps(data3), json.dumps(data4)]} # 3. 创建生产者 producer = client.create_producer() # 4. 发送消息 # 可选 4.1 同步发送send 或 4.2 异步发送send_async # 4.1 同步发送 # 可选 4.1.1 默认模式 或 4.1.2 自定义模式 # 4.1.1 默认模式 # 默认参数:thread_count=5, schema=pulsar.schema.StringSchema() # 默认多线程最大数thread_count为5个, schema是以StringSchema()字符串模式 result = producer.send(data_dict) # 4.1.2 自定义模式 # schema参数设置规范详见pulsar-client的使用 result = producer.send_async(data_dict, thread_count=10, schema=pulsar.schema.StringSchema()) # 4.2 异步发送 # 可选 4.2.1 默认模式 或 4.2.2 自定义模式 # 4.2.1 默认模式 # 默认参数:callback=None, thread_count=5, schema=pulsar.schema.StringSchema() # 默认回调函数callback为None, 多线程最大数thread_count为5个, schema是以StringSchema()字符串模式 result = producer.send_async(data_dict) # 4.2.2 自定义模式(callback, schema参数设置规范详见pulsar-client的使用) result = producer.send_async(data_dict, callback=None, thread_count=10, schema=pulsar.schema.StringSchema())3. 消费者(consumer)
import json import pulsar_thread as pt # 业务程序,处理消息队列发来的消息 # msg 是 接收的消息队列传来的消息 def deal_msg(msg): print(msg.value()) import time time.sleep(5) print(json.loads(msg.data())) # 1. 连接 client = pt.client('pulsar://0.0.0.0:6655') # 2. 创建consumer # 可从多个 topic 里接收数据 # 默认接收的 schema=pulsar.schema.StringSchema() # 格式:consumer = client.create_consumer(['topic_1', ......, 'topic_n'], 消费者的名字, # schema=pulsar.schema.StringSchema()) consumer = client.create_consumer(['test1', 'test2'], 'my-subscription') # 3. 接收数据并用业务程序(例:deal_msg)处理 # 可选 3.1 单线程处理 consumer.receive() 或 # 3.2 多线程处理 consumer.receive_thread() # 3.1 单线程处理 # 可选 3.1.1 默认模式 或 3.1.2 自定义模式 # 阻塞模式,消费一个,业务程序处理一个,业务程序处理完成,再消费下一个 # 3.1.1 默认模式 # 默认参数:timeout_millis=None, logger=None # 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms # 默认 日志收集器logger 为 None consumer.receive(deal_msg) # 3.1.2 自定义模式 import logging,sys def LogSet(): # 获取logger实例,如果参数为空则返回root logger logger = logging.getLogger("test.log") # 指定logger输出格式 formatter = logging.Formatter('%(asctime)s %(pathname)s %(lineno)d %(levelname)-8s: %(message)s') # 文件日志 file_handler = logging.FileHandler("./test.log") file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式 # 控制台日志 console_handler = logging.StreamHandler(sys.stdout) console_handler.formatter = formatter # 也可以直接给formatter赋值 # 为logger添加的日志处理器 logger.addHandler(file_handler) logger.addHandler(console_handler) # 指定日志的最低输出级别,默认为WARN级别 logger.setLevel(logging.DEBUG) # 移除一些日志处理器 return logger, file_handler # 获取 logger logger,_=LogSet() consumer.receive(deal_msg, timeout_millis=5000, logger=logger) # 3.2 多线程处理 # 可选 3.2.1 默认模式 或 3.2.2 自定义模式 # 可以使用多线程进行并发消费,处理业务数据,提高效率 # 3.2.1 默认模式 # 默认参数:thread_count=5, timeout_millis=None, logger=None # 默认 最大线程数thread_count为5个 # 默认 订阅超时限制 timeout_millis(慎用) 为 None, 单位上ms # 默认 日志收集器logger 为 None consumer.receive_thread(deal_msg) # 3.1.2 自定义模式 # 例 1 consumer.receive_thread(deal_msg, 2) # 例 2 consumer.receive_thread(deal_msg, 10, timeout_millis=5000, logger=logger)四、当使用 pulsar-client(pulsar.Client)连接时
pulsar_thread的create_producer和create_consumer的使用
1. 生产者(producer)import json import pulsar_thread as pt import pulsar # 使用 pulsar 连接 client = pulsar.Client('pulsar://0.0.0.0:6655') data = {'name':'jack', 'age':25, 'have': ['item1', 'item2']} data2 = {'name':'rose', 'age':28, 'have': ['item1', 'item2']} data3 = {'name':'joe', 'age':28, 'have': ['item1', 'item2']} data4 = {'name':'mark', 'age':28, 'have': ['item1', 'item2']} data_dict = {'test1':[json.dumps(data), json.dumps(data2)], 'test2':[json.dumps(data3), json.dumps(data4)]} # 使用 pulsar_thread 创建生产者 producer = pt.create_producer(client) result = producer.send(data_dict)2. 消费者(consumer)
import json import pulsar_thread as pt import pulsar def deal_msg(msg): print(msg.value()) import time time.sleep(5) print(json.loads(msg.data())) # 使用 pulsar 连接 client = pulsar.Client('pulsar://0.0.0.0:6655') # 使用 pulsar_thread 创建消费者 consumer = pt.create_consumer(client, ['test1', 'test2'], 'my-subscription') consumer.receive_thread(deal_msg, 2)五、关于schema模式 1. schema支持的模式
若想拓展使用schema,请移步至pulsar-client文档,阅读使用schema。
pulsar-client文档链接:https://pulsar.apache.org/docs/en/client-libraries-python/
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)