Python消息队列(RabbitMQ)

Python消息队列(RabbitMQ),第1张

概述RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。可维护多个队列,可实现消息的一对一和广播等方式发送 RabbitMQ是一个开源的AMQP实现

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。可维护多个队列,可实现消息的一对一和广播等方式发送

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

CentOs安装:

安装socat

yum -y install socat

安装erlang

wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
Rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm

安装rabbitmq

wget  http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
rpm -ivh rabbitmq-server-3.6.10-1.el7.noarch.rpm

启动:

systemctl start rabbitmq-server

查看状态:

rabbitmqctl status

配置网页管理端:

mkdir /etc/rabbitmq

启用插件:

rabbitmq-plugins enable rabbitmq_management

配置开放端口:

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent

重启防火墙:

systemctl restart firewalld.service

创建用户:

rabbitmqctl add_user ruroot rproot

修改角色为管理员:

rabbitmqctl set_user_Tags ruroot administrator

设置权限:

rabbitmqctl set_permissions -p / ruroot2 ".*" "

测试结果:

命令行消息管理:

得到所有队列及存在的数据条数

rabbitmqctl List_queues
Python简单 *** 控

安装

pip3 install pika

发送数据:

如果生成多个的话,实现效果是轮询发送,一个一个循环发送数据,如同“皇帝轮流做…”

 1 import pika 2  3 #建立连接 4 userx=pika.PlainCredentials(ruroot2",rproot2") 5 conn=pika.BlockingConnection(pika.ConnectionParameters(192.168.43.10'/',credentials=userx)) 6  7 开辟管道 8 channelx=conn.channel() 9 10 声明队列,参数为队列名11 channelx.queue_declare(queue=dongchannel1112 13 发送数据,发送一条,如果要发送多条则复制此段14 channelx.basic_publish(exchange="",15                        routing_key= 队列名16                        body=dongxiaodongtodata3"  发送的数据17                        )18 print(--------发送数据完成-----------19 20 关闭连接21 conn.close()

取出数据:

消息处理函数,执行完成才说明接收完成,此时才可以接收下一条,串行14 def dongcallbackfun(v1,v2,v3,bodyx):15     得到的数据为:16 接收准备18 channelx.basic_consume(dongcallbackfun,1)">收到消息的回调函数19                        queue=队列名20                        no_ack=True 是否发送消息确认21 22 -------- 开始接收数据 -----------23 24 开始接收消息25 channelx.start_consuming()

发送端是否设置数据保存时间:

默认服务器(rabbitmq-server)重启后消息队列和消息数据均会全部消失

消息队列的永久保存,开启后将仅仅实现服务器重启后消息队列依然在,但数据还是会丢失,如果要保存数据,请参考接下来

声明队列,参数为队列名#实现队列永久保存,durable=Truechannelx.queue_declare(queue=dongch1数据的永久保存(一直等待被取,即使服务器重启),将要永久保存的发送数据添加属性propertIEs

发送数据channelx.basic_publish(exchange= 队列名                       body=dongxiaodongtodata333335 发送的数据                       propertIEs=pika.BasicPropertIEs(                           delivery_mode=2,1)">实现消息永久保存                       )                       )

发送端实现能者多劳

在发送端发送数据前,添加下面一句,此句添加一次即可,可以实现自动判断多接收端的处理速度,实现接收端处理快则多派发任务,处理慢则少派发任务

channelx.basic_qos(prefetch_count=1)

接收端是否接收确认:

接收端开启消息确认(值为False),接收端则会在接收回调函数结束时手动发送确认消息到数据发送者,如果接收端在回调函数处理未完成时就挂掉了,那么发送端将会立即把当前数据转交到下一个接收端进行数据处理

1 2  dongcallbackfun(channlx,methodx,1)">3     4     channelx.basic_ack(delivery_tag=methodx.delivery_tag) 发送数据完成确认消息,手动确认5 6 7 channelx.basic_consume(dongcallbackfun,1)">8                        queue=9                        no_ack=False 是否在消息回调函数结束后发送确认信息到发消息者,true表示不发送

非阻塞版数据接收:

启用会立即返回结果,如果有数据则进入回调函数,无数据则进行下一条,可以配合while使用

conn.process_data_events() 使用连接对象进行数据接收判断无数据")
实现消息的订阅和发布:

发布:

192.168.1.175声明发布和订阅通道,如果可以确认通道存在则可以去掉该句11 channelx.exchange_declare(exchange=dongeefanout发送数据确定发布主题为:dongeedongxiaodongeeedata1118 19 20 22 conn.close()

订阅:

 2  3 userx=pika.PlainCredentials( 4 conn=pika.BlockingConnection(pika.ConnectionParameters( 5  6  7 channelx= 8  9 10 channelx.exchange_declare(exchange=11 12 声明队列,生成一个随机的且不存在的队列,该队列会在连接断开后自动销毁13 resqueue=channelx.queue_declare(exclusive=True)得到随机生成的队列名15 queuenamex=resqueue.method.queue将队列和发布数据绑定,确定订阅主题为:dongee18 channelx.queue_bind(exchange=queuenamex)22     25 channelx.basic_consume(dongcallbackfun,1)">26                        queue=queuenamex,1)">27                        no_ack=True28 29 30 31 32 33 channelx.start_consuming()
通过管道实现进一步的消息订阅和发布:

发布:

dongee2direct确定发布主题为:dongee2dongqu33确定发布的队列(发布的主题):dongqu33dongxiaodong333 确定发送的数据20 channelx.basic_publish(exchange=21                        routing_key=dongqu22确定发布的队列(发布的主题):dongqu2222                        body=dongxiaodong22223 24 25 26 27 28 conn.close()

订阅:

将队列和发布数据绑定,确定订阅主题为:dongqu11 和 dongqu22dongqu1119 channelx.queue_bind(exchange=23     队列名(订阅的主题名)为:%r  得到的数据为:%r  "%(methodx.routing_key,bodyx))26 channelx.basic_consume(dongcallbackfun,1)">27                        queue=queuenamex,1)">28                        no_ack=29 30 31 32 33 34 channelx.start_consuming()
总结

以上是内存溢出为你收集整理的Python消息队列(RabbitMQ)全部内容,希望文章能够帮你解决Python消息队列(RabbitMQ)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1189839.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-03
下一篇 2022-06-03

发表评论

登录后才能评论

评论列表(0条)

保存