MQ全称为MessageQueue 消息队列,是分布式系统中重要的组件,是一种应用程序对应用程序的通信方法
2.目的1.解决生产者和消费者的强解耦问题
2.异步消息:防止应用的阻塞
3.流量消锋:当流量过大时,使用消息队列作为一个缓存区,平衡了生产者和消费者的处理能力,防止应用挂掉
3.类别常见的消息队列中间件
1.RabbitMQ
2.Kafka
2.RabbitMQ安装RabbitMQ 是一个基于Erlang 语言和AMQP协议开发的消息中间件
3.RabbitMQ使用 3.1 自动应答安装参考
使用参考
producer.py
#!/usr/bin/env python import pika # 创建凭证,使用rabbitmq用户密码登录 credentials = pika.PlainCredentials("guest", "guest") connect_param=pika.ConnectionParameters('127.0.0.1', credentials=credentials) # 新建连接,这里localhost可以更换为服务器ip connection = pika.BlockingConnection(connect_param) # 创建频道 channel = connection.channel() # 新建一个ja队列,用于接收消息 channel.queue_declare(queue='ja') # exchange三种模式:fanout,direct,topic # exchange=''表示精确指定发送给哪个队列 channel.basic_publish(exchange='', routing_key='ja', body=b'hello') print("已经发送了消息") # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接 connection.close()
comuser.py
#!/usr/bin/env python import pika def callback(ch, method, properties, body): print("消费者接收到了任务:%r" % body) # 建立与rabbitmq的连接 credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() channel.queue_declare(queue="ja") # 指定监听的消息队列名称,以及是否自动应答,和相应的回调函数 channel.basic_consume("ja", auto_ack=True, on_message_callback=callback) # 启动监听队列,有消息则执行callback,没消息则挂起 channel.start_consuming()
注意
3.2 手动应答1.存在多个消费者时,默认采用轮训的方式获取消息队列
producer.py与上面保持一致
customer.py
import pika def callback(ch, method, properties, body): print("消费者接收到了任务:%r" % body) # 手动告知消息队列消息已拿走 ch.basic_ack(delivery_tag=method.deliver_tag) credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() channel.queue_declare(queue="ja") # auto_ack默认为False channel.basic_consume("ja", on_message_callback=callback) print("waiting for message") channel.start_consuming()3.3 消息持久化
producer.py
#!/usr/bin/env python import pika credentials = pika.PlainCredentials("guest", "guest") connect_param=pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() channel.queue_declare(queue='ja',durable=True) #申明持久化的新队列 channel.basic_publish(exchange='', routing_key='ja', body=b'hello', properties=pika.BasicProperties(delivery_mode=2,) # 支持数据持久化 ) print("已经发送了消息") connection.close()
customize.py
import pika def callback(ch, method, properties, body): print("消费者接收到了任务:%r" % body) ch.basic_ack(delivery_tag=method.deliver_tag) credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() #声明可持久化的队列 channel.queue_declare(queue="ja",durable=True) channel.basic_consume("ja", on_message_callback=callback) print("waiting for message") channel.start_consuming()3.4 公平分发
producer.py
#!/usr/bin/env python import pika credentials = pika.PlainCredentials("guest", "guest") connect_param=pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() # 限制每次只发送不超过1条消息到同一消费者,消费者必须手动反馈告知队列,才会发送下一个 channel.basic_qos(prefetch_count=1) channel.queue_declare(queue='ja',durable=True) #申明持久化的新队列 channel.basic_publish(exchange='', routing_key='ja', body=b'hello', properties=pika.BasicProperties(delivery_mode=2,) # 支持数据持久化 ) print("已经发送了消息") connection.close()4.交换机模式 4.1广播模式
producer.py
#!/usr/bin/env python import pika credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() # 将频道声明为广播模式 channel.exchange_declare(exchange="m1", exchange_type='fanout') # 指定发送给什么频道 channel.basic_publish(exchange="m1",routing_key='',body=b'nihao') print("已经发送了消息") connection.close()
customer.py
import pika def callback(ch, method, properties, body): print("消费者接收到了任务:%r" % body) credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() # 随机生成一个队列 result = channel.queue_declare(queue='', exclusive=True) # 获取队列名 queue_name = result.method.queue # 将队列绑定在频道上 channel.queue_bind(exchange='m1', queue=queue_name) # 发送消息 channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) print("waiting for message") channel.start_consuming()
4.2路由模式运行多个消费者,即可发现生产者会将消息发给所有的队列
路由模式也叫关键字模式
producer.py
#!/usr/bin/env python import pika credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() # 将频道声明为广播模式 channel.exchange_declare(exchange="m2", exchange_type='direct') # 将消息发送给所有为ja的路由关键字 channel.basic_publish(exchange="m2",routing_key='ja',body=b'hello') print("已经发送了消息") connection.close()
customizer.py
#!/usr/bin/env python import pika def callback(ch, method, properties, body): print("消费者接收到了任务:%r" % body) credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() # 随机生成一个队列 result = channel.queue_declare(queue='', exclusive=True) # 获取队列名 queue_name = result.method.queue # 将队列绑定在频道上 channel.queue_bind(exchange='m1', queue=queue_name,routing_key="ja") # 发送消息 channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) print("waiting for message") channel.start_consuming()4.3统配符模式
通配符模式类似于正则(但与正则表达式规则完全不同)
producer.py
#!/usr/bin/env python import pika credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) with pika.BlockingConnection(connect_param) as connection: channel = connection.channel() # 将频道声明为统配符模式 channel.exchange_declare(exchange="m2", exchange_type='topic') message1=b"1" message2=b"2" channel.basic_publish(exchange="m2",routing_key="europe.weather",body=message1) channel.basic_publish(exchange="m2", routing_key="europe.weather.bad", body=message2) print(f"Send {message1}") print(f"Send {message2}")
customer.py
#!/usr/bin/env python import pika def callback(ch, method, properties, body): print("消费者接收到了任务:%r" % body) credentials = pika.PlainCredentials("guest", "guest") connect_param = pika.ConnectionParameters('127.0.0.1', credentials=credentials) connection = pika.BlockingConnection(connect_param) channel = connection.channel() channel.exchange_declare(exchange='m2',exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='m2', queue=queue_name,routing_key="europe.") print("waiting for message") channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)