RabbitMQ之死信实现延时队列,并模拟订单过期抹除业务

RabbitMQ之死信实现延时队列,并模拟订单过期抹除业务,第1张

RabbitMQ之死信实现延时队列,并模拟订单过期抹除业务 前言

这几年一直在it行业里摸爬滚打,一路走来,不少总结了一些python行业里的高频面试,看到大部分初入行的新鲜血液,还在为各样的面试题答案或收录有各种困难问题

于是乎,我自己开发了一款面试宝典,希望能帮到大家,也希望有更多的Python新人真正加入从事到这个行业里,让python火不只是停留在广告上。

微信小程序搜索:Python面试宝典

或可关注原创个人博客:https://lienze.tech

也可关注微信公众号,不定时发送各类有趣猎奇的技术文章:Python编程学习

延时队列

消息不会立即被消费,在指定时间时,才将被消费,类似redis键空间

rabbitmq未直接实现延时队列,但是可以通过ttl+dlx实现延时队列 ,如果对这部分有不理解之处,可以参照我的上一篇博文

  • 用户注销,七日后账号从数据库抹去
  • 用户下单,30分钟检查数据库是否支付,未支付库存回滚,订单取消

比如模拟用户下单的需求,30分钟后,订单取消

死信有关队列初始化

def dead_queue_init(self):
    """
    死信交换机、队列初始化有关
	:return:
	"""
    self.channel.queue_declare(queue='dead_queue', durable=True)
    self.channel.exchange_declare(exchange='dead_direct', exchange_type=ExchangeType.direct)
    self.channel.queue_bind(queue='dead_queue', routing_key='dead_queue', exchange='dead_direct')

下单队列初始化及死信绑定

def order_queue_init(self):
    """
    下单队列初始化,绑定死信交换机及队列
    :return:
    """
    self.channel.queue_declare(
        queue='order_queue', durable=True, arguments={
            'x-dead-letter-exchange': 'dead_direct',  # 死信交换机
            'x-dead-letter-routing-key': 'dead_queue',  # 死信队列
            'x-message-ttl': "5000"
        })

初始化后的效果


现在用户正常下单,数据将会以5秒过期时间加入order_queue队列中

当5秒后,由dead_direct进入死信队列dead_queue中,此时死信消费者完成对当前订单的判断等逻辑


用户下单的生产者代码,非常简单,直接将消息发布至order_queue中即可

方便一些,可以记录当前下单时间,打印一下当前时间戳即可

def make_order(self):
    """
    下单,在下单队列中添加ttl为30(此处模拟为5)的消息
    :return:
    """
    self.channel.basic_publish(
        exchange='', routing_key='order_queue', body=json.dumps({"order_id": '1'}))
    print('[%s] order create.' % (datetime.datetime.now().strftime('%M:%S')))

消费者只需要消费死信队列dead_queue即可

def order_check(self, ch, method, properties, body):
    """订单检查
  	:return:
	"""
    order_id = json.loads(body.decode())
    print('[%s]: %s' % (datetime.datetime.now().strftime('%M:%S'), order_id))
    self.channel.basic_ack(delivery_tag=method.delivery_tag)
def order_consumer(self):
    """死信消费者,判断用户订单是否取消
    :return:
    """
    self.channel.basic_consume(
        queue='dead_queue',
        on_message_callback=self.order_check,
    )
    self.channel.start_consuming()

最后来看看效果

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688276.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存