RabbitMQ五大工作模式之简单模式、工作队列模式、发布订阅模式、路由模式、主题模式、RPC模式

RabbitMQ五大工作模式之简单模式、工作队列模式、发布订阅模式、路由模式、主题模式、RPC模式,第1张

RabbitMQ五大工作模式之简单模式、工作队列模式、发布订阅模式、路由模式、主题模式、RPC模式 前言

这几年一直在it行业里摸爬滚打,一路走来,不少总结了一些python行业里的高频面试,看到大部分初入行的新鲜血液,还在为各样的面试题答案或收录有各种困难问题

于是乎,我自己开发了一款面试宝典,希望能帮到大家,也希望有更多的Python新人真正加入从事到这个行业里,让python火不只是停留在广告上。

微信小程序搜索:Python面试宝典

或可关注原创个人博客:https://lienze.tech

也可关注微信公众号,不定时发送各类有趣猎奇的技术文章:Python编程学习

工作模式

通过与各种交换机搭配,rq提供了为如下几种主要的工作模式

简单模式

一队列,一消费者、生产者,使用默认交换机,即可构成最基本的模型

生产者

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者

import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello') # 队列声明

    def callback(ch, method, properties, body): # 消费者接收回调
        print(" [x] Received %r" % body)

    channel.basic_consume(
        queue='hello', on_message_callback=callback, auto_ack=True) # 回调声明

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() # 开启消费

if __name__ == '__main__':
    main()
工作队列

类似简单模式,但是会有多个消费者,此时rq将会按照轮询的方式分别将消息给予每个消费者

消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失)

RabbitMQ 将理解消息未完全处理并将重新排队。如果有其他消费者同时在线,它会迅速将其重新交付给另一个消费者

默认情况,启动确认模式为手动确认

生产者

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 队列声明
channel.queue_declare(queue='task_queue', durable=True) # 队列持久化 

message = ' '.join(sys.argv[1:]) or "Hello World!"

# 消息发布
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # 消息持久化
    ))
print(" [x] Sent %r" % message)
connection.close()

消费者,可以启动N>1个守护进程

import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 队列声明
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C') 

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # 消息确认
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()
pub/sub订阅模式

结合交换机实现一端生产,多个消费者可以都接收消费到相同的消息

实现该模式,主要基于扇形交换机fanout完成,他的特点是,将消息发布至绑定在交换机上的所有队列中

生产者

import pika
import sys
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# fanout交换机声明
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# 消息发布
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

消费者no.1

import pika
# 队列声明
channel.queue_declare(queue='queue1', exclusive=True)
# 队列绑定
channel.queue_bind(exchange='logs', queue="queue1")
# 消费回调
def callback(ch, method, properties, body):
    print(" [x] %r" % body.decode())
	
# 消费者监听
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

消费者no.N

channel.queue_declare(queue='queueN', exclusive=True)
channel.queue_bind(exchange='logs', queue="queueN")
def callback(ch, method, properties, body):
    print(" [x] %r" % body.decode())
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

如果该fanout交换机未绑定任何队列,那么对于该交换机的消息发布,消息都将会丢失

查看所有当前队列绑定,可以在控制台或命令行使用进行查看

rabbitmqctl list_bindings
路由模式

通过结合交换器direct模式,可以为队列绑定指定routing_key路由规则,让交换器可以选择的为某些队列进行消息投递

生产者

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
for _ in range(10):
    # 偶数 -> orange
    # 奇数 -> black
    # 0 -> green
    if _ == 0 :
        routing_key = 'green'
	elif _ % 2 == 0:
        routing_key = 'orange'
    else:
        routing_key = 'black'
	channel.basic_publish(
    	exchange='direct_logs', routing_key=routing_key, body=message)
	print(" [x] Sent %r:%r" % (severity, message))
connection.close()

消费者

此时出现多队列,那么需要消费者指定队列进行消费,比如对black的消费

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body.decode()))
channel.basic_consume(
    queue="black", on_message_callback=callback, auto_ack=True)
channel.start_consuming()
主题模式

主题模式,借助topic交换机的匹配队列功能,可以使业务代码对于交换机的选择更为灵活

简而言之可以对业务进行包含关键key的分类,而key将作为队列的匹配规则

生产者

from multiprocessing import current_process
import pika
from pika.exchange_type import ExchangeType


