RabbitMQ的高级特性--TTL、死信队列、延迟队列

RabbitMQ的高级特性--TTL、死信队列、延迟队列,第1张

RabbitMQ的高级特性--TTL、死信队列、延迟队列

目录

1.TTL机制

       1.1 实现方案

       1.2 原生API实现

       1.3 SpringBoot实现

2.死信队列

        2.1 原生API实现

        2.2 SpringBoot实现

3.延迟队列

        3.1 延时队列的使用


1.TTL机制        1.1 实现方案

         目前的电商业务中订单创建成功,等待支付一般都会给一定的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。

         如何实现这个功能?

         定时轮询(数据库等)

                用户下单成功,将订单数据放入数据库,同时将支付状态放入数据库,用户付款更
改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。

                优点:设计实现简单。

                缺点: 需要对数据库进行大量的IO *** 作,效率低下。

         Timer

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
    @Override
    public void run() {
        System.out.println("用户没有付款,交易取消:" +
        simpleDateFormat.format(new
        Date(System.currentTimeMillis())));
        timer.cancel();
    }
};
System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
// 10秒后执行timerTask
timer.schedule(timerTask, 10 * 1000);

        

                优点: 没有想到

                缺点: 没有持久化机制。

                            不灵活 (只可以设置开始时间和重复间隔, 对于其它业务不太合适)。

                            不能利用线程池,一个timer一个线程。

                            没有真正的管理计划。

         Scheduler及其它定时器

// 线程工厂
ThreadFactory factory = Executors.defaultThreadFactory();
// 使用线程池
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
service.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("用户未付款,交易取消:" +
        format.format(new Date()));
    }// 等待10s 单位秒
}, 10, TimeUnit.SECONDS);

                优点:可多线程执行,一定程度上避免任务互相影响,单个任务异常不影响其它任务

                缺点:高并发下不建议使用定时任务,较浪费服务器性能

         RabbitMQ 的 TTL

                通过对 消息 和 队列 两个维度来设置 TTL

                优点:无需消费过多的服务器性能即可实现定时的功能

                缺点:任何中间件的容量和堆积能力都是有限的,且消息中间件的引入会增加许多问题

        

        由于消息中间件的容量和堆积能力都是有限的,如果有些消息总是无法消费掉,就需要有一种东西进行兜底。

        目前有两种方法设置消息的TTL,两种方法同时使用,则消息的TTL已两者之间 较小数值为准。

                1. 通过 Queue 属性设置,队列中所有消息都有相同的过期时间。

                2. 对消息自身进行单独设置,每条消息的TTL可以不同。

       1.2 原生API实现
try(Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) {
    // 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
    // 设置队列属性
    Map arguments = new HashMap<>();
    // 设置队列的TTL
    arguments.put("x-message-ttl", 30000);
    // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
    arguments.put("x-expires", 10000);

    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

    channel.basicPublish("", 
                         QUEUE_NAME, 
                         //设置消息本身的过期时间
                         new AMQP.BasicProperties().builder().expiration("30000").build(),
                         message.getBytes())

} catch (TimeoutException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

 此外,还可以通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

默认规则:

        1.如果不设置 x-message-ttl, 则表示消息不会过期。

        2.如果 x-message-ttl 设置为0, 则表示除非消息可以直接将消息投递到消费者,否则消息会被立即丢弃 。

       1.3 SpringBoot实现
@Configuration
public class RabbitConfig {
    @Bean
    public Queue queueTTLWaiting() {
        Map props = new HashMap<>();
        // 对于该队列中的消息,设置都等待10s
        props.put("x-message-ttl", 10000);
        // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
        props.put("x-expires", 1000);
        Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
        return queue;
    }
}
@RestController
public class controller{
    
   @RequestMapping("/pay/msgttl")
   public String sendTTLMessage() throws UnsupportedEncodingException {
       MessageProperties properties = new MessageProperties();
       //设置消息本身过期时间
       properties.setExpiration("5000");
       Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
       rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
       return "msg-ttl-ok";
    }     
}

2.死信队列

       DLX,全称为 Dead-Letter-Excahnge, 死信交换机。

        在各个外卖系统中,用户下单调用订单服务, 然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统 采用 MQ异步通讯。

        用户在下单之后,外卖员接单之后再取消接单应该如何处理,用户下单长时间未被接单如何处理?

        这种场景下我们可以考虑定义一个死信交换机,并绑定一个死信队列。当消息变成死信时,该消息就会被发送到该死信队列上,这样方便我们查看消息失败的原因,从而j进行下一步的处理,是取消订单还是安排其他的外卖员。

        消息在到达队列之后,被重新发送到一个特殊的交换机(DLX)中, 同时,绑定DLX的队列就称为“死信队列”。

        一下几种情况导致消息变成死信:

                1. 消息被拒绝 (Basic.Reject / Basic.Nack), 并且设置 requeue 参数为 false。

                2. 消息过期。

                3. 队列达到最大长度。

        同时我们也可以在处理异常的时候,如果消息不能被消费者正常消费,可以放置到死信队列,后续进行分析程序中的异常情况,进而改善和优化系统。

        2.1 原生API实现
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

    // 定义一个死信交换器(也是一个普通的交换器)
    channel.exchangeDeclare("exchange.dlx", "direct", true);

    // 定义一个正常业务的交换器
    channel.exchangeDeclare("exchange.biz", "fanout", true);

    Map arguments = new HashMap<>();
    // 设置队列TTL
    arguments.put("x-message-ttl", 10000);
    // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
    arguments.put("x-dead-letter-exchange", "exchange.dlx");
    // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的 routingKey
    arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");

    //定义交换机队列及绑定关系
    channel.queueDeclare("queue.biz", true, false, false, arguments);
    channel.queueBind("queue.biz", "exchange.biz", "");
    channel.queueDeclare("queue.dlx", true, false, false, null);
    // 死信队列和死信交换器
    channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");

    channel.basicPublish("exchange.biz", 
                         "",
                         MessageProperties.PERSISTENT_TEXT_PLAIN,
                         "dlx.test".getBytes());
} catch (Exception e) {
    e.printStackTrace();
}
        2.2 SpringBoot实现
