如上图所示:
一、publisher /confirm/iCallBack确认模式
springboot开启rabbitmq可靠抵达 ——/confirm/iCallBack
spring: rabbitmq: publisher-/confirm/i-type: correlated
当我们的publisher 到达 broker (服务器时候) ,返回/confirm/iCallback,当消息没有抵达broker的时候返回true,并会给出失败原因。
public class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } @PostConstruct public void initRabbitTemplate(){ rabbitTemplate. setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { } }); } }
二、publisher returnCallBack 确认模式
springboot开启rabbitmq可靠抵达 —— returnCallBack
当我们开启publisher-returns 时候,将 spring.rabbitmq.template.mandatory 开启
作用:只要消息抵达队列 ,以异步方式优先回调 rerun/confirm/i
spring: rabbitmq: publisher-returns: true #只要消息抵达队列 ,以异步方式优先回调 rerunConfirm template: mandatory: true
设置broker 抵达 queue 时候的returnCallBack回调
只要消息没有投递给指定的队列,就触发这个失败回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { } });
贴上RetrunedMessage实体返回的内容(此处为rabbitMQ自带实体,非自建)
public class ReturnedMessage { //投递失败的消息详细信息 private final Message message; //回复的状态码 private final int replyCode; //回复的文本内容 private final String replyText; //当时这个消息发给哪个交换机 private final String exchange; //当时这个消息用哪个路由键 private final String routingKey; public ReturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { this.message = message; this.replyCode = replyCode; this.replyText = replyText; this.exchange = exchange; this.routingKey = routingKey; } public Message getMessage() { return this.message; } public int getReplyCode() { return this.replyCode; } public String getReplyText() { return this.replyText; } public String getExchange() { return this.exchange; } public String getRoutingKey() { return this.routingKey; } public String toString() { return "ReturnedMessage [message=" + this.message + ", replyCode=" + this.replyCode + ", replyText=" + this.replyText + ", exchange=" + this.exchange + ", routingKey=" + this.routingKey + "]"; } }
路由键绑定为 :
@Test void bindingQueueExchange(){ Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,exchange,"chendazui.#",null); amqpAdmin.declareBinding(binding); log.info("[{}]绑定成功","chendazui-binding"); }
当我们将路由键修改后:
@Test void sendMessage(){ MessageUtil messageUtil = new MessageUtil(); messageUtil.setCode("1"); messageUtil.setMsg("我到了"); messageUtil.setOrder("陈大嘴的订单"); messageUtil.setUser("陈大嘴"); messageUtil.setDateTime(new Date()); rabbitTemplate.convertAndSend(exchange,"chendazui1.#",messageUtil); log.info("[{}]消息已经发出",messageUtil); }
这个时候我们测试结果如下:
当前指向的交换机==>chendazui-exchange投递失败消息详情(Body:'{"code":"1","msg":"我到了","order":"陈大嘴的订单","user":"陈大嘴","dateTime":1634780951481}' MessageProperties [headers={__TypeId__=com.example.demo.util.MessageUtil}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])回复的文本内容NO_ROUTE路由键chendazui1.#回复的状态码=>312
这时候我们returnCallBack机制捕捉到失败消息,消息未抵达队列queue。
相关代码地址:
RabbitMQ Demo: rabbitmq 代码示例
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)