RabbitMQ 异常处理篇04

RabbitMQ 异常处理篇04,第1张

RabbitMQ 异常处理篇04 功能

1.消息重试机制
2.死信队列
3.延时队列

1 github:源码地址 2 rabbitmq03 子工程


    4.0.0

    
        com.yzm
        rabbitmq
        0.0.1-SNAPSHOT
        ../pom.xml 
    

    rabbitmq03
    0.0.1-SNAPSHOT
    jar
    rabbitmq03
    Demo project for Spring Boot

    

    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    



项目结构

application.yml

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: auto
        retry:
          enabled: true
          max-attempts: 5 # 重试次数
          max-interval: 10000   # 重试最大间隔时间
          initial-interval: 2000  # 重试初始间隔时间
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
3 案例重现

首先来看一个案例:
生产者,生产一条消息

package com.yzm.rabbitmq03.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class RetrySenderService {

    private final RabbitTemplate rabbitTemplate;
    private int count = 1;

    public RetrySenderService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Scheduled(fixedDelay = 1000, initialDelay = 10000)
    public void send1() {
        if (count <= 1) {
            String message = "Hello.........." + count++;
            rabbitTemplate.convertAndSend("retry.exchange", "topic.yzm.retry", message);
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        }
    }

}

消费者在处理消息时发生异常情况
这里创建队列和交换机是通过注解实现的,跟前面的篇章创建队列和交换机是等同的

package com.yzm.rabbitmq03.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Slf4j
@Component
public class RetryReceiverService {
	
	private int count = 1;
	
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "retry-a"),
            exchange = @Exchange(value = "retry.exchange", type = ExchangeTypes.TOPIC),
            key = {"topic.*.retry"}
    ))
    public void retry(Message message) {
        log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
        log.info("当前执行次数:{}", count++);
        // 制造异常
        int i = 1 / 0;
        log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
    }
}

开启定时器

package com.yzm.rabbitmq03.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class RetryRabbitConfig {
    
}

此时的yml文件是:自动确认模式

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: auto

启动项目,无限循环报错,停止项目后,这条消息重回Ready状态

在消息确认之前,发送异常并且没有捕获,导致确认失败,消息重复消费
针对这种异常情况,我们逐步优化,最后解决掉它。

4 消息重试机制

启动重试机制,只需要在yml文件配置即可

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: auto
        retry:
          enabled: true
          max-attempts: 5 # 重试次数
          max-interval: 10000   # 重试最大间隔时间
          initial-interval: 2000  # 重试初始间隔时间
          multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

服务器上删除retry-a(目的删除前面的那条异常消息),重启会重新创建的

重启项目,运行结果如下:

首先重试次数是5次(包括自身初次消费的那次)没问题,
第一个执行时间:11:06:43,第二次执行时间:11:06:45 ,初始间隔2秒没问题
第二次执行时间:11:06:45,第三次执行时间:11:06:49,时间间隔 = 初始间隔(2秒) * 间隔因子(2) = 4秒没问题
以此类推… 2s、4s、8s、16s(最后一次16s,但由于设置的最大间隔10s,所以16s就变成了10s)
最后查看retry-a队列,没有消息了,也就是说重试5次之后就会移除该消息
移除 *** 作是由日志中的这个类处理:RejectAndDontRequeueRecoverer(拒绝和不要重新排队)

原因是因为在构建SimpleRabbitListenerContainerFactoryConfigurer类时使用了MessageRecoverer接口,这个接口有一个cover方法,用来实现重试完成之后对消息的处理,源码如下:

RejectAndDontRequeueRecoverer 是 接口 MessageRecoverer 的一个实现类
同时 MessageRecoverer 还有另外两个实现类,分别是RepublishMessageRecoverer和ImmediateRequeueMessageRecoverer,顾名思义就是重新发布消息和立即重新返回队列

  • 先来使用下 ImmediateRequeueMessageRecoverer 重新排队
    在RetryRabbitConfig中配置
package com.yzm.rabbitmq03.config;

import org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class RetryRabbitConfig {
    
    @Bean
    public MessageRecoverer messageRecoverer() {
        return new ImmediateRequeueMessageRecoverer();
    }
    
}

重启项目,运行结果如下:

 [ 生产者 ] Sent ==> 'Hello..........1'