@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        Map props = new HashMap<>();
        // 消息在队列中的生存时间 10s
        props.put("x-message-ttl", 10000);
        // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
        props.put("x-dead-letter-exchange", "ex.go.dlx");
        // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
        props.put("x-dead-letter-routing-key", "go.dlx");
        Queue queue = new Queue("q.go", true, false, false, props);
        return queue;
    }
}

一般来说,死信队列都会配合RabbitMQ的TTL来使用,在消息超时之后会被自动路由到死信队列中。 

3.延迟队列

        在一定的业务场景中,我们发送到消息队列中的消息不一定想立即被消费,就比如:

        在12306中购买火车票,选中一个座位中订单未支付的时间段中系统会将这个座位锁定,如果超过时间还没有付款的话系统会自动把座位释放掉,怎么实现类似的功能呢?

        1.可以用定时任务每分钟扫描一次,发现有超过15分钟的就释放掉,但这样非常浪费系统资源。

        2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟之后锁会自动释放,但是这样会长期占用系统的资源。

        3. 可以通过TTL配合死信队列来实现。但是TTL扫描是从队列的头依此往后扫描,加入第一个消息没有超时,后续的消息是不会被扫描的,如果队列中的消息过期时间不是固定的,就会导致后面消息过期了,但是仍然没有被处理。

        因此可以使用延迟队列,锁座成功之后会发送一条延迟消息到延时交换机,延时交换机轮询所有的消息,消息到指定的时间后会被消费,消费的过程就是检查这个座位是否是“已付款”状态;

        RabbitMQ本身并没有提供延时队列的功能,可以用 rabbitmq_delayed_message_exchange 插件来实现,它与TTL最大的不同就是 TTL 存放消息在死信队列中,而它则是存放消息在延时交换机中。

        

1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
2. 延时交换机(exchange)存储消息持续扫描所有的消息,等待消息到期根据路由键(routekey)找到      绑定自己的队列(queue)并把消息给它
3. 队列(queue)再把消息发送给监听它的消费者(customer)

        3.1 延时队列的使用

                下载插件

                下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

                安装插件

、            将插件拷贝到rabbitmq-server的安装路径

# 启用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

#重启rabbitmq-server
systemctl restart rabbitmq-server
@Configuration
public class RabbitConfig {
    @Bean
    public Exchange exchange() {
        Map props = new HashMap<>();
        props.put("x-delayed-type", ExchangeTypes.FANOUT);
        Exchange exchange = new CustomExchange("ex.delayed", 
                                               "xdelayed-message", 
                                               true,
                                               false, 
                                               props);
        return exchange;
}
public class PublishController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/prepare/{seconds}")
    public String toMeeting(@PathVariable Integer seconds) throws UnsupportedEncodingException {
    // RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
    // 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
    // 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
    // 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
    MessageProperties properties = new MessageProperties();
    properties.setHeader("x-delay", (seconds - 10) * 1000);
    Message message = new Message((seconds + "秒后召开销售部门会议。").getBytes("utf-8"),properties);
    rabbitTemplate.convertAndSend("ex.delayed", "key.delayed",message);
    return "已经定好闹钟了,到时提前告诉大家";
}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5722358.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存