RabbitMQ-Java-07-发布确认高级

RabbitMQ-Java-07-发布确认高级,第1张

RabbitMQ-Java-07-发布确认高级 说明
  • RabbitMQ-Java-07-发布确认高级
  • 本案例是一个Maven+SpringBoot项目
  • 假设你已经实现了上一节延迟队列
  • 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
核心概念 》发布确认高级说明
  • 发布确认高级是为了解决什么问题呢?
    • 比如RabbitMq服务器宕机或重启导致数据丢失问题
  • 主要 *** 作有哪些呢?
    • 准备一个Maven+SpringBoot项目,添加必要的Maven依赖以及RabbitMQ配置项,具体可以参考上一节延迟队列
    • 在上一节添加了RabbitMQ配置后还需要额外增加两项配置
      • 设置发布确认回调类型:发送消息到交换机会回调(成功失败都回调)
        spring.rabbitmq.publisher-/confirm/i-type=correlated
        
        • none:禁用(默认值)
        • correlated:发送消息到交换机会回调(成功失败都回调)
        • simple:在作用域中使用,相当于同步确认机制,有关闭channel风险
      • 开启发布返回
        spring.rabbitmq.publisher-returns=true
        
    • 新建发布确认高级-配置类
    • 新建发布确认高级-发布确认回调组件
      • @Component注解注册组件,implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback
      • @Autowired注解自动装配RabbitTemplate
      • @PostConstruct注解把当前类自动注入到全局RabbitTemplate中
        • 注意:@PostConstruct必须在@Autowired之后
      • 重写/confirm/i方法:交换机确认回调(成功失败都回调)
      • 重写returnedMessage方法:队列确认回调(仅送达失败时回调)
    • 新建发布确认高级-消费者01
    • 发布确认高级-控制器(生产者)
*** 作步骤 》完整代码
  • application.properties
    spring.rabbitmq.host=192.168.3.202
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456
    spring.rabbitmq.publisher-/confirm/i-type=correlated
    spring.rabbitmq.publisher-returns=true
    
  • My/confirm/iConfig
    package cn.cnyasin.rabbit.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    @Configuration
    public class MyConfirmConfig {
        // 交换机
        public static final String EXCHANGE_ConFIRM = "exchange_/confirm/i";
        // 队列
        public static final String QUEUE_ConFIRM = "queue_/confirm/i";
        // 路由key
        public static final String ROUTING_ConFIRM = "routing_/confirm/i";
    
        // 声明交换机
        @Bean
        public DirectExchange exchangeConfirm() {
            return ExchangeBuilder.directExchange(EXCHANGE_/confirm/i).build();
        }
    
        // 声明队列
        @Bean
        public Queue queueConfirm() {
            return QueueBuilder.durable(QUEUE_/confirm/i).build();
        }
    
        // 绑定队列交换机路由key
        @Bean
        public Binding queueConfirmBindingExchange/confirm/i(
                @Qualifier("queue/confirm/i") Queue queue/confirm/i,
                @Qualifier("exchange/confirm/i") DirectExchange exchangeConfirm
        ) {
            return BindingBuilder.bind(queue/confirm/i).to(exchange/confirm/i).with(ROUTING_/confirm/i);
        }
    
    }
    
    
  • My/confirm/iCallback
    package cn.cnyasin.rabbit.component;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.Date;
    
    
    @Slf4j
    @Component
    public class MyConfirmCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnsCallback(this);
        }
    
        
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                log.info("[*] [{}] 确认回调成功,回调ID:{}", new Date().toString(), correlationData.getId());
            } else {
                log.info("[*] [{}] 确认回调失败,回调ID:{},失败原因:{}", new Date().toString(), correlationData.getId(), cause);
            }
        }
    
        
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            int code = returnedMessage.getReplyCode();
            String text = returnedMessage.getReplyText();
            byte[] message = returnedMessage.getMessage().getBody();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();
            log.info("[*] [{}] 消息未送达队列回调,错误码code:{}。原因:{}。消息:{}。交换机:{}。路由key:{}",
                    new Date().toString(), code, text, new String(message), exchange, routingKey);
        }
    }
    
    
  • My/confirm/iConsumer01
    package cn.cnyasin.rabbit.consumer;
    
    import cn.cnyasin.rabbit.config.My/confirm/iConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    
    @Slf4j
    @Component
    public class My/confirm/iConsumer01 {
        @RabbitListener(queues = My/confirm/iConfig.QUEUE_/confirm/i)
        public void receiveMessage(String message) {
            log.info("[*] [{}] 发布确认高级-消费者01 接收到消息:{}", new Date().toString(), message);
        }
    
    }
    
    
  • My/confirm/iController
    package cn.cnyasin.rabbit.controller;
    
    import cn.cnyasin.rabbit.config.My/confirm/iConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    
    @Slf4j
    @RestController
    @RequestMapping("//confirm/i")
    public class MyConfirmController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/send/{msg}")
        public String send(@PathVariable String msg) throws Exception {
            log.info("[*] [{}] 准备发送消息:{}", new Date().toString(), msg);
    
            CorrelationData correlationData = new CorrelationData("1");
            rabbitTemplate.convertAndSend(My/confirm/iConfig.EXCHANGE_/confirm/i, My/confirm/iConfig.ROUTING_/confirm/i, msg + 1, correlationData);
    
            CorrelationData correlationData2 = new CorrelationData("2");
            rabbitTemplate.convertAndSend(My/confirm/iConfig.EXCHANGE_ConFIRM + 2, My/confirm/iConfig.ROUTING_/confirm/i, msg + 2, correlationData2);
    
            CorrelationData correlationData3 = new CorrelationData("3");
            rabbitTemplate.convertAndSend(My/confirm/iConfig.EXCHANGE_/confirm/i, My/confirm/iConfig.ROUTING_ConFIRM + 3, msg + 3, correlationData3);
    
            return "OK";
        }
    }
    
    
备注
  • 该教程部分内容收集自网络,感谢原作者。
附录

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5700091.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存