class CPUTest:
    def __init__(self):
        self.user = "guest"
        self.pwd = "guest"
        self.ip = "192.168.17.3"
        self.port = 5672

    def __call__(self):
        self.producer()

    def producer(self):
        credentials = pika.PlainCredentials(self.user, self.pwd)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                self.ip, self.port, '/', credentials)
        )
        # 建立channel
        channel = connection.channel()

        channel.exchange_declare(
            exchange='topic_exchange',
            exchange_type=ExchangeType.topic,
            durable=True
        )

        for num in range(1, 3):
            channel.queue_declare(
                queue='topic_queue{}'.format(str(num)), durable=True)  # 队列创建

            channel.queue_bind(
            exchange="topic_exchange", queue='topic_queue1',
            routing_key='A.*'.format(str(num)))
        # A.zxcnzjcn 绑定时,是匹配A.(任意n个单词)

        channel.queue_bind(
            exchange="topic_exchange", queue='topic_queue2',
            routing_key='*.B'.format(str(num)))
        # A.B -> topic_queue1 topic_queue2
        # A.C -> topic_queue1
        # C.B -> topic_queue2
        # 向消息队列发送一条消息
        for num in range(5):
            channel.basic_publish(
                exchange='topic_exchange',
                routing_key="A.B",  # 向两款同时发送消息
                body="topic_%s" % num,  # 发送内容
                properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化
            )

        channel.cancel()
        connection.close()  # 连接关闭
        return current_process().name

消费者

消费者还是要指定对应的队列进行消费

生产者通过主题模糊匹配生产

RPC模式

如果需要在远程计算机上运行一个函数并等待结果,那么将需要远程过程调用/RPC机制

实现rpc机制,需要创建有关于rpc客户端的回调队列,并且为了在回调队列中可以获取响应的详细归属,可以通过维护业务id来分辨

rpc模型下,将称为客户端、服务端,也就代表着,服务端可能同时成为消费者、生产者

数据接收的消费者,以及结果发布的生产者

服务端: 提供rpc远程调用,封装斐波那契数列方法,通过客户端请求发送n,返回n的斐波那契数

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# rpc_queue队列用来进行需求消息获取
channel.queue_declare(queue='rpc_queue')

# 斐波那契数列
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
    
def on_request(ch, method, props, body):
    n = int(body.decode()) # 获取客户端发来要获取的斐波那契n值
    fibnumber = fib(n) # 斐波那契值
    ch.basic_publish(
        exchange='',
        # 结果队列,将有消费者定义结果存放在何队列
        routing_key=props.reply_to, 
        # 结果的当前标记,用以确定结果是否为当前客户端所需
        properties=pika.BasicProperties(correlation_id = props.correlation_id),
        # 结果
		body=str(fibnumber)
    )
    # 消息确认
    ch.basic_ack(delivery_tag=method.delivery_tag)
    
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
channel.start_consuming()

客户端

客户端与服务端共同使用rpc_queue进行rpc调用信息的发布、接收

此外,客户端还需要在每次生产需求时,额外提供当前需求对应回复的key也就是correlation_id用以确保结果可靠

并且,为了避免分布式客户端情况下,可能会造成correlation_id重复而结果获取有误,可以在每个客户端维护一个单独的结果队列进行结果获取

首先是初始化工作,并且为了可以支持结果获取,还需要客户端额外订阅结果获取队列callback_queue,也就是服务端拿到的reply_to

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        # 结果队列
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            # 消费的结果回调方法,校验correlation_id
            on_message_callback=self.on_response,
            auto_ack=True,
        )

工作函数,用来接收斐波那契的n值,并将其发布至rpc_queue队列中

发布过程,携带reply_to以及correlation_id属性,correlation_id可以用uuid模块生成

def call(self, n):
    self.response = None
    self.corr_id = str(uuid.uuid4())
    # 发布斐波那契获取消息
    self.channel.basic_publish(
        exchange='',
        routing_key='rpc_queue',
        properties=pika.BasicProperties(
            reply_to=self.callback_queue, # 结果队列
            correlation_id=self.corr_id, # 结果标记
        ),
        body=str(n)
    )
    # 如果当前 并未有rpc调用结果
    while self.response is None: 
        # 非阻塞执行消费动作,获取结果
        self.connection.process_data_events()
    # 拿到结果则返回
    return int(self.response)

拿取结果的消费回调函数,需要校验corr_id是否与结果返回中的correlation_id即可

def on_response(self, ch, method, props, body):
    if self.corr_id == props.correlation_id:
        self.response = body

这只是考虑了基本的rpc,并不存在任何并发

如果在大量请求时,可以结合redis等工具完成任务的获取,虽然不能保证有序,但可以确保结果数据的有效

并发过多线程的服务端,可能导致标记的key并不为有序,但是可以将这些key存储在redis中进行校验

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5700137.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存