这几年一直在it行业里摸爬滚打,一路走来,不少总结了一些python行业里的高频面试,看到大部分初入行的新鲜血液,还在为各样的面试题答案或收录有各种困难问题
于是乎,我自己开发了一款面试宝典,希望能帮到大家,也希望有更多的Python新人真正加入从事到这个行业里,让python火不只是停留在广告上。
微信小程序搜索:Python面试宝典
或可关注原创个人博客:https://lienze.tech
也可关注微信公众号,不定时发送各类有趣猎奇的技术文章:Python编程学习
工作模式通过与各种交换机搭配,rq提供了为如下几种主要的工作模式
简单模式一队列,一消费者、生产者,使用默认交换机,即可构成最基本的模型
生产者
import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
消费者
import pika, sys, os def main(): connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') # 队列声明 def callback(ch, method, properties, body): # 消费者接收回调 print(" [x] Received %r" % body) channel.basic_consume( queue='hello', on_message_callback=callback, auto_ack=True) # 回调声明 print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 开启消费 if __name__ == '__main__': main()工作队列
类似简单模式,但是会有多个消费者,此时rq将会按照轮询的方式分别将消息给予每个消费者
消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失)
RabbitMQ 将理解消息未完全处理并将重新排队。如果有其他消费者同时在线,它会迅速将其重新交付给另一个消费者
默认情况,启动确认模式为手动确认
生产者
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 队列声明 channel.queue_declare(queue='task_queue', durable=True) # 队列持久化 message = ' '.join(sys.argv[1:]) or "Hello World!" # 消息发布 channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 )) print(" [x] Sent %r" % message) connection.close()
消费者,可以启动N>1个守护进程
import pika import time connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 队列声明 channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode()) time.sleep(body.count(b'.')) print(" [x] Done") # 消息确认 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()pub/sub订阅模式
结合交换机实现一端生产,多个消费者可以都接收消费到相同的消息
实现该模式,主要基于扇形交换机fanout完成,他的特点是,将消息发布至绑定在交换机上的所有队列中
生产者
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() # fanout交换机声明 channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" # 消息发布 channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
消费者no.1
import pika # 队列声明 channel.queue_declare(queue='queue1', exclusive=True) # 队列绑定 channel.queue_bind(exchange='logs', queue="queue1") # 消费回调 def callback(ch, method, properties, body): print(" [x] %r" % body.decode()) # 消费者监听 channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
消费者no.N
channel.queue_declare(queue='queueN', exclusive=True) channel.queue_bind(exchange='logs', queue="queueN") def callback(ch, method, properties, body): print(" [x] %r" % body.decode()) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
如果该fanout交换机未绑定任何队列,那么对于该交换机的消息发布,消息都将会丢失
查看所有当前队列绑定,可以在控制台或命令行使用进行查看
rabbitmqctl list_bindings路由模式
通过结合交换器direct模式,可以为队列绑定指定routing_key路由规则,让交换器可以选择的为某些队列进行消息投递
生产者
import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') for _ in range(10): # 偶数 -> orange # 奇数 -> black # 0 -> green if _ == 0 : routing_key = 'green' elif _ % 2 == 0: routing_key = 'orange' else: routing_key = 'black' channel.basic_publish( exchange='direct_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
消费者
此时出现多队列,那么需要消费者指定队列进行消费,比如对black的消费
def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body.decode())) channel.basic_consume( queue="black", on_message_callback=callback, auto_ack=True) channel.start_consuming()主题模式
主题模式,借助topic交换机的匹配队列功能,可以使业务代码对于交换机的选择更为灵活
简而言之可以对业务进行包含关键key的分类,而key将作为队列的匹配规则
生产者
from multiprocessing import current_process import pika from pika.exchange_type import ExchangeType class CPUTest: def __init__(self): self.user = "guest" self.pwd = "guest" self.ip = "192.168.17.3" self.port = 5672 def __call__(self): self.producer() def producer(self): credentials = pika.PlainCredentials(self.user, self.pwd) connection = pika.BlockingConnection( pika.ConnectionParameters( self.ip, self.port, '/', credentials) ) # 建立channel channel = connection.channel() channel.exchange_declare( exchange='topic_exchange', exchange_type=ExchangeType.topic, durable=True ) for num in range(1, 3): channel.queue_declare( queue='topic_queue{}'.format(str(num)), durable=True) # 队列创建 channel.queue_bind( exchange="topic_exchange", queue='topic_queue1', routing_key='A.*'.format(str(num))) # A.zxcnzjcn 绑定时,是匹配A.(任意n个单词) channel.queue_bind( exchange="topic_exchange", queue='topic_queue2', routing_key='*.B'.format(str(num))) # A.B -> topic_queue1 topic_queue2 # A.C -> topic_queue1 # C.B -> topic_queue2 # 向消息队列发送一条消息 for num in range(5): channel.basic_publish( exchange='topic_exchange', routing_key="A.B", # 向两款同时发送消息 body="topic_%s" % num, # 发送内容 properties=pika.BasicProperties(delivery_mode=2) # 消息持久化 ) channel.cancel() connection.close() # 连接关闭 return current_process().name
消费者
消费者还是要指定对应的队列进行消费
生产者通过主题模糊匹配生产
RPC模式如果需要在远程计算机上运行一个函数并等待结果,那么将需要远程过程调用/RPC机制
实现rpc机制,需要创建有关于rpc客户端的回调队列,并且为了在回调队列中可以获取响应的详细归属,可以通过维护业务id来分辨
rpc模型下,将称为客户端、服务端,也就代表着,服务端可能同时成为消费者、生产者
数据接收的消费者,以及结果发布的生产者
服务端: 提供rpc远程调用,封装斐波那契数列方法,通过客户端请求发送n,返回n的斐波那契数
import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() # rpc_queue队列用来进行需求消息获取 channel.queue_declare(queue='rpc_queue') # 斐波那契数列 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body.decode()) # 获取客户端发来要获取的斐波那契n值 fibnumber = fib(n) # 斐波那契值 ch.basic_publish( exchange='', # 结果队列,将有消费者定义结果存放在何队列 routing_key=props.reply_to, # 结果的当前标记,用以确定结果是否为当前客户端所需 properties=pika.BasicProperties(correlation_id = props.correlation_id), # 结果 body=str(fibnumber) ) # 消息确认 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) channel.start_consuming()
客户端
客户端与服务端共同使用rpc_queue进行rpc调用信息的发布、接收
此外,客户端还需要在每次生产需求时,额外提供当前需求对应回复的key也就是correlation_id用以确保结果可靠
并且,为了避免分布式客户端情况下,可能会造成correlation_id重复而结果获取有误,可以在每个客户端维护一个单独的结果队列进行结果获取
首先是初始化工作,并且为了可以支持结果获取,还需要客户端额外订阅结果获取队列callback_queue,也就是服务端拿到的reply_to
class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() # 结果队列 result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume( queue=self.callback_queue, # 消费的结果回调方法,校验correlation_id on_message_callback=self.on_response, auto_ack=True, )
工作函数,用来接收斐波那契的n值,并将其发布至rpc_queue队列中
发布过程,携带reply_to以及correlation_id属性,correlation_id可以用uuid模块生成
def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) # 发布斐波那契获取消息 self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, # 结果队列 correlation_id=self.corr_id, # 结果标记 ), body=str(n) ) # 如果当前 并未有rpc调用结果 while self.response is None: # 非阻塞执行消费动作,获取结果 self.connection.process_data_events() # 拿到结果则返回 return int(self.response)
拿取结果的消费回调函数,需要校验corr_id是否与结果返回中的correlation_id即可
def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body
这只是考虑了基本的rpc,并不存在任何并发
如果在大量请求时,可以结合redis等工具完成任务的获取,虽然不能保证有序,但可以确保结果数据的有效
并发过多线程的服务端,可能导致标记的key并不为有序,但是可以将这些key存储在redis中进行校验
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)