SpringBoot : V2.5.5
RabbitMQ: 3.9.9
延时队列的典型应用场景,例如购买火车票,下单占座后20分钟内未支付的订单会被强制取消,避免在余票紧张的情况下,车票一直被占用,其他人无法购买。还有电商平台,客户下单后,订单进入购物车,如果购物车内的订单超过特定时间未支付,则会失效,回滚库存。
RabbitMQ实现延时队列利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。
Time To Live(TTL) :
TTL 指的是消息的存活时间,RabbitMQ可以通过x-message-ttl参数来设置指定Queue上消息的存活时间,它的值是一个非负整数,单位为微秒。
RabbitMQ 可以从两种维度设置消息过期时间,分别为队列和消息本身。
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。
注1:如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。
注2:队列过期后,队列内的所有消息全部变为死信。
注3:消息过期后,只有消息位于队列的顶端,才会判断其是否过期,过期的消息变为死信Dead Letter
Dead Letter Exchanges(DLX)
DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的 Queue(队列)可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key,一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。
1.消息生产方的配置类
@Configuration public class RabbitMQConf { //正常交换机的名称 public static final String ITEM_TOPIC_EXCHANGE = "springboot_item_topic_exchange"; //死信交换机的名称 public static final String DLX_TOPIC_EXCHANGE = "dlx_topic_exchange"; //正常队列的名称 public static final String ITEM_QUEUE = "springboot_item_queue"; //死信队列的名称 public static final String DLX_QUEUE = "dlx_queue"; //构建正常交换机 @Bean("itemTopicExchange") public Exchange topicExchange(){ return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build(); } //构建死信交换机 @Bean("dlxExchange") public Exchange dlxExchange(){ return ExchangeBuilder.topicExchange(DLX_TOPIC_EXCHANGE).durable(true).build(); } //构建正常队列 @Bean("itemQueue") public Queue itemQueue(){ return QueueBuilder .durable(ITEM_QUEUE) .deadLetterExchange(DLX_TOPIC_EXCHANGE)//设置死信交换机 .deadLetterRoutingKey("dlx.hello")//设置路由规则 .ttl(10000)//设置队列TTL=10s .maxLength(10)//设置队列队列最大容量=10 .build(); } //构建死信队列 @Bean("dlxQueue") public Queue dlxQueue(){ return QueueBuilder.durable(DLX_QUEUE).build(); } //绑定正常队列和正常交换机 @Bean public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("itemTopicExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); } //绑定死信队列和死信交换机 @Bean public Binding dlxQueueExchange(@Qualifier("dlxQueue") Queue queue, @Qualifier("dlxExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs(); } }
- 正常消息消费方
@Component public class DlxListener implements ChannelAwareMessageListener { @RabbitListener(queues = "springboot_item_queue") @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("接收到的消息是:"+new String(message.getBody())); try { System.out.println("开始处理业务逻辑………………"); //int i = 3/0;//业务处理过程中发生异常 channel.basicAck(deliveryTag,true); } catch (Exception e) { e.printStackTrace(); System.out.println("出现异常,拒绝接收"); //拒绝签收,消息不重回队列,requeue设置为false channel.basicNack(deliveryTag,true,false); } } }
- 死信消息消费方
@Component public class DLListener implements ChannelAwareMessageListener { @RabbitListener(queues = "dlx_queue") @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("接收到的消息是:"+new String(message.getBody())); try { System.out.println("开始处理业务逻辑………………"); System.out.println("根据订单id查询其状态..."); System.out.println("判断状态是否为支付成功"); System.out.println("取消订单,回滚库存...."); channel.basicAck(deliveryTag,true); } catch (Exception e) { e.printStackTrace(); channel.basicNack(deliveryTag,true,true); } } }延时任务运行结果
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)