2021-11-05 11:31:54.394  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:31:54.394  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       : 当前执行次数:1
2021-11-05 11:31:56.398  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:31:56.398  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       : 当前执行次数:2
2021-11-05 11:32:00.399  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:32:00.399  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       : 当前执行次数:3
2021-11-05 11:32:08.401  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:32:08.401  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       : 当前执行次数:4
2021-11-05 11:32:18.403  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:32:18.403  INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService       : 当前执行次数:5
2021-11-05 11:32:18.411  WARN 11836 --- [ntContainer#0-1] s.a.r.r.ImmediateRequeueMessageRecoverer : Retries exhausted for message (Body:'Hello..........1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=retry.exchange, receivedRoutingKey=topic.yzm.retry, deliveryTag=1, consumerTag=amq.ctag--Er498COrXqDSbno4cCIyw, consumerQueue=retry-a]); requeuing...

....报错信息

2021-11-05 11:32:18.424  INFO 11836 --- [ntContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@4e424582: tags=[[amq.ctag--Er498COrXqDSbno4cCIyw]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@66ec4409 Shared Rabbit Connection: SimpleConnection@310a7859 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54707], acknowledgeMode=AUTO local queue size=1
2021-11-05 11:32:18.436  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:32:18.436  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       : 当前执行次数:6
2021-11-05 11:32:20.437  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:32:20.437  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       : 当前执行次数:7
2021-11-05 11:32:24.439  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:32:24.439  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       : 当前执行次数:8
2021-11-05 11:32:32.441  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:32:32.441  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       : 当前执行次数:9
2021-11-05 11:32:42.442  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       :  [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1
2021-11-05 11:32:42.442  INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService       : 当前执行次数:10
2021-11-05 11:32:42.444  WARN 11836 --- [ntContainer#0-2] s.a.r.r.ImmediateRequeueMessageRecoverer : Retries exhausted for message (Body:'Hello..........1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=retry.exchange, receivedRoutingKey=topic.yzm.retry, deliveryTag=1, consumerTag=amq.ctag-pyjXZ9-uJPmTevevHKq5VQ, consumerQueue=retry-a]); requeuing...

可以看出:重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费。

  • 接着使用 RepublishMessageRecoverer 全局异常处理
    在RetryRabbitConfig中配置
package com.yzm.rabbitmq03.config;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class RetryRabbitConfig {

    
//    @Bean
    public MessageRecoverer messageRecoverer() {
        return new ImmediateRequeueMessageRecoverer();
    }

    public static final String RETRY_QUEUE = "retry-error";
    public static final String RETRY_EXCHANGE = "retry.exchange";
    public static final String RETRY_KEY = "retry-key";

    
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_KEY);
    }
}

创建异常消费者

package com.yzm.rabbitmq03.service;

import com.yzm.rabbitmq03.config.RetryRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RetryReceiverService {

    private int count = 1;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "retry-a"),
            exchange = @Exchange(value = "retry.exchange", type = ExchangeTypes.TOPIC),
            key = {"topic.*.retry"}
    ))
    public void retry(Message message) {
        log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
        log.info("当前执行次数:{}", count++);
        // 制造异常
        int i = 1 / 0;
        log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
    }

    // 处理异常消费者
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = RetryRabbitConfig.RETRY_QUEUE),
            exchange = @Exchange(value = RetryRabbitConfig.RETRY_EXCHANGE, type = ExchangeTypes.TOPIC),
            key = {RetryRabbitConfig.RETRY_KEY}
    ))
    public void error(Message message) {
        log.info(" [ 消费者@异常号 ] 接收到消息 ==> '" + new String(message.getBody()));
    }

}

删除retry-a队列,移除前面的异常消息,重启项目,运行结果如下:

重试5次之后,将消息 Republishing failed message to exchange ‘retry.exchange’ with routing key retry-key 转发到异常队列,由异常消费者消费了

5 死信队列

死信队列:创建一个普通队列时,通过添加配置绑定另一个交换机(死信交换机),在普通队列发生异常时,消息就通过死信交换机转发到绑定它的队列里,这个绑定死信交换机的队列就是死信队列

创建死信队列、死信交换机
创建正常队列,并添加死信交换机以及死信路由键的配置

