配置:
server: port: 8080 spring: rabbitmq: # 单机连接配置host port即可 # host: 192.168.50.134 # port: 5672 # 集群连接信息 addresses: 192.168.50.134:5673,192.168.50.134:5672,192.168.50.134:5674 virtual-host: /dev username: tech password: tech # 开启发送确认机制,感知消息是否到达交换机 publisher-/confirm/i-type: correlated # 开启消息从交换机到队列的确认机制,感知消息是否到达队列 publisher-returns: true # true表示交换机转发消息到队列失败,将消息返给发送者 template: mandatory: true #开启消费者手动确认 listener: simple: acknowledge-mode: manual retry: enabled: true max-attempts: 3 #重试次数含第1次调用,也就是第一次调用失败再调用两次 max-interval: 100000 # 重试最大间隔时间 initial-interval: 1000 # 重试初始间隔时间 multiplier: 3 # 间隔时间乘子,上次间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
该配置在发生异常时,会执行3次,第1次调用失败,再重试2次重试间隔为 1s 3s ,然后抛出异常
消费端代码
package com.tech.rabbitmq.spring; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Slf4j @Component @RabbitListener(queues = "order_queue") public class OrderMQListener { @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); // System.out.println("msgTag="+deliveryTag); // System.out.println("message="+message.toString()); log.info("body="+body); System.out.println(1/0); //进行手动确认 //消息投递序号 是否批量 channel.basicAck(deliveryTag,false); // if(body.contains("2")){ // channel.basicAck(deliveryTag,false); // } //拒收消息 //消息投递序号 是否批量 是否将消息回退到队列 // channel.basicNack(deliveryTag,false,true); //拒收消息 (不支持批量拒收) //消息投递序号 是否将消息回退到队列 // channel.basicReject(deliveryTag,true); System.out.println("*****************************"); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)