RabbitMQ

RabbitMQ,第1张

RabbitMQ 1.消息队列 1.定义

MQ全称为MessageQueue 消息队列,是分布式系统中重要的组件,是一种应用程序对应用程序的通信方法

2.目的

1.解决生产者和消费者的强解耦问题

2.异步消息:防止应用的阻塞

3.流量消锋:当流量过大时,使用消息队列作为一个缓存区,平衡了生产者和消费者的处理能力,防止应用挂掉

3.类别

常见的消息队列中间件

1.RabbitMQ

2.Kafka

2.RabbitMQ安装

RabbitMQ 是一个基于Erlang 语言和AMQP协议开发的消息中间件

安装参考

使用参考

3.RabbitMQ使用 3.1 自动应答

producer.py

#!/usr/bin/env python
import pika
# 创建凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("guest", "guest")
connect_param=pika.ConnectionParameters('127.0.0.1', credentials=credentials)
# 新建连接,这里localhost可以更换为服务器ip
connection = pika.BlockingConnection(connect_param)
# 创建频道
channel = connection.channel()
# 新建一个ja队列,用于接收消息
channel.queue_declare(queue='ja')
# exchange三种模式:fanout,direct,topic
# exchange=''表示精确指定发送给哪个队列
channel.basic_publish(exchange='', routing_key='ja', body=b'hello')
print("已经发送了消息")
# 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
connection.close()

comuser.py

#!/usr/bin/env python
import pika


def callback(ch, method, properties, body):
    print("消费者接收到了任务:%r" % body)


# 建立与rabbitmq的连接
credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
channel.queue_declare(queue="ja")

# 指定监听的消息队列名称,以及是否自动应答,和相应的回调函数
channel.basic_consume("ja", auto_ack=True, on_message_callback=callback)
# 启动监听队列,有消息则执行callback,没消息则挂起
channel.start_consuming()

注意

1.存在多个消费者时,默认采用轮训的方式获取消息队列

3.2 手动应答

producer.py与上面保持一致

customer.py

import pika


def callback(ch, method, properties, body):
    print("消费者接收到了任务:%r" % body)
    # 手动告知消息队列消息已拿走
    ch.basic_ack(delivery_tag=method.deliver_tag)


credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
channel.queue_declare(queue="ja")

# auto_ack默认为False
channel.basic_consume("ja",  on_message_callback=callback)
print("waiting for message")
channel.start_consuming()
3.3 消息持久化

producer.py

#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials("guest", "guest")
connect_param=pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
channel.queue_declare(queue='ja',durable=True) #申明持久化的新队列
channel.basic_publish(exchange='', routing_key='ja', body=b'hello',
                      properties=pika.BasicProperties(delivery_mode=2,) # 支持数据持久化
                      )
print("已经发送了消息")
connection.close()

customize.py

import pika


def callback(ch, method, properties, body):
    print("消费者接收到了任务:%r" % body)
    ch.basic_ack(delivery_tag=method.deliver_tag)


credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
#声明可持久化的队列
channel.queue_declare(queue="ja",durable=True)
channel.basic_consume("ja",  on_message_callback=callback)
print("waiting for message")
channel.start_consuming()
3.4 公平分发

producer.py

#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials("guest", "guest")
connect_param=pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
# 限制每次只发送不超过1条消息到同一消费者,消费者必须手动反馈告知队列,才会发送下一个
channel.basic_qos(prefetch_count=1) 
channel.queue_declare(queue='ja',durable=True) #申明持久化的新队列
channel.basic_publish(exchange='', routing_key='ja', body=b'hello',
                      properties=pika.BasicProperties(delivery_mode=2,) # 支持数据持久化
                      )
print("已经发送了消息")
connection.close()
4.交换机模式 4.1广播模式

producer.py

#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
# 将频道声明为广播模式
channel.exchange_declare(exchange="m1", exchange_type='fanout')
# 指定发送给什么频道
channel.basic_publish(exchange="m1",routing_key='',body=b'nihao')
print("已经发送了消息")
connection.close()

customer.py

import pika


def callback(ch, method, properties, body):
    print("消费者接收到了任务:%r" % body)


credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
# 随机生成一个队列
result = channel.queue_declare(queue='', exclusive=True)
# 获取队列名
queue_name = result.method.queue
# 将队列绑定在频道上
channel.queue_bind(exchange='m1', queue=queue_name)
# 发送消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print("waiting for message")
channel.start_consuming()

运行多个消费者,即可发现生产者会将消息发给所有的队列

4.2路由模式

路由模式也叫关键字模式

producer.py

#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
# 将频道声明为广播模式
channel.exchange_declare(exchange="m2", exchange_type='direct')
# 将消息发送给所有为ja的路由关键字
channel.basic_publish(exchange="m2",routing_key='ja',body=b'hello')
print("已经发送了消息")
connection.close()

customizer.py

#!/usr/bin/env python
import pika


def callback(ch, method, properties, body):
    print("消费者接收到了任务:%r" % body)


credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
# 随机生成一个队列
result = channel.queue_declare(queue='', exclusive=True)
# 获取队列名
queue_name = result.method.queue
# 将队列绑定在频道上
channel.queue_bind(exchange='m1', queue=queue_name,routing_key="ja")
# 发送消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print("waiting for message")
channel.start_consuming()
4.3统配符模式

通配符模式类似于正则(但与正则表达式规则完全不同)

符号含义#“audit.#”能够匹配到audit.irs.corporate*audit.*”只会匹配到“audit.irs”

producer.py

#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
with pika.BlockingConnection(connect_param) as connection:
    channel = connection.channel()
    # 将频道声明为统配符模式
    channel.exchange_declare(exchange="m2", exchange_type='topic')
    message1=b"1"
    message2=b"2"
    channel.basic_publish(exchange="m2",routing_key="europe.weather",body=message1)
    channel.basic_publish(exchange="m2", routing_key="europe.weather.bad", body=message2)
    print(f"Send {message1}")
    print(f"Send {message2}")

customer.py

#!/usr/bin/env python
import pika


def callback(ch, method, properties, body):
    print("消费者接收到了任务:%r" % body)


credentials = pika.PlainCredentials("guest", "guest")
connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(connect_param)
channel = connection.channel()
channel.exchange_declare(exchange='m2',exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='m2', queue=queue_name,routing_key="europe.")
print("waiting for message")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5719109.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存