这几年一直在it行业里摸爬滚打,一路走来,不少总结了一些python行业里的高频面试,看到大部分初入行的新鲜血液,还在为各样的面试题答案或收录有各种困难问题
于是乎,我自己开发了一款面试宝典,希望能帮到大家,也希望有更多的Python新人真正加入从事到这个行业里,让python火不只是停留在广告上。
微信小程序搜索:Python面试宝典
或可关注原创个人博客:https://lienze.tech
也可关注微信公众号,不定时发送各类有趣猎奇的技术文章:Python编程学习
TTLTTL: time to live
表示消息/队列的存活时间,需在生产者发布消息时进行创建
队列存活一定时间
使用x-expires属性控制,不管队列中是否还有消息,都将删除该队列
单位为毫秒
channel.queue_declare( queue='ttl_queue', durable=True, arguments={'x-expires': 10000} # 队列ttl )
消息统一存活一定时间
使用x-message-ttl属性控制
单位为毫秒,具备当前属性声明的队列,其中所有的消息都将在指定毫秒数后消失
channel.queue_declare( queue='ttl_queue', arguments={'x-message-ttl': 10000}# 消息ttl )
消息单独存活一定时间
在消息发布时,使用expiration参数,注意参数的值为字符串
channel.basic_publish( exchange='', routing_key='ttl_queue', body="test", properties=pika.BasicProperties( expiration="5000"), # 设置单独消息过期 )
如果队列和消息均设置了过期时间,遵循短板原则
死信并且,rabbitmq对于过期消息的检测为懒惰,只有在消息处于队列顶端,即将消费时,才会检测是否过期,如过期则移除
成为死信的条件
-
basic_nack或basic_reject时,如给定requeue属性,则此时消息转为dead letter
-
消息TTL过期
-
队列达到阈值
当消息称为死信后,可以被重新发送到另外一个交换机,这个交换机被称为DLX(Dead Letter Exchange)
当消息过期时,无法被消费,如果当前队列绑定了一个DLX,那么此时这个过期消息不会直接消失,而是会被转投到死信交换机上
那么也就可以重复消费了
死信交换机
一个队列绑定死信交换机可能需要如下两个属性设置
-
x-dead-letter-exchange: 死信交换机,可以是任意交换机类型
-
x-dead-letter-routing-key: 死信交换机绑定队列时需要的路由键,不设置则使用默认
- 设置这个值可以让死信队列在一个死信交换机所有绑定的队列中,选择某队列进行死信消息的发布
- 但如果死信交换机是fanout属性,则该设置没啥意义
- 创建死信交换机,和正常交换机无区别,其他队列绑定至当前死信交换机时,需要有routing_key
# 死信交换机 channel.exchange_declare( exchange='dlx_direct', exchange_type=ExchangeType.direct)
- 创建死信队列并绑定至死信交换机,接收死信交换机发来的死信消息
# 死信队列 channel.queue_declare(queue='dead_queue') # 死信队列绑定 channel.queue_bind( queue='dead_queue', exchange='dlx_direct', routing_key='dead_queue')
- 创建正常队列,并设置死信属性
# 正常队列 channel.queue_declare( queue="dlx_queue", arguments={ 'x-dead-letter-exchange': 'dlx_direct', 'x-dead-letter-routing-key': 'dead_queue' })成为死信消息
成为死信消息,可以有如下的方式,不过一般较多的实现还是通过ttl时间进行
- 队列长度达到限制,新加入消息将成为死信
channel.queue_declare( queue='ttl_queue2', durable=True, arguments={ 'x-max-length': 10, # 队列长度 } )
先让队列满员
for var in range(10): channel.basic_publish( exchange='', routing_key='ttl_queue', body="test", )
再发送一条死信
channel.basic_publish( exchange='', routing_key='ttl_queue', body="dead letter", )
- 消费者接收消息后,不ack消息,并且requeue为false,比如basic_reject或basic_nack
消费者获取一条消费信息,并不会自动确认
method, properties, body = channel.basic_get('ttl_queue', auto_ack=False) channle.basic_reject('ttl_queue', delivery_tag=method.delivery_tag)
- 存在消息过期设置,且消息过期
channel.basic_publish( exchange='', routing_key='dlx_queue', body="dead letter", properties=pika.BasicProperties(expiration="5000"), # 5秒后成为死信 )死信消费者
import time import pika from pika.exchange_type import ExchangeType class Consumer: def __init__(self): self.user = "guest" self.pwd = "guest" self.ip = "192.168.17.3" self.port = 5672 def __call__(self): self.consumer() @staticmethod def callback(channel, method, properties, body): print('[dead_queue]-[get]:', body.decode()) def consumer(self): credentials = pika.PlainCredentials(self.user, self.pwd) connection = pika.BlockingConnection( pika.ConnectionParameters(self.ip, self.port, '/', credentials)) channel = connection.channel() # 死信交换机 channel.exchange_declare( exchange='dlx_direct', exchange_type=ExchangeType.direct) # 死信队列 channel.queue_declare(queue='dead_queue') # 死信队列绑定 channel.queue_bind(queue='dead_queue', exchange='dlx_direct', routing_key='dead_queue') # 正常队列 channel.queue_declare( queue="dlx_queue", arguments={ 'x-dead-letter-exchange': 'dlx_direct', 'x-dead-letter-routing-key': 'dead_queue' }) print('[consumer]-[dead_queue] start...') channel.basic_consume('dead_queue', on_message_callback=self.callback, auto_ack=True) channel.start_consuming() channel.close() connection.close() def main(): c = Consumer() c() print('...... over') if __name__ == '__main__': main()死信生产者
import pika class Producer: def __init__(self): self.user = "guest" self.pwd = "guest" self.ip = "192.168.17.3" self.port = 5672 def __call__(self): self.producer() def producer(self): credentials = pika.PlainCredentials(self.user, self.pwd) connection = pika.BlockingConnection( pika.ConnectionParameters(self.ip, self.port, '/', credentials)) channel = connection.channel() channel.basic_publish( exchange='', routing_key='dlx_queue', body="dead letter", properties=pika.BasicProperties(expiration="5000"), ) connection.close() # 连接关闭 def main(): c = Producer() c() print('...... over') if __name__ == '__main__': main()效果
当运行生产者后,dlx_queue队列中将出现一条5秒后消失的消息
经过5秒后
那么该消息将会进入dead_queue死信队列
思考: 如果死信队列的消费者拿到了死信消息后,ack为false,此时还可以继续为当前死信队列再链一个死信交换机吗?
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)