“什么是最好的方法”的答案在很大程度上取决于队列的使用模式以及“最好”的含义。由于我无法对问题发表评论,因此我将尝试提出一些可能的解决方案。
在每个示例中,我将假定已声明交换。
线程数您可以在单个进程中使用来自不同主机上两个队列的消息
pika。
您是对的-
正如它自己的FAQ所述,
pika它不是线程安全的,但是可以通过为每个线程创建与RabbitMQ主机的连接来以多线程方式使用它。使此示例使用
threading模块在线程中运行如下所示:
import pikaimport threadingclass ConsumerThread(threading.Thread): def __init__(self, host, *args, **kwargs): super(ConsumerThread, self).__init__(*args, **kwargs) self._host = host # Not necessarily a method. def callback_func(self, channel, method, properties, body): print("{} received '{}'".format(self.name, body)) def run(self): credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection( pika.ConnectionParameters(host=self._host, credentials=credentials)) channel = connection.channel() result = channel.queue_declare(exclusive=True) channel.queue_bind(result.method.queue, exchange="my-exchange", routing_key="*.*.*.*.*") channel.basic_consume(self.callback_func, result.method.queue, no_ack=True) channel.start_consuming()if __name__ == "__main__": threads = [ConsumerThread("host1"), ConsumerThread("host2")] for thread in threads: thread.start()
我已经声明
callback_func为纯粹
ConsumerThread.name在打印邮件正文时使用的方法。它也可能是
ConsumerThread类之外的函数。工艺流程
另外,您始终可以只对要使用事件的每个队列使用用户代码运行一个进程。
import pikaimport sysdef callback_func(channel, method, properties, body): print(body)if __name__ == "__main__": credentials = pika.PlainCredentials("guest", "guest") connection = pika.BlockingConnection( pika.ConnectionParameters(host=sys.argv[1], credentials=credentials)) channel = connection.channel() result = channel.queue_declare(exclusive=True) channel.queue_bind(result.method.queue, exchange="my-exchange", routing_key="*.*.*.*.*") channel.basic_consume(callback_func, result.method.queue, no_ack=True) channel.start_consuming()
然后运行:
$ python single_consume.py host1$ python single_consume.py host2 # e.g. on another console
如果您要处理来自队列的消息的工作量很大,并且只要CPU中的核心数>
=使用者数,通常最好使用这种方法-除非您的队列在大多数情况下是空的,并且消费者不会利用此CPU时间*。
另一个选择是涉及一些异步框架(例如
Twisted)并在单个线程中运行整个程序。
您不能再
BlockingConnection在异步代码中使用;幸运的是,
pika有适配器
Twisted:
from pika.adapters.twisted_connection import TwistedProtocolConnectionfrom pika.connection import ConnectionParametersfrom twisted.internet import protocol, reactor, taskfrom twisted.python import logclass Consumer(object): def on_connected(self, connection): d = connection.channel() d.addCallback(self.got_channel) d.addCallback(self.queue_declared) d.addCallback(self.queue_bound) d.addCallback(self.handle_deliveries) d.addErrback(log.err) def got_channel(self, channel): self.channel = channel return self.channel.queue_declare(exclusive=True) def queue_declared(self, queue): self._queue_name = queue.method.queue self.channel.queue_bind(queue=self._queue_name, exchange="my-exchange", routing_key="*.*.*.*.*") def queue_bound(self, ignored): return self.channel.basic_consume(queue=self._queue_name) def handle_deliveries(self, queue_and_consumer_tag): queue, consumer_tag = queue_and_consumer_tag self.looping_call = task.LoopingCall(self.consume_from_queue, queue) return self.looping_call.start(0) def consume_from_queue(self, queue): d = queue.get() return d.addCallback(lambda result: self.handle_payload(*result)) def handle_payload(self, channel, method, properties, body): print(body)if __name__ == "__main__": consumer1 = Consumer() consumer2 = Consumer() parameters = ConnectionParameters() cc = protocol.ClientCreator(reactor, TwistedProtocolConnection, parameters) d1 = cc.connectTCP("host1", 5672) d1.addCallback(lambda protocol: protocol.ready) d1.addCallback(consumer1.on_connected) d1.addErrback(log.err) d2 = cc.connectTCP("host2", 5672) d2.addCallback(lambda protocol: protocol.ready) d2.addCallback(consumer2.on_connected) d2.addErrback(log.err) reactor.run()
如果您从中使用更多的队列,并且消费者执行的工作占用的CPU越少,该方法将更好。
Python 3既然您已经提到
pika过,由于
pika尚未移植,我将自己局限于基于Python 2.x的解决方案。
但是,如果您想转到> =
3.3,则一种可能的选择是
asyncio与AMQP协议(您在RabbitMQ中使用的协议)之一配合使用,例如
asynqp或
aioamqp。
*-请注意,这些技巧很浅-在大多数情况下,选择并不那么明显;对您而言最合适的取决于队列“饱和”(消息/时间),在收到这些消息后您将做什么工作,在什么环境下运行消费者等?除了对所有实现进行基准测试之外,没有其他方法可以确保
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)