RabbitMQ高级特性

RabbitMQ高级特性,第1张

RabbitMQ高级特性

RabbitMQ高级特性
  • TTL
    • 案例
    • TTL 小结
  • 死信队列
    • 消息成为死信的三种情况:
    • 队列绑定死信交换机
    • 案例
    • 死信队列小结
  • 延迟队列
    • 定时任务的时效性问题
    • RabbitMQ中实现延迟队列
    • 案例
    • 延迟队列小结
  • 消息幂等性保障
  • 消息积压

TTL
  • TTL 全称 Time To Live(存活时间/过期时间)。
  • 当消息到达存活时间后,还没有被消费,会被自动清除。
  • RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
案例

此处只需要启动生产端程序,然后观察指定的时间之后,消息是否被清除即可验证该功能

  • 修改生产端配置,并增加测试用例

    
    
        
        
    




    
        
    

 @Test
 public void testTtl() {


     for (int i = 0; i < 10; i++) {
         // 发送消息
         rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.baiqi", "message ttl....");
     }
 }


TTL 小结
  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
  • 如果两者都进行了设置,以时间短的为准。
死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:
  1. 队列消息长度到达限制;
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定死信交换机

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

案例
  • 生产端配置和单元用例




    
    
        
        

        
        

        
        
        
        
    



    
        
    







    
        
    

 
 @Test
 public void testDlx(){
     //1. 测试过期时间,死信消息
     rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","死信消息测试1,开始...");

     //2. 测试长度限制后,消息死信
    

     //3. 测试消息拒收
     //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.baiqi","死信消息测试3,开始...");

 }
  • 消费端配置以及监听器


    
    

package com.baiqi.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


@Component
public class DlxListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            //int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

  • 测试过期消息进入死信队列,这种情况只启动发送端即可。不用启动消费端。

  • 测试长度限制后,消息死信,队列长度为10.现在发送20个消息,必定有10个消息进入死信队列
  • 测试消息拒收
package com.baiqi.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


@Component
public class DlxListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

死信队列小结
  1. 死信交换机和死信队列和普通的没有区别
  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
  3. 消息成为死信的三种情况:
    1. 队列消息长度到达限制;
    2. 消费者拒接消费消息,并且不重回队列;
    3. 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  • 定时器
  • 延迟队列

定时任务的时效性问题

RabbitMQ中实现延迟队列

在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

案例
  • 发送端配置和单元用例


    
    
        
        
        
    



    
        
    






    
        
    

@Test
public  void testDelay() throws InterruptedException {
    //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
    rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2020年12月...");


    //2.打印倒计时10秒
    for (int i = 10; i > 0 ; i--) {
        System.out.println(i+"...");
        Thread.sleep(1000);
    }

}
  • 消费端配置及代码

    
    

package com.baiqi.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


@Component
public class OrderListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

  • 案例演示



延迟队列小结
  1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
  2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。
消息幂等性保障

出现非幂等性的情况

  • 1、生产者已把消息发送到mq,在mq给生产者返回ack的时候网络中断,故生产者未收到确定信息,生产者认为消息未发送成功,但实际情况是,mq已成功接收到了消息,在网络重连后,生产者会重新发送刚才的消息,造成mq接收了重复的消息
  • 2、消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

  • 1、mq接收生产者传来的消息:
    mq内部会为每条消息生成一个全局唯一、与业务无关的消息id,当mq接收到消息时,会先根据该id判断消息是否重复发送,mq再决定是否接收该消息。
  • 2、消费者消费mq中的消息:
    也可利用mq的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过
消息积压
  • 消费者宕机积压
  • 消费者消费能力不足积压
  • 发送者发流量太大

上线更多的消费者,进行正常消费
上线专门的队列消费服务,
将消息先批量取出来,记录数据库,再慢慢处理

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5576242.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存