谷粒商城--消息队列--高级篇笔记十

谷粒商城--消息队列--高级篇笔记十,第1张

谷粒商城--消息队列--高级篇笔记十 谷粒商城–消息队列–高级篇笔记十 1. 消息队列message queue 1.1 概述 1.1.1 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力 1.1.2 消息服务中两个重要概念:
  • 消息代理(message broker)和目的地(destination)
  • 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
1.1.3 消息队列主要有两种形式的目的地
  • 队列(queue):点对点消息通信(point-to-point)
  • 主题(topic):发布(publish) @Test void createExchange() { // public DirectExchange(String name, 交换机名称 // boolean durable, 是否持久化 // boolean autoDelete, 是否自动删除 // Map arguments) 参数 DirectExchange directExchange = new DirectExchange("test_exchange",false,false,null); amqpAdmin.declareExchange(directExchange); } }

    5.4.2 创建队列
        @Test
        void createQueue() {
    //        public Queue(String name,   队列名称
    //        boolean durable,            是否持久化
    //        boolean exclusive,            是否是排他队列(只能被一个consumer的连接占用)
    //        boolean autoDelete,          是否自动删除
    //        @Nullable Map arguments)  参数
            Queue queue = new Queue("test_queue",true,false,false);
            amqpAdmin.declareQueue(queue);
        }
    

    5.4.3 创建绑定
        @Test
        void createBinding() {
    //	public Binding(String destination                      【目的地,队列name或 交换机name(如果作为路由的话)】
    //                Binding.DestinationType destinationType,  【目的地类型 queue还是exchange(路由)】
    //                String exchange,                          【交换机】
    //                String routingKey,                        【路由键】
    //                @Nullable Map arguments)  【自定义参数】
            Binding binding = new Binding("test_queue", Binding.DestinationType.QUEUE,"test_exchange","test.binding",null);
            amqpAdmin.declareBinding(binding);
        }
    

    5.4.4 测试以上步骤是否有问题

    5.4.5 发送消息

    如果消息内容是一个对象,那么该对象必须实现Serializable接口,因为convertAndSend()默认使用的jdk序列化

        @Test
        void sendMessage() {
    //        public void convertAndSend(String exchange,     交换机
    //                String routingKey,                      路由键
    //                Object message,                         发送的消息
    //                MessagePostProcessor messagePostProcessor) 消息序列化器
            OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
            orderReturnReasonEntity.setId(1L);
            orderReturnReasonEntity.setName("sendMessageTest");
            orderReturnReasonEntity.setCreateTime(new Date());
            rabbitTemplate.convertAndSend("test_exchange","test.binding",orderReturnReasonEntity);
            log.info("消息发送完成");
        }
    

    5.4.5.1 使用json序列化对象

    gulimall-order/src/main/java/site/zhourui/gulimall/order/config/MyRabbitConfig.java

    package site.zhourui.gulimall.order.config;
    
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    @Configuration
    public class MyRabbitConfig {
    
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }
    

    再次发送消息测试

        @Test
        void sendMessage() {
    //        public void convertAndSend(String exchange,     交换机
    //                String routingKey,                      路由键
    //                Object message,                         发送的消息
    //                MessagePostProcessor messagePostProcessor) 消息序列化器
            OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
            orderReturnReasonEntity.setId(1L);
            orderReturnReasonEntity.setName("sendMessageTest2");
            orderReturnReasonEntity.setCreateTime(new Date());
            rabbitTemplate.convertAndSend("test_exchange","test.binding",orderReturnReasonEntity);
            log.info("消息发送完成");
        }
    

    测试结果

    5.4.6 监听接收消息(@RabbitListener标在方法上)

    gulimall-order/src/main/java/site/zhourui/gulimall/order/listener/TestListener.java

    5.4.6.1 简单接收消息

    主启动类上需要加上@EnableRabbit

    在需要监听消息的方法上加上@RabbitListener,并指明监听队列名称

        @RabbitListener(queues = "test_queue")
        void receiveMessage(Object msg) {
            log.info("收到消息内容:"+msg+"==>类型:"+msg.getClass());
        }
    

    监听结果

    收到消息内容:(Body:'{"id":1,"name":"sendMessageTest2","sort":null,"status":null,"createTime":1639536120687}' MessageProperties [headers={__TypeId__=site.zhourui.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test.binding, deliveryTag=2, consumerTag=amq.ctag-OKodkqdQj7sbMy6xgVex5A, consumerQueue=test_queue])==>类型:class org.springframework.amqp.core.Message
    
    5.4.6.2 接收消息内容并反序列化对象

    上一步接收到消息还需要手动封装为对应对象,只需要将序列化的类放在message后就可以自动封装为对应对象(消息接收对象必须与消息发送对象一致)

    Message对象可以拿到消息的所有信息

        @RabbitListener(queues = "test_queue")
        void receiveMessage2(Message msg,OrderReturnReasonEntity orderReturnReasonEntity) {
            byte[] body = msg.getBody();
            MessageProperties messageProperties = msg.getMessageProperties();
            log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity);
        }
    

    监听结果

    收到消息内容:(Body:'{"id":1,"name":"sendMessageTest2","sort":null,"status":null,"createTime":1639539092604}' MessageProperties [headers={__TypeId__=site.zhourui.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test.binding, deliveryTag=2, consumerTag=amq.ctag-58WYA7sSHI64OWjRLJpc9w, consumerQueue=test_queue])==>内容OrderReturnReasonEntity(id=1, name=sendMessageTest2, sort=null, status=null, createTime=Wed Dec 15 11:31:32 CST 2021)
    

    5.4.6.3 完整写法

    参数类型

    1、Mcssage message:原生消息详细信息。头+体
    2、T<发送的消息的类型>orderReturnReasonEntity content;3、Channel channel:当前传输数据的通道
    3、Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息场景:
    1)、订单服务启动多个;同一个消息,只能有一个客户端收到
    2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息

    gulimall-order/src/main/java/site/zhourui/gulimall/order/listener/TestListener.java

        @RabbitListener(queues = "test_queue")
        void receiveMessage3(Message msg,
                             OrderReturnReasonEntity orderReturnReasonEntity,
                             Channel channel) {
            log.info("信道:"+channel);
            byte[] body = msg.getBody();
            MessageProperties messageProperties = msg.getMessageProperties();
            log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity);
        }
    

    5.4.6.4 验证多个消费者监听场景

    新建一个测试消息发送controller

    gulimall-order/src/main/java/site/zhourui/gulimall/order/controller/TestSendMessageController.java

    调用接口:http://localhost:9000/sendMessage?num=5发送多条消息

    package site.zhourui.gulimall.order.controller;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import site.zhourui.common.utils.PageUtils;
    import site.zhourui.common.utils.R;
    import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity;
    
    import java.util.Date;
    import java.util.Map;
    
    
    @RestController
    public class TestSendMessageController {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        
        @RequestMapping("/sendMessage")
        public R sendMany(@RequestParam("num") Integer num){
            for (int i = 0; i  
    

    复制一个订单模块,并启动

    调用消息发送接口发送10条消息

    http://localhost:9000/sendMessage?num=10

    结论:每个客户端都是接收的同一个队列,但是没有重复的消息

    并且在此期间只创建了两个连接,确认了一个客户端只有一个连接的说法

    5.4.7 监听接收消息(@RabbitListener标在类上,@RabbitHandler标在方法上)

    @RabbitListener(queues={“hello-java-queue”})放在类上【作用:用来指定接收哪个队列的消息】
    @RabbitHandler:标在方法上【作用:重载处理不同类型的数据】

    5.4.7.1 @RabbitListener与@RabbitHandler区别
    1. 作用域:
      • RabbitListener可以标在类上也可以标在方法上
      • RabbitHandler只能标在方法上
    5.4.7.2 重载处理不同类型的数据演示

    修改发送消息的接口,使其能够发送不同类型数据的消息

    gulimall-order/src/main/java/site/zhourui/gulimall/order/controller/TestSendMessageController.java

    package site.zhourui.gulimall.order.controller;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import site.zhourui.common.utils.PageUtils;
    import site.zhourui.common.utils.R;
    import site.zhourui.gulimall.order.entity.OrderEntity;
    import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity;
    
    import java.util.Date;
    import java.util.Map;
    import java.util.UUID;
    
    
    @RestController
    public class TestSendMessageController {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/sendMessage")
        public R sendMany(@RequestParam("num") Integer num){
            for (int i = 0; i  
    

    gulimall-order/src/main/java/site/zhourui/gulimall/order/listener/TestListener.java

    package site.zhourui.gulimall.order.listener;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    import site.zhourui.gulimall.order.entity.OrderEntity;
    import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity;
    
    
    @Service
    @Slf4j
    @RabbitListener(queues = "test_queue")
    public class TestListener {
    //    @RabbitListener(queues = "test_queue")
    //    void receiveMessage(Object msg) {
    //        log.info("收到消息内容:"+msg+"==>类型:"+msg.getClass());
    //    }
    
    //    @RabbitListener(queues = "test_queue")
    //    void receiveMessage2(Message msg,OrderReturnReasonEntity orderReturnReasonEntity) {
    //        byte[] body = msg.getBody();
    //        MessageProperties messageProperties = msg.getMessageProperties();
    //        log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity);
    //    }
    
    //    @RabbitListener(queues = "test_queue")
        @RabbitHandler
        void receiveMessage3(Message msg,
                             OrderReturnReasonEntity orderReturnReasonEntity,
                             Channel channel) {
    //        log.info("信道:"+channel);
    //        byte[] body = msg.getBody();
    //        MessageProperties messageProperties = msg.getMessageProperties();
            log.info("==>内容"+orderReturnReasonEntity);
        }
    
        @RabbitHandler
        void receiveMessage4(Message msg,
                             OrderEntity orderEntity,
                             Channel channel) {
    //        log.info("信道:"+channel);
    //        byte[] body = msg.getBody();
    //        MessageProperties messageProperties = msg.getMessageProperties();
            log.info("==>内容"+orderEntity);
        }
    }
    
    

    发送消息测试http://localhost:9000/sendMessage?num=10

    同一个队列拿出不同类型的数据,并且可以封装为对应的对象,做不同的处理,只使用RabbitListener是达不到这个效果的,

    但是如果接收消息与发送消息拿到的对象是自己将json封装的对象那么就这个功能用处就不大了

    5.5 消息可靠抵达 5.5.1 消息确认机制
    • 为什么不使用事务消息:保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
    • publisher confirmCallback 确认模式(消息是否到达Broker消息代理)
    • publisher returnCallback 未投递到 queue 退回模式(只要消息没有投递给指定的队列,就触发这个失败回调)
    • consumer ack机制 (ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。)

    5.5.2 ConfirmCallback (确认模式:发送端确认)
    • spring.rabbitmq.publisher-confirms=true
      • 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。
        CorrelationData:用来表示当前消息唯一性。
      • 消息只要被 broker 接收到就会执行 /confirm/iCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 /confirm/iCallback。
      • 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。

    开启配置

      rabbitmq:
        host: 192.168.157.128
        port: 5672
        virtual-host: /
        #开启发送端确认
        publisher-/confirm/is: true
    

    自定义RabbitTemplate,开启确认模式

    package site.zhourui.gulimall.order.config;
    
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.PostConstruct;
    
    
    @Configuration
    public class MyRabbitConfig {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
        
        @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
        public void initRabbitTemplate() {
    
            
            //设置确认回调
            rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause) -> {
                System.out.println("/confirm/i...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
            });
    }
    
    

    5.5.3 returnCallback(回退模式:发送端确认)
    • spring.rabbitmq.publisher-returns=true
    • spring.rabbitmq.template.mandatory=true
      • confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。
      • 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

    开启配置

    server:
      port: 9000
    spring:
      datasource:
        username: root
        password: root
        url: jdbc:mysql://192.168.157.128:3306/gulimall_oms?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        driver-class-name: com.mysql.cj.jdbc.Driver
      cloud:
        nacos:
          discovery:
            server-addr: 127.0.0.1:8848
      rabbitmq:
        host: 192.168.157.128
        port: 5672
        virtual-host: /
        #开启发送端确认
        publisher-/confirm/is: true
    # 开启发送端消息抵达Queue确认
        publisher-returns: true
        # 只要消息抵达Queue,就会异步发送优先回调returnfirm
        template:
          mandatory: true
    
    

    自定义RabbitTemplate,开启回退模式

    gulimall-order/src/main/java/site/zhourui/gulimall/order/config/MyRabbitConfig.java

    package site.zhourui.gulimall.order.config;
    
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.PostConstruct;
    
    
    @Configuration
    public class MyRabbitConfig {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
        
        @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
        public void initRabbitTemplate() {
    
            
            //设置确认回调
            rabbitTemplate.set/confirm/iCallback((correlationData,ack,cause) -> {
                System.out.println("/confirm/i...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
            });
    
    
            
            rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
                System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                        "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
            });
        }
    }
    
    

    因为该模式需要消息代理投递给指定队列失败才会触发,所以此处我们故意将路由键写错,此次测试后将错误改回来

    错误消息

    Fail Message[(Body:’{“id”:1,“name”:“消息—0”,“sort”:null,“status”:null,“createTime”:1639576478075}’ MessageProperties [headers={TypeId=site.zhourui.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]>replyCode[312]>replyText[NO_ROUTE]>exchange[test_exchange]>routingKey[test.binding222]
    /confirm/i…correlationData[null]>ack:[true]>cause:[null]

    5.5.4 消息唯一id

    在消息发送的时候CorrelationData参数就是消息的唯一id

    这个id在发送端确认时是可以拿到的,排查那些消息未成功抵达是可以用来排查(与接收到的存放在数据库的消息唯一id进行对比)

    5.5.4 ack机制(消费端确认)
    • 消费者获取到消息,成功处理,可以回复Ack给Broker
      • basic.ack用于肯定确认;broker将移除此消息
      • basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
      • basic.reject用于否定确认;同上,但不能批量
    • 默认自动ack,消息被消费者收到,就会从broker的queue中移除
    • queue无消费者,消息依然会被存储,直到消费者消费
    • 消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
      • 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
      • 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
      • 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人
    5.5.4.1 消息收货

    目前存在的问题:

    默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息,此时会出现一个问题:

    在接收消息这里打上断点,将第一个消息走完,这时模拟突发状况,比如服务器宕机,关掉服务器

    关掉服务器后剩余的4个消息也都消失了,也没有经过消费端确认,相当于丢失了

    为了解决这个问题,我们将自动ack确认改为手动确认,只有手动ack确认的消息才能被队列移除或者再次放入队列

    开启手动ack机制

    server:
    port: 9000
    spring:
    datasource:
     username: root
     password: root
     url: jdbc:mysql://192.168.157.128:3306/gulimall_oms?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
     driver-class-name: com.mysql.cj.jdbc.Driver
    cloud:
     nacos:
       discovery:
         server-addr: 127.0.0.1:8848
    rabbitmq:
     host: 192.168.157.128
     port: 5672
     virtual-host: /
     #开启发送端确认
     publisher-/confirm/is: true
    # 开启发送端消息抵达Queue确认
     publisher-returns: true
     # 只要消息抵达Queue,就会异步发送优先回调returnfirm
     template:
       mandatory: true
     #    使用手动ack确认模式,关闭自动确认【消息丢失】
     listener:
       simple:
         acknowledge-mode: manual
    

    发送五个消息测试,就算客户端已经拿到了消息,但是没有确认,队列中的消息仍然不能移除,只不过状态由ready变为unacked

    此时关闭服务服务,消息的状态由unacked变为ready,下次客户端服务启动又会接收到消息ready变为unacked

    除非手动确认

    package site.zhourui.gulimall.order.listener;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    import site.zhourui.gulimall.order.entity.OrderEntity;
    import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity;
    
    import java.io.IOException;
    
    
    @Service
    @Slf4j
    @RabbitListener(queues = "test_queue")
    public class TestListener {
    //    @RabbitListener(queues = "test_queue")
    //    void receiveMessage(Object msg) {
    //        log.info("收到消息内容:"+msg+"==>类型:"+msg.getClass());
    //    }
    
    //    @RabbitListener(queues = "test_queue")
    //    void receiveMessage2(Message msg,OrderReturnReasonEntity orderReturnReasonEntity) {
    //        byte[] body = msg.getBody();
    //        MessageProperties messageProperties = msg.getMessageProperties();
    //        log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity);
    //    }
    
    //    @RabbitListener(queues = "test_queue")
        @RabbitHandler
        void receiveMessage3(Message msg,
                             OrderReturnReasonEntity orderReturnReasonEntity,
                             Channel channel) {
            System.out.println("接收到消息:---"+orderReturnReasonEntity);
            byte[] body = msg.getBody();
            MessageProperties messageProperties = msg.getMessageProperties();
            log.info("==>处理完成消息"+orderReturnReasonEntity.getName());
    
            long deliveryTag = messageProperties.getDeliveryTag();
    //        public void basicNack(long deliveryTag,  //channel内按顺序自增
    //        boolean multiple)                       //是否批量确认
    
            try {
                //debug模式无法模拟真实情况下的宕机,关闭了也会继续执行下去,这里模拟突然宕机部分消息未接到
                if(deliveryTag%2==0){
                channel.basicAck(deliveryTag,false);   //手动ack确认接收消息
                System.out.println("签收了货物---"+deliveryTag);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    //    @RabbitHandler
    //    void receiveMessage4(Message msg,
    //                         OrderEntity orderEntity,
    //                         Channel channel) {
            log.info("信道:"+channel);
            byte[] body = msg.getBody();
            MessageProperties messageProperties = msg.getMessageProperties();
    //        log.info("==>内容"+orderEntity);
    //    }
    }
    
    
    

    断点测试一个一个的放消息手动确认消息

    debug模式无法模拟真实情况下的宕机,关闭了也会继续执行下去,这里模拟突然宕机部分消息未接到,这里测试可以放开断点,

    此时只签收了货物—2,货物—4

    还有三个未确认

    关闭客户服务端,消息状态由unacked->ready

    剩余的消息仍然可以继续签收

    5.5.4.2 deliveryTag

    channel内按顺序自增

    个人理解:相当于channel信道中消息的唯一id

    上面为什么重启了就能再接收一个消息了?

    之前的发送的五个消息,只接受偶数项消息(2,4),还剩3条消息,客户端重启后剩余消息在channel中又重新排序(1,2,3),随意再次接收到一个消息

    5.5.4.3 退货
    package site.zhourui.gulimall.order.listener;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    import site.zhourui.gulimall.order.entity.OrderEntity;
    import site.zhourui.gulimall.order.entity.OrderReturnReasonEntity;
    
    import java.io.IOException;
    
    
    @Service
    @Slf4j
    @RabbitListener(queues = "test_queue")
    public class TestListener {
    //    @RabbitListener(queues = "test_queue")
    //    void receiveMessage(Object msg) {
    //        log.info("收到消息内容:"+msg+"==>类型:"+msg.getClass());
    //    }
    
    //    @RabbitListener(queues = "test_queue")
    //    void receiveMessage2(Message msg,OrderReturnReasonEntity orderReturnReasonEntity) {
    //        byte[] body = msg.getBody();
    //        MessageProperties messageProperties = msg.getMessageProperties();
    //        log.info("收到消息内容:"+msg+"==>内容"+orderReturnReasonEntity);
    //    }
    
    //    @RabbitListener(queues = "test_queue")
        @RabbitHandler
        void receiveMessage3(Message msg,
                             OrderReturnReasonEntity orderReturnReasonEntity,
                             Channel channel) {
            System.out.println("接收到消息:---"+orderReturnReasonEntity);
            byte[] body = msg.getBody();
            MessageProperties messageProperties = msg.getMessageProperties();
            log.info("==>处理完成消息"+orderReturnReasonEntity.getName());
    
            long deliveryTag = messageProperties.getDeliveryTag();
    //        public void basicNack(long deliveryTag,  //channel内按顺序自增
    //        boolean multiple)                         //是否批量确认
    
            try {
                //debug模式无法模拟真实情况下的宕机,关闭了也会继续执行下去,这里模拟突然宕机部分消息未接到
                if(deliveryTag%2==0){
                channel.basicAck(deliveryTag,false);   //手动ack确认接收消息
                System.out.println("签收了货物---"+deliveryTag);
                }else {
    //                public void basicNack(long deliveryTag, //channel内按顺序自增
    //                boolean multiple,                       //是否批量退货
    //                boolean requeue)                        //确认后是否重新入队 false丢弃
                    channel.basicNack(deliveryTag,false,false); //手动ack确认拒绝消息
    //                channel.basicReject();
                    System.out.println("拒接签收了货物---"+deliveryTag);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    //    @RabbitHandler
    //    void receiveMessage4(Message msg,
    //                         OrderEntity orderEntity,
    //                         Channel channel) {
            log.info("信道:"+channel);
            byte[] body = msg.getBody();
            MessageProperties messageProperties = msg.getMessageProperties();
    //        log.info("==>内容"+orderEntity);
    //    }
    }
    

    清除之前测试的消息,再次发送消息测试,

    因为这里的requeue为false,所有的消息都被清除了

    当requeue=true,deliveryTag奇数的消息被拒收的消息重新入队deliveryTag变为偶数,被接收

    6. RabbitMQ延时队列(实现定时任务) 6.1 为什么不使用定时任务

    定时任务时效性问题

    场景:比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。

    常用解决方案:spring的 schedule 定时任务轮询数据库

    缺点:消耗系统内存、增加了数据库的压力、存在较大的时间误差

    解决:rabbitmq的消息TTL和死信Exchange结合

    6.2 消息的存活时间(TTL)
    • 消息的TTL(Time To Live)就是消息的存活时间。
    • • RabbitMQ可以对队列和消息分别设置TTL。
      • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
      • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者xmessage-ttl属性来设置时间,两者是一样的效果。
    6.3 死信路由
    • 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)
      • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
      • 上面的消息的TTL到了,消息过期了。
      • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
    • Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
    • 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
    • 手动ack&异常消息统一放在一个队列处理建议的两种方式
      • catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
      • 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
    6.4 延时队列实现-1(设置队列过期时间)

    6.5 延时队列实现-2(设置消息过期时间)

    6.6 延时队列定时关单模拟(设置队列过期时间) 6.6.1 实现方式

    基础版

    交换机与队列一对一,一台路由器路由一个队列

    升级版

    只需要一台交换机绑定多个队列

    6.6.2 实现 6.6.2.1 创建队列,交换机,绑定技巧

    容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下

    • 1、第一次使用队列【监听】的时候才会创建
    • 2、Broker没有队列、交换机才会创建
    6.6.2.2 实现

    gulimall-order/src/main/java/site/zhourui/gulimall/order/config/MyRabbitMQConfig.java

    package site.zhourui.gulimall.order.config;
    
    
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    
    
    @Configuration
    public class MyRabbitMQConfig {
    
    //    @RabbitHandler
    //    public void listen(Message message){
    //        System.out.println("收到消息:------>"+message);
    //    }
        
        @Bean
        public Queue orderDelayQueue(){
            HashMap arguments = new HashMap<>();
            
            Queue queue = new Queue("order.delay.queue",true,false,false,arguments);
            return queue;
        }
    
        
        @Bean
        public Queue orderReleaseQueue(){
            Queue queue = new Queue("order.release.order.queue",true,false,false);
            return queue;
        }
    
        
        @Bean
        public Exchange orderEventExchange(){
            
            TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);
    
            return topicExchange;
        }
    
        
        @Bean
        public Binding orderCreateOrderBinding(){
            
            Binding binding = new Binding("order.delay.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.create.order",
                    null);
            return binding;
        }
    
        
        @Bean
        public Binding orderReleaseOrderBinding(){
            Binding binding = new Binding("order.release.order.queue",
                    Binding.DestinationType.QUEUE,
                    "order-event-exchange",
                    "order.release.order",
                    null);
            return binding;
        }
    
    
    }
    
    

    启动订单服务后

    队列

    交换机

    绑定关系

    6.6.2.3 测试

    向延时队列发送一条消息

    延时队列收到一条消息

    一分钟后该消息变为死信,再将该消息转发给死信队列

    死信队列收到消息

    使用测试代码测试

    发送消息

    gulimall-order/src/main/java/site/zhourui/gulimall/order/web/HelloController.java

        @Autowired
        private RabbitTemplate rabbitTemplate;
        @ResponseBody
        @GetMapping(value = "/test/createOrder")
        public String createOrderTest() {
    
            //订单下单成功
            OrderEntity orderEntity = new OrderEntity();
            orderEntity.setOrderSn(UUID.randomUUID().toString());
            orderEntity.setModifyTime(new Date());
    
            //给MQ发送消息
            rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
    
            return "ok";
        }
    

    接收消息

    gulimall-order/src/main/java/site/zhourui/gulimall/order/config/MyRabbitMQConfig.java

        @RabbitListener(queues = "order.release.order.queue")
        public void listen(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
            System.out.println("收到过期订单消息,准备关闭订单:------>"+orderEntity.getOrderSn());
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
    

    欢迎分享,转载请注明来源:内存溢出

    原文地址: http://outofmemory.cn/zaji/5688665.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存