- RabbitMQ-Java-07-发布确认高级
- 本案例是一个Maven+SpringBoot项目
- 假设你已经实现了上一节延迟队列
- 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
- 发布确认高级是为了解决什么问题呢?
- 比如RabbitMq服务器宕机或重启导致数据丢失问题
- 主要 *** 作有哪些呢?
- 准备一个Maven+SpringBoot项目,添加必要的Maven依赖以及RabbitMQ配置项,具体可以参考上一节延迟队列
- 在上一节添加了RabbitMQ配置后还需要额外增加两项配置
- 设置发布确认回调类型:发送消息到交换机会回调(成功失败都回调)
spring.rabbitmq.publisher-/confirm/i-type=correlated
- none:禁用(默认值)
- correlated:发送消息到交换机会回调(成功失败都回调)
- simple:在作用域中使用,相当于同步确认机制,有关闭channel风险
- 开启发布返回
spring.rabbitmq.publisher-returns=true
- 设置发布确认回调类型:发送消息到交换机会回调(成功失败都回调)
- 新建发布确认高级-配置类
- 新建发布确认高级-发布确认回调组件
- @Component注解注册组件,implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback
- @Autowired注解自动装配RabbitTemplate
- @PostConstruct注解把当前类自动注入到全局RabbitTemplate中
- 注意:@PostConstruct必须在@Autowired之后
- 重写/confirm/i方法:交换机确认回调(成功失败都回调)
- 重写returnedMessage方法:队列确认回调(仅送达失败时回调)
- 新建发布确认高级-消费者01
- 发布确认高级-控制器(生产者)
- application.properties
spring.rabbitmq.host=192.168.3.202 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.publisher-/confirm/i-type=correlated spring.rabbitmq.publisher-returns=true
- My/confirm/iConfig
package cn.cnyasin.rabbit.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyConfirmConfig { // 交换机 public static final String EXCHANGE_ConFIRM = "exchange_/confirm/i"; // 队列 public static final String QUEUE_ConFIRM = "queue_/confirm/i"; // 路由key public static final String ROUTING_ConFIRM = "routing_/confirm/i"; // 声明交换机 @Bean public DirectExchange exchangeConfirm() { return ExchangeBuilder.directExchange(EXCHANGE_/confirm/i).build(); } // 声明队列 @Bean public Queue queueConfirm() { return QueueBuilder.durable(QUEUE_/confirm/i).build(); } // 绑定队列交换机路由key @Bean public Binding queueConfirmBindingExchange/confirm/i( @Qualifier("queue/confirm/i") Queue queue/confirm/i, @Qualifier("exchange/confirm/i") DirectExchange exchangeConfirm ) { return BindingBuilder.bind(queue/confirm/i).to(exchange/confirm/i).with(ROUTING_/confirm/i); } }
- My/confirm/iCallback
package cn.cnyasin.rabbit.component; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Date; @Slf4j @Component public class MyConfirmCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("[*] [{}] 确认回调成功,回调ID:{}", new Date().toString(), correlationData.getId()); } else { log.info("[*] [{}] 确认回调失败,回调ID:{},失败原因:{}", new Date().toString(), correlationData.getId(), cause); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { int code = returnedMessage.getReplyCode(); String text = returnedMessage.getReplyText(); byte[] message = returnedMessage.getMessage().getBody(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); log.info("[*] [{}] 消息未送达队列回调,错误码code:{}。原因:{}。消息:{}。交换机:{}。路由key:{}", new Date().toString(), code, text, new String(message), exchange, routingKey); } }
- My/confirm/iConsumer01
package cn.cnyasin.rabbit.consumer; import cn.cnyasin.rabbit.config.My/confirm/iConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; @Slf4j @Component public class My/confirm/iConsumer01 { @RabbitListener(queues = My/confirm/iConfig.QUEUE_/confirm/i) public void receiveMessage(String message) { log.info("[*] [{}] 发布确认高级-消费者01 接收到消息:{}", new Date().toString(), message); } }
- My/confirm/iController
package cn.cnyasin.rabbit.controller; import cn.cnyasin.rabbit.config.My/confirm/iConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; @Slf4j @RestController @RequestMapping("//confirm/i") public class MyConfirmController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/send/{msg}") public String send(@PathVariable String msg) throws Exception { log.info("[*] [{}] 准备发送消息:{}", new Date().toString(), msg); CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(My/confirm/iConfig.EXCHANGE_/confirm/i, My/confirm/iConfig.ROUTING_/confirm/i, msg + 1, correlationData); CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(My/confirm/iConfig.EXCHANGE_ConFIRM + 2, My/confirm/iConfig.ROUTING_/confirm/i, msg + 2, correlationData2); CorrelationData correlationData3 = new CorrelationData("3"); rabbitTemplate.convertAndSend(My/confirm/iConfig.EXCHANGE_/confirm/i, My/confirm/iConfig.ROUTING_ConFIRM + 3, msg + 3, correlationData3); return "OK"; } }
- 该教程部分内容收集自网络,感谢原作者。
- 无
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)