RabbitMQ之队列消息TTL、死信交换机、死信生产者消费者、

RabbitMQ之队列消息TTL、死信交换机、死信生产者消费者、,第1张

RabbitMQ之队列/消息TTL、死信交换机、死信生产者/消费者、 前言

这几年一直在it行业里摸爬滚打,一路走来,不少总结了一些python行业里的高频面试,看到大部分初入行的新鲜血液,还在为各样的面试题答案或收录有各种困难问题

于是乎,我自己开发了一款面试宝典,希望能帮到大家,也希望有更多的Python新人真正加入从事到这个行业里,让python火不只是停留在广告上。

微信小程序搜索:Python面试宝典

或可关注原创个人博客:https://lienze.tech

也可关注微信公众号,不定时发送各类有趣猎奇的技术文章:Python编程学习

TTL

TTL: time to live

表示消息/队列的存活时间,需在生产者发布消息时进行创建


队列存活一定时间

使用x-expires属性控制,不管队列中是否还有消息,都将删除该队列

单位为毫秒

channel.queue_declare(
    queue='ttl_queue',
    durable=True,
    arguments={'x-expires': 10000} # 队列ttl
)

消息统一存活一定时间

使用x-message-ttl属性控制

单位为毫秒,具备当前属性声明的队列,其中所有的消息都将在指定毫秒数后消失

channel.queue_declare(
    queue='ttl_queue',
    arguments={'x-message-ttl': 10000}# 消息ttl
)

消息单独存活一定时间

在消息发布时,使用expiration参数,注意参数的值为字符串

channel.basic_publish(
    exchange='',
    routing_key='ttl_queue',
    body="test",
    properties=pika.BasicProperties(
        expiration="5000"), # 设置单独消息过期
)

如果队列和消息均设置了过期时间,遵循短板原则

并且,rabbitmq对于过期消息的检测为懒惰,只有在消息处于队列顶端,即将消费时,才会检测是否过期,如过期则移除

死信

成为死信的条件

  1. basic_nack或basic_reject时,如给定requeue属性,则此时消息转为dead letter

  2. 消息TTL过期

  3. 队列达到阈值


当消息称为死信后,可以被重新发送到另外一个交换机,这个交换机被称为DLX(Dead Letter Exchange)


当消息过期时,无法被消费,如果当前队列绑定了一个DLX,那么此时这个过期消息不会直接消失,而是会被转投到死信交换机上

那么也就可以重复消费了



死信交换机

一个队列绑定死信交换机可能需要如下两个属性设置

  • x-dead-letter-exchange: 死信交换机,可以是任意交换机类型

  • x-dead-letter-routing-key: 死信交换机绑定队列时需要的路由键,不设置则使用默认

    • 设置这个值可以让死信队列在一个死信交换机所有绑定的队列中,选择某队列进行死信消息的发布
    • 但如果死信交换机是fanout属性,则该设置没啥意义

  • 创建死信交换机,和正常交换机无区别,其他队列绑定至当前死信交换机时,需要有routing_key
# 死信交换机
channel.exchange_declare(
    exchange='dlx_direct', exchange_type=ExchangeType.direct)
  • 创建死信队列并绑定至死信交换机,接收死信交换机发来的死信消息
# 死信队列
channel.queue_declare(queue='dead_queue')
# 死信队列绑定
channel.queue_bind(
    queue='dead_queue', exchange='dlx_direct', routing_key='dead_queue')

  • 创建正常队列,并设置死信属性
# 正常队列
channel.queue_declare(
    queue="dlx_queue",
    arguments={
        'x-dead-letter-exchange': 'dlx_direct',
        'x-dead-letter-routing-key': 'dead_queue'
    })
成为死信消息

成为死信消息,可以有如下的方式,不过一般较多的实现还是通过ttl时间进行

  1. 队列长度达到限制,新加入消息将成为死信
channel.queue_declare(
    queue='ttl_queue2',
    durable=True,
    arguments={
        'x-max-length': 10,  # 队列长度
    }
)

先让队列满员

for var in range(10):
    channel.basic_publish(
        exchange='',
        routing_key='ttl_queue',
        body="test",
    )

再发送一条死信

channel.basic_publish(
    exchange='',
    routing_key='ttl_queue',
    body="dead letter",
)
  1. 消费者接收消息后,不ack消息,并且requeue为false,比如basic_reject或basic_nack

消费者获取一条消费信息,并不会自动确认

method, properties, body = channel.basic_get('ttl_queue', auto_ack=False)
channle.basic_reject('ttl_queue', delivery_tag=method.delivery_tag)
  1. 存在消息过期设置,且消息过期
channel.basic_publish(
    exchange='',
    routing_key='dlx_queue',
    body="dead letter",
    properties=pika.BasicProperties(expiration="5000"), # 5秒后成为死信
)
死信消费者
import time

import pika
from pika.exchange_type import ExchangeType


class Consumer:
    def __init__(self):
        self.user = "guest"
        self.pwd = "guest"
        self.ip = "192.168.17.3"
        self.port = 5672

    def __call__(self):
        self.consumer()

    @staticmethod
    def callback(channel, method, properties, body):
        print('[dead_queue]-[get]:', body.decode())

    def consumer(self):
        credentials = pika.PlainCredentials(self.user, self.pwd)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.ip, self.port, '/', credentials))
        channel = connection.channel()

        # 死信交换机
        channel.exchange_declare(
            exchange='dlx_direct', exchange_type=ExchangeType.direct)
        # 死信队列
        channel.queue_declare(queue='dead_queue')
        # 死信队列绑定
        channel.queue_bind(queue='dead_queue', exchange='dlx_direct', routing_key='dead_queue')

        # 正常队列
        channel.queue_declare(
            queue="dlx_queue",
            arguments={
                'x-dead-letter-exchange': 'dlx_direct',
                'x-dead-letter-routing-key': 'dead_queue'
            })

        print('[consumer]-[dead_queue] start...')
        channel.basic_consume('dead_queue', on_message_callback=self.callback, auto_ack=True)
        channel.start_consuming()
        channel.close()
        connection.close()


def main():
    c = Consumer()
    c()
    print('...... over')


if __name__ == '__main__':
    main()
死信生产者
import pika


class Producer:
    def __init__(self):
        self.user = "guest"
        self.pwd = "guest"
        self.ip = "192.168.17.3"
        self.port = 5672

    def __call__(self):
        self.producer()

    def producer(self):
        credentials = pika.PlainCredentials(self.user, self.pwd)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.ip, self.port, '/', credentials))
        channel = connection.channel()
        channel.basic_publish(
            exchange='',
            routing_key='dlx_queue',
            body="dead letter",
            properties=pika.BasicProperties(expiration="5000"),
        )
        connection.close()  # 连接关闭


def main():
    c = Producer()
    c()
    print('...... over')


if __name__ == '__main__':
    main()
效果

当运行生产者后,dlx_queue队列中将出现一条5秒后消失的消息

经过5秒后

那么该消息将会进入dead_queue死信队列

思考: 如果死信队列的消费者拿到了死信消息后,ack为false,此时还可以继续为当前死信队列再链一个死信交换机吗?

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688267.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存