package com.yzm.rabbitmq03.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableScheduling
public class DeadLetterRabbitConfig {

    public static final String DL_QUEUE = "dl-queue";
    public static final String DL_EXCHANGE = "dl.exchange";
    public static final String DL_KEY = "dl.key";

    // 死信队列
    @Bean
    public Queue dlQueue() {
        return QueueBuilder.durable(DL_QUEUE).build();
    }

    // 死信交换机
    @Bean
    public DirectExchange dlExchange() {
        return ExchangeBuilder.directExchange(DL_EXCHANGE).build();
    }

    // 绑定死信队列到死信交换机
    @Bean
    public Binding dlBinding() {
        return BindingBuilder.bind(dlQueue()).to(dlExchange()).with(DL_KEY);
    }

    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_EXCHANGE = "normal.exchange";
    public static final String NORMAL_KEY = "normal.key";

    // 正常队列,添加配置
    @Bean
    public Queue queue() {
        Map params = new HashMap<>();
        params.put("x-dead-letter-exchange", DL_EXCHANGE);//声明当前队列绑定的死信交换机
        params.put("x-dead-letter-routing-key", DL_KEY);//声明当前队列的死信路由键
        return QueueBuilder.durable(NORMAL_QUEUE).withArguments(params).build();
    }

    @Bean
    public DirectExchange exchange() {
        return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).build();
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(NORMAL_KEY);
    }
}

生产者

package com.yzm.rabbitmq03.service;

import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DeadLetterSenderService {

    private final RabbitTemplate rabbitTemplate;
    private int count = 1;

    public DeadLetterSenderService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Scheduled(fixedDelay = 1000, initialDelay = 10000)
    public void send() {
        if (count <= 1) {
            String message = "Hello.........." + count++;
            rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY, message);
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        }
    }
}

消费者

package com.yzm.rabbitmq03.service;

import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DeadLetterReceiverService {

    private int count = 1;

    @RabbitListener(queues = DeadLetterRabbitConfig.NORMAL_QUEUE)
    public void normal(Message message) {
        log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
        log.info("当前执行次数:{}", count++);
        int i = 1 / 0;
        log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
    }

    @RabbitListener(queues = DeadLetterRabbitConfig.DL_QUEUE)
    public void dl(Message message) {
        log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
    }
}

关闭重试生产者定时器功能

关闭 RepublishMessageRecoverer 全局异常处理功能,不然启动后的异常就会被全局异常队列处理,而不是死信队列处理,这种全局异常队列优先于死信队列;
恢复默认RejectAndDontRequeueRecoverer 拒绝并且不重新排队

重启项目,运行结果如下:

服务器上normal-queue有DLX、DLK标识,说明该队列绑定了死信交换机和死信路由键;
重试5次之后,就将消息转发给死信队列
死信队列是针对某个普通队列发生异常情况进行处理,而RepublishMessageRecoverer的异常处理是对所有普通队列发生异常进行处理,并且优先于死信队列

6 延时队列

延时队列:一般来说,发布消息之后,会被交换机接收并转发给对应的队列,队列分配给消费者处理,这个过程很快秒级处理;但有时候我们希望发布完消息后,在指定的时间之后再去处理消息,这个时候就需要使用到延时队列;
虽说是延时队列,但其实也只是对死信队列的一种扩展应用罢了。
首先还是得创建普通队列,添加参数绑定死信队列同时设置消息过期时间,生产者发布消息到普通队列,而普通队列没有任何消费者来消费,那么消息在普通队列中存活到设定过期时间就被转发到死信队列,由死信队列的消费者消费消息,以此实现延时功能。

实现延时队列

package com.yzm.rabbitmq03.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableScheduling
public class DeadLetterRabbitConfig {

	// 死信队列
    ...

    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_QUEUE2 = "normal-queue2";
    public static final String NORMAL_EXCHANGE = "normal.exchange";
    public static final String NORMAL_KEY = "normal.key";
    public static final String NORMAL_KEY2 = "normal.key2";

    // 正常队列,添加配置
    @Bean
    public Queue queue() {
        Map params = new HashMap<>();
        params.put("x-dead-letter-exchange", DL_EXCHANGE);//声明当前队列绑定的死信交换机
        params.put("x-dead-letter-routing-key", DL_KEY);//声明当前队列的死信路由键
        return QueueBuilder.durable(NORMAL_QUEUE).withArguments(params).build();
    }

