这几年一直在it行业里摸爬滚打,一路走来,不少总结了一些python行业里的高频面试,看到大部分初入行的新鲜血液,还在为各样的面试题答案或收录有各种困难问题
于是乎,我自己开发了一款面试宝典,希望能帮到大家,也希望有更多的Python新人真正加入从事到这个行业里,让python火不只是停留在广告上。
微信小程序搜索:Python面试宝典
或可关注原创个人博客:https://lienze.tech
也可关注微信公众号,不定时发送各类有趣猎奇的技术文章:Python编程学习
基本概念文档地址: https://pika.readthedocs.io/en/stable
AMQP协议进程间传递异步消息的网络协议
AMQP组成: 生产者(publish)、交换机(exchange)、路由(routes)、队列(queue)、消费者(consumer)
交换机: 任务分发的策略
路由: 寻址规则
队列: 任务的存储
交换机类型
Direct exchange(amq.direct): 直连交换机
Fanout exchange(amq.fanout): 扇形交换机(广播)
Topic exchange(amq.topic): 主题交换机
Headers exchange(amq.match): 头交换机
声明交换机
def exchange_declare( self, exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None): """ exchange: 交换机的名字,为空则自动创建一个名字 exchange_type: 默认交换机类型为direct passive: 检查交换机是否存在,存在返回状态信息,不存在返回404错误 durable: 设置是否持久化 auto_delete: 最后一个队列解绑则删除 internal: 是否设置为值接收从其他交换机发送过来的消息,不接收生产者的消息 arguments: 一个字典,用于传递额外的参数 """默认交换机
一般来说,默认交换机则是名为空的直连交换机,每个新建的队列都会自动绑定在默认交换机上
channel.queue_declare(queue='queue', durable=True) # 新建队列 channel.basic_publish( exchange='', # 默认交换机 routing_key='queue', # 队列 body="", # 发送内容 properties=pika.BasicProperties(delivery_mode=2,) # 消息持久化等其他属性设置 )
直连交换机
根据对应的routing_key绑定的队列进行消息投递
routing_key为booking时,此时交换机将消息路由至Queue1进行存储
当routing_key为create、booking、/confirm/i时,将会路由至Queue2进行存储
channel.exchange_declare( exchange='direct_exchange', exchange_type="direct", durable=True) # 创建直连模式 交换机
创建消费队列
channel.queue_declare(queue='queue1', durable=True) channel.queue_declare(queue='queue2', durable=True)
绑定消费队列至routing_key
channel.queue_bind(exchange="direct_exchange", queue="queue1", routing_key="queue1") channel.queue_bind(exchange="direct_exchange", queue="queue2", routing_key="queue2")
生产者按照数据不同对不同routing_key绑定下的队列进行数据传入
for num in range(10): if num % 2 == 0: routing_key = 'queue1' # 队列balance else: routing_key = 'queue2' # 队列balance channel.basic_publish( exchange='direct_exchange', routing_key=routing_key, body="%s" % num, # 发送内容 properties=pika.BasicProperties(delivery_mode=2) # 消息持久化 )
消费需要根据不同routing_key启动不同的业务消费者
self.channel.exchange_declare( exchange='direct_exchange', exchange_type="direct", durable=True) for method, properties, body in self.channel.consume('queue1'): # 同步阻塞模式 print(body.decode()) # 数据体 self.channel.basic_ack(method.delivery_tag)扇形交换机
扇形交换机会将消息复制广播至绑定在自身交换机上的所有队列,并不会考虑routing_key
扇型用来交换机处理消息的广播路由broadcast routing
创建与扇形交换机相绑定的交换机对象
channel.exchange_declare( exchange='fanout_exchange', exchange_type=ExchangeType.fanout, durable=True ) # 交换机声明 for num in range(3): channel.queue_declare( queue='fanout_queue{}'.format(str(num)), durable=True ) # 队列创建 channel.queue_bind( exchange="direct_exchange", queue='fanout_queue{}'.format(str(num)), routing_key='', ) # 队列绑定
在实际发布的代码中,如果标记当前为某个routing_key其实也是不生效的,此时按照交换机规则任务发布
for num in range(10): channel.basic_publish( exchange='fanout_exchange', routing_key='', body="fanout_%s" % num, # 发送内容 properties=pika.BasicProperties(delivery_mode=2) # 消息持久化 )
由于此时任务已经被复制到多个队列中,类似直连模式,需要消费者指定消费队列
for method, properties, body in channel.consume('fanout_queue1'): print(body.decode()) channel.basic_ack(method.delivery_tag) break
主题交换机注意: 某个队列被消费的数据,并不会在其他同交换机的队列中移除,这可能在多款消费中,造成同个任务多次消费
类似direct交换机,不过主题模式topic的routing_key匹配为模糊匹配,符合一定规则即可发送消息
当消费者绑定routing_key为A.C.B,那么将路由至Queue1与Queue2
为A.B.B时,路由至Queue2
规则为类正则,但不同于正则
# : 匹配全部 * : 匹配一个词
比如建立两款不同的队列routing_key,队列routing_key分别为A.*与*.B
for num in range(2): channel.queue_declare( queue='topic_queue{}'.format(str(num + 1)), durable=True) # 队列创建
进行交换机路由绑定
channel.queue_bind( exchange="topic_exchange", queue='topic_queue1', routing_key='A.*'.format(str(num))) channel.queue_bind( exchange="topic_exchange", queue='topic_queue2', routing_key='*.B'.format(str(num)))
使用主题交换机为两个队列同时发送消息
channel.basic_publish( exchange='topic_exchange', routing_key="A.B", # 向两款同时发送消息 body="topic_%s" % num, # 发送内容 )
或者只为某个符合模糊匹配规则的进行发送,比如此时的routing_key为A.C,这将只满足发送消息至topic_queue1
channel.basic_publish( exchange='topic_exchange', routing_key="A.C", # 向两款同时发送消息 body="topic_%s" % num, # 发送内容 properties=pika.BasicProperties(delivery_mode=2) # 消息持久化 )
头交换机主题模式可用在范围队列匹配发送消息的需求上
头交换机不需要routing_key中的规则进行队列匹配路由,而是通过消息内的headers属性进行匹配
消息头需要具备一个x-match参数,当x-match为any时,消息头的任意一个值被匹配则可以满足条件
如果x-match为all时,则需要所有值匹配成功才可以路由至指定队列
比直连交换机更为灵活的是,头交换机可以提供整数、字典等结构
声明头交换机
channel.exchange_declare( exchange='headers_exchange', exchange_type=ExchangeType.headers, durable=True)
声明x-match为all的消息队列,绑定至headers_exchange上
fileds = {"x-match": "all", "a": 1, "b": 2} # 头部匹配规则 channel.queue_bind( exchange="headers_exchange", queue='headers_queue1', routing_key='', arguments=fileds, # 当前队列绑定时,传入arguments参数控制路由规则 )
向当前路由规则的队列进行消息发布
channel.basic_publish( exchange='headers_exchange', routing_key="", body="headers_%s" % num, # 发送内容 properties=pika.BasicProperties(headers={"a": 1, "b": 2}), # 此处需严格匹配 )
消费对应规则队列内的消息,指明队列名即可
self.channel.consume('headers_queue1')
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)