RabbitMQ 消息确认机制 ——可靠抵达(发送端)

RabbitMQ 消息确认机制 ——可靠抵达(发送端),第1张

RabbitMQ 消息确认机制 ——可靠抵达(发送端) 前言 : rabbitMQ为了防止消息不丢失的情况,可以使用事物消息,但是性能下降250倍,为此引入确认机制


 如上图所示:

一、publisher   /confirm/iCallBack确认模式

 springboot开启rabbitmq可靠抵达 ——/confirm/iCallBack 

spring:
  rabbitmq:
    publisher-/confirm/i-type: correlated

当我们的publisher 到达 broker (服务器时候) ,返回/confirm/iCallback,当消息没有抵达broker的时候返回true,并会给出失败原因。

public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }


    
    @PostConstruct
    public void initRabbitTemplate(){
        
        rabbitTemplate. setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                
            }
        });

    }





}

二、publisher   returnCallBack 确认模式

springboot开启rabbitmq可靠抵达 —— returnCallBack

当我们开启publisher-returns 时候,将 spring.rabbitmq.template.mandatory 开启

作用:只要消息抵达队列 ,以异步方式优先回调 rerun/confirm/i

spring:
  rabbitmq:
    publisher-returns: true
    #只要消息抵达队列 ,以异步方式优先回调 rerunConfirm
    template:
      mandatory: true

 设置broker 抵达 queue 时候的returnCallBack回调

只要消息没有投递给指定的队列,就触发这个失败回调

        
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {

            }
        });

贴上RetrunedMessage实体返回的内容(此处为rabbitMQ自带实体,非自建)

public class ReturnedMessage {
    //投递失败的消息详细信息
    private final Message message;  
    //回复的状态码
    private final int replyCode;
    //回复的文本内容
    private final String replyText;
    //当时这个消息发给哪个交换机
    private final String exchange;
    //当时这个消息用哪个路由键
    private final String routingKey;

    public ReturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        this.message = message;
        this.replyCode = replyCode;
        this.replyText = replyText;
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    public Message getMessage() {
        return this.message;
    }

    public int getReplyCode() {
        return this.replyCode;
    }

    public String getReplyText() {
        return this.replyText;
    }

    public String getExchange() {
        return this.exchange;
    }

    public String getRoutingKey() {
        return this.routingKey;
    }

    public String toString() {
        return "ReturnedMessage [message=" + this.message + ", replyCode=" + this.replyCode + ", replyText=" + this.replyText + ", exchange=" + this.exchange + ", routingKey=" + this.routingKey + "]";
    }
}

路由键绑定为 :

    
    @Test
    void bindingQueueExchange(){
        
        Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,exchange,"chendazui.#",null);
        amqpAdmin.declareBinding(binding);
        log.info("[{}]绑定成功","chendazui-binding");
    }

当我们将路由键修改后:

    @Test
    void sendMessage(){
        MessageUtil messageUtil = new MessageUtil();
        messageUtil.setCode("1");
        messageUtil.setMsg("我到了");
        messageUtil.setOrder("陈大嘴的订单");
        messageUtil.setUser("陈大嘴");
        messageUtil.setDateTime(new Date());
        
        rabbitTemplate.convertAndSend(exchange,"chendazui1.#",messageUtil);
        log.info("[{}]消息已经发出",messageUtil);
    }

这个时候我们测试结果如下:

当前指向的交换机==>chendazui-exchange投递失败消息详情(Body:'{"code":"1","msg":"我到了","order":"陈大嘴的订单","user":"陈大嘴","dateTime":1634780951481}' MessageProperties [headers={__TypeId__=com.example.demo.util.MessageUtil}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])回复的文本内容NO_ROUTE路由键chendazui1.#回复的状态码=>312

这时候我们returnCallBack机制捕捉到失败消息,消息未抵达队列queue。

相关代码地址:

RabbitMQ Demo: rabbitmq 代码示例

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4683309.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存