    @Bean
    public DirectExchange exchange() {
        return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).build();
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(NORMAL_KEY);
    }

    // 延时队列
    @Bean
    public Queue queue2() {
        Map params = new HashMap<>();
        params.put("x-dead-letter-exchange", DL_EXCHANGE);//声明当前队列绑定的死信交换机
        params.put("x-dead-letter-routing-key", DL_KEY);//声明当前队列的死信路由键
        // 添加消息过期时间,10秒内还没处理完,就转发给死信队列
        params.put("x-message-ttl", 10000);
        return QueueBuilder.durable(NORMAL_QUEUE2).withArguments(params).build();
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(exchange()).with(NORMAL_KEY2);
    }
}

生产者,指定路由键是 NORMAL_KEY2

package com.yzm.rabbitmq03.service;

import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DeadLetterSenderService {

    private final RabbitTemplate rabbitTemplate;
    private int count = 1;

    public DeadLetterSenderService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

//    @Scheduled(fixedDelay = 1000, initialDelay = 10000)
    public void send() {
        if (count <= 1) {
            String message = "Hello.........." + count++;
            log.info(" [ 生产者 ] Sent ==> '" + message + "'");
            rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY, message);
        }
    }

    @Scheduled(fixedDelay = 1000, initialDelay = 10000)
    public void send2() {
        if (count <= 1) {
            String message = "Hello.........." + count++;
            log.info(" [ 生产者 ] Sent ==> '" + message + "'");
            rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY2, message);
        }
    }
}

消费者,不需要对普通队列NORMAL_QUEUE2进行处理

package com.yzm.rabbitmq03.service;

import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DeadLetterReceiverService {

    private int count = 1;

    @RabbitListener(queues = DeadLetterRabbitConfig.NORMAL_QUEUE)
    public void normal(Message message)  {
        log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
        log.info("当前执行次数:{}", count++);
        int i = 1 / 0;
        log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
    }

    @RabbitListener(queues = DeadLetterRabbitConfig.DL_QUEUE)
    public void dl(Message message) {
        log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
    }
}

启动项目,运行结果如下:

设置过期时间10秒,没有任何消费者对normal-queue2队列处理,10秒过后,消息转发到死信队列处理

现在这种过期时间设定就针对整个normal-queue2队列的所有消息的,还可以更细粒度的针对每一条消息设置过期时间。
实现:使用normal-queue作为普通队列,之前已经绑定死信队列了
生产者,向normal-queue发布消息

package com.yzm.rabbitmq03.service;

import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class DeadLetterSenderService {

    private final RabbitTemplate rabbitTemplate;
    private int count = 1;

    public DeadLetterSenderService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

//    @Scheduled(fixedDelay = 1000, initialDelay = 10000)
    public void send() {
        if (count <= 1) {
            String message = "Hello.........." + count++;
            log.info(" [ 生产者 ] Sent ==> '" + message + "'");
            rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY, message);
        }
    }

    //    @Scheduled(fixedDelay = 1000, initialDelay = 10000)
    public void send2() {
        if (count <= 1) {
            String message = "Hello.........." + count++;
            log.info(" [ 生产者 ] Sent ==> '" + message + "'");
            rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY2, message);
        }
    }

    @Scheduled(fixedDelay = 1000, initialDelay = 10000)
    public void send3() {
        if (count <= 1) {
            String s = "Hello.........." + count++;
            log.info(" [ 生产者 ] Sent ==> '" + s + "'");

            //设置过期时间
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setExpiration("12000");
            Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY, message);
        }
    }
}

消费者,注释掉normal-queue队列的消费者

package com.yzm.rabbitmq03.service;

import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DeadLetterReceiverService {

    private int count = 1;

//    @RabbitListener(queues = DeadLetterRabbitConfig.NORMAL_QUEUE)
    public void normal(Message message)  {
        log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody()));
        log.info("当前执行次数:{}", count++);
        int i = 1 / 0;
        log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody()));
    }

    @RabbitListener(queues = DeadLetterRabbitConfig.DL_QUEUE)
    public void dl(Message message) {
        log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody()));
    }
}

启动项目

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5116467.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存