1 github:源码地址 2 rabbitmq03 子工程
4.0.0 rabbitmq03 com.yzm rabbitmq0.0.1-SNAPSHOT ../pom.xml 0.0.1-SNAPSHOT jar rabbitmq03 Demo project for Spring Boot org.springframework.boot spring-boot-maven-plugin
项目结构
application.yml
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest listener: simple: acknowledge-mode: auto retry: enabled: true max-attempts: 5 # 重试次数 max-interval: 10000 # 重试最大间隔时间 initial-interval: 2000 # 重试初始间隔时间 multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间3 案例重现
首先来看一个案例:
生产者,生产一条消息
package com.yzm.rabbitmq03.service; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component public class RetrySenderService { private final RabbitTemplate rabbitTemplate; private int count = 1; public RetrySenderService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @Scheduled(fixedDelay = 1000, initialDelay = 10000) public void send1() { if (count <= 1) { String message = "Hello.........." + count++; rabbitTemplate.convertAndSend("retry.exchange", "topic.yzm.retry", message); System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); } } }
消费者在处理消息时发生异常情况
这里创建队列和交换机是通过注解实现的,跟前面的篇章创建队列和交换机是等同的
package com.yzm.rabbitmq03.service; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; @Slf4j @Component public class RetryReceiverService { private int count = 1; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "retry-a"), exchange = @Exchange(value = "retry.exchange", type = ExchangeTypes.TOPIC), key = {"topic.*.retry"} )) public void retry(Message message) { log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody())); log.info("当前执行次数:{}", count++); // 制造异常 int i = 1 / 0; log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody())); } }
开启定时器
package com.yzm.rabbitmq03.config; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class RetryRabbitConfig { }
此时的yml文件是:自动确认模式
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest listener: simple: acknowledge-mode: auto
4 消息重试机制启动项目,无限循环报错,停止项目后,这条消息重回Ready状态
在消息确认之前,发送异常并且没有捕获,导致确认失败,消息重复消费
针对这种异常情况,我们逐步优化,最后解决掉它。
启动重试机制,只需要在yml文件配置即可
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest listener: simple: acknowledge-mode: auto retry: enabled: true max-attempts: 5 # 重试次数 max-interval: 10000 # 重试最大间隔时间 initial-interval: 2000 # 重试初始间隔时间 multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
服务器上删除retry-a(目的删除前面的那条异常消息),重启会重新创建的
重启项目,运行结果如下:
首先重试次数是5次(包括自身初次消费的那次)没问题,
第一个执行时间:11:06:43,第二次执行时间:11:06:45 ,初始间隔2秒没问题
第二次执行时间:11:06:45,第三次执行时间:11:06:49,时间间隔 = 初始间隔(2秒) * 间隔因子(2) = 4秒没问题
以此类推… 2s、4s、8s、16s(最后一次16s,但由于设置的最大间隔10s,所以16s就变成了10s)
最后查看retry-a队列,没有消息了,也就是说重试5次之后就会移除该消息
移除 *** 作是由日志中的这个类处理:RejectAndDontRequeueRecoverer(拒绝和不要重新排队)
原因是因为在构建SimpleRabbitListenerContainerFactoryConfigurer类时使用了MessageRecoverer接口,这个接口有一个cover方法,用来实现重试完成之后对消息的处理,源码如下:
RejectAndDontRequeueRecoverer 是 接口 MessageRecoverer 的一个实现类
同时 MessageRecoverer 还有另外两个实现类,分别是RepublishMessageRecoverer和ImmediateRequeueMessageRecoverer,顾名思义就是重新发布消息和立即重新返回队列
- 先来使用下 ImmediateRequeueMessageRecoverer 重新排队
在RetryRabbitConfig中配置
package com.yzm.rabbitmq03.config; import org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class RetryRabbitConfig { @Bean public MessageRecoverer messageRecoverer() { return new ImmediateRequeueMessageRecoverer(); } }
重启项目,运行结果如下:
[ 生产者 ] Sent ==> 'Hello..........1' 2021-11-05 11:31:54.394 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:31:54.394 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : 当前执行次数:1 2021-11-05 11:31:56.398 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:31:56.398 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : 当前执行次数:2 2021-11-05 11:32:00.399 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:32:00.399 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : 当前执行次数:3 2021-11-05 11:32:08.401 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:32:08.401 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : 当前执行次数:4 2021-11-05 11:32:18.403 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:32:18.403 INFO 11836 --- [ntContainer#0-1] c.y.r.service.RetryReceiverService : 当前执行次数:5 2021-11-05 11:32:18.411 WARN 11836 --- [ntContainer#0-1] s.a.r.r.ImmediateRequeueMessageRecoverer : Retries exhausted for message (Body:'Hello..........1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=retry.exchange, receivedRoutingKey=topic.yzm.retry, deliveryTag=1, consumerTag=amq.ctag--Er498COrXqDSbno4cCIyw, consumerQueue=retry-a]); requeuing... ....报错信息 2021-11-05 11:32:18.424 INFO 11836 --- [ntContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@4e424582: tags=[[amq.ctag--Er498COrXqDSbno4cCIyw]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@66ec4409 Shared Rabbit Connection: SimpleConnection@310a7859 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 54707], acknowledgeMode=AUTO local queue size=1 2021-11-05 11:32:18.436 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:32:18.436 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : 当前执行次数:6 2021-11-05 11:32:20.437 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:32:20.437 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : 当前执行次数:7 2021-11-05 11:32:24.439 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:32:24.439 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : 当前执行次数:8 2021-11-05 11:32:32.441 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:32:32.441 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : 当前执行次数:9 2021-11-05 11:32:42.442 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : [ 消费者@A号 ] 接收到消息 ==> 'Hello..........1 2021-11-05 11:32:42.442 INFO 11836 --- [ntContainer#0-2] c.y.r.service.RetryReceiverService : 当前执行次数:10 2021-11-05 11:32:42.444 WARN 11836 --- [ntContainer#0-2] s.a.r.r.ImmediateRequeueMessageRecoverer : Retries exhausted for message (Body:'Hello..........1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=retry.exchange, receivedRoutingKey=topic.yzm.retry, deliveryTag=1, consumerTag=amq.ctag-pyjXZ9-uJPmTevevHKq5VQ, consumerQueue=retry-a]); requeuing...
可以看出:重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费。
- 接着使用 RepublishMessageRecoverer 全局异常处理
在RetryRabbitConfig中配置
package com.yzm.rabbitmq03.config; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class RetryRabbitConfig { // @Bean public MessageRecoverer messageRecoverer() { return new ImmediateRequeueMessageRecoverer(); } public static final String RETRY_QUEUE = "retry-error"; public static final String RETRY_EXCHANGE = "retry.exchange"; public static final String RETRY_KEY = "retry-key"; @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_KEY); } }
创建异常消费者
package com.yzm.rabbitmq03.service; import com.yzm.rabbitmq03.config.RetryRabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class RetryReceiverService { private int count = 1; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "retry-a"), exchange = @Exchange(value = "retry.exchange", type = ExchangeTypes.TOPIC), key = {"topic.*.retry"} )) public void retry(Message message) { log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody())); log.info("当前执行次数:{}", count++); // 制造异常 int i = 1 / 0; log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody())); } // 处理异常消费者 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = RetryRabbitConfig.RETRY_QUEUE), exchange = @Exchange(value = RetryRabbitConfig.RETRY_EXCHANGE, type = ExchangeTypes.TOPIC), key = {RetryRabbitConfig.RETRY_KEY} )) public void error(Message message) { log.info(" [ 消费者@异常号 ] 接收到消息 ==> '" + new String(message.getBody())); } }
5 死信队列删除retry-a队列,移除前面的异常消息,重启项目,运行结果如下:
重试5次之后,将消息 Republishing failed message to exchange ‘retry.exchange’ with routing key retry-key 转发到异常队列,由异常消费者消费了
死信队列:创建一个普通队列时,通过添加配置绑定另一个交换机(死信交换机),在普通队列发生异常时,消息就通过死信交换机转发到绑定它的队列里,这个绑定死信交换机的队列就是死信队列
创建死信队列、死信交换机
创建正常队列,并添加死信交换机以及死信路由键的配置
package com.yzm.rabbitmq03.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import java.util.HashMap; import java.util.Map; @Configuration @EnableScheduling public class DeadLetterRabbitConfig { public static final String DL_QUEUE = "dl-queue"; public static final String DL_EXCHANGE = "dl.exchange"; public static final String DL_KEY = "dl.key"; // 死信队列 @Bean public Queue dlQueue() { return QueueBuilder.durable(DL_QUEUE).build(); } // 死信交换机 @Bean public DirectExchange dlExchange() { return ExchangeBuilder.directExchange(DL_EXCHANGE).build(); } // 绑定死信队列到死信交换机 @Bean public Binding dlBinding() { return BindingBuilder.bind(dlQueue()).to(dlExchange()).with(DL_KEY); } public static final String NORMAL_QUEUE = "normal-queue"; public static final String NORMAL_EXCHANGE = "normal.exchange"; public static final String NORMAL_KEY = "normal.key"; // 正常队列,添加配置 @Bean public Queue queue() { Mapparams = new HashMap<>(); params.put("x-dead-letter-exchange", DL_EXCHANGE);//声明当前队列绑定的死信交换机 params.put("x-dead-letter-routing-key", DL_KEY);//声明当前队列的死信路由键 return QueueBuilder.durable(NORMAL_QUEUE).withArguments(params).build(); } @Bean public DirectExchange exchange() { return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).build(); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(NORMAL_KEY); } }
生产者
package com.yzm.rabbitmq03.service; import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component public class DeadLetterSenderService { private final RabbitTemplate rabbitTemplate; private int count = 1; public DeadLetterSenderService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @Scheduled(fixedDelay = 1000, initialDelay = 10000) public void send() { if (count <= 1) { String message = "Hello.........." + count++; rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY, message); System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); } } }
消费者
package com.yzm.rabbitmq03.service; import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class DeadLetterReceiverService { private int count = 1; @RabbitListener(queues = DeadLetterRabbitConfig.NORMAL_QUEUE) public void normal(Message message) { log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody())); log.info("当前执行次数:{}", count++); int i = 1 / 0; log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody())); } @RabbitListener(queues = DeadLetterRabbitConfig.DL_QUEUE) public void dl(Message message) { log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody())); } }
关闭重试生产者定时器功能
关闭 RepublishMessageRecoverer 全局异常处理功能,不然启动后的异常就会被全局异常队列处理,而不是死信队列处理,这种全局异常队列优先于死信队列;
恢复默认RejectAndDontRequeueRecoverer 拒绝并且不重新排队
6 延时队列重启项目,运行结果如下:
服务器上normal-queue有DLX、DLK标识,说明该队列绑定了死信交换机和死信路由键;
重试5次之后,就将消息转发给死信队列
死信队列是针对某个普通队列发生异常情况进行处理,而RepublishMessageRecoverer的异常处理是对所有普通队列发生异常进行处理,并且优先于死信队列
延时队列:一般来说,发布消息之后,会被交换机接收并转发给对应的队列,队列分配给消费者处理,这个过程很快秒级处理;但有时候我们希望发布完消息后,在指定的时间之后再去处理消息,这个时候就需要使用到延时队列;
虽说是延时队列,但其实也只是对死信队列的一种扩展应用罢了。
首先还是得创建普通队列,添加参数绑定死信队列同时设置消息过期时间,生产者发布消息到普通队列,而普通队列没有任何消费者来消费,那么消息在普通队列中存活到设定过期时间就被转发到死信队列,由死信队列的消费者消费消息,以此实现延时功能。
实现延时队列
package com.yzm.rabbitmq03.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import java.util.HashMap; import java.util.Map; @Configuration @EnableScheduling public class DeadLetterRabbitConfig { // 死信队列 ... public static final String NORMAL_QUEUE = "normal-queue"; public static final String NORMAL_QUEUE2 = "normal-queue2"; public static final String NORMAL_EXCHANGE = "normal.exchange"; public static final String NORMAL_KEY = "normal.key"; public static final String NORMAL_KEY2 = "normal.key2"; // 正常队列,添加配置 @Bean public Queue queue() { Mapparams = new HashMap<>(); params.put("x-dead-letter-exchange", DL_EXCHANGE);//声明当前队列绑定的死信交换机 params.put("x-dead-letter-routing-key", DL_KEY);//声明当前队列的死信路由键 return QueueBuilder.durable(NORMAL_QUEUE).withArguments(params).build(); } @Bean public DirectExchange exchange() { return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).build(); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(NORMAL_KEY); } // 延时队列 @Bean public Queue queue2() { Map params = new HashMap<>(); params.put("x-dead-letter-exchange", DL_EXCHANGE);//声明当前队列绑定的死信交换机 params.put("x-dead-letter-routing-key", DL_KEY);//声明当前队列的死信路由键 // 添加消息过期时间,10秒内还没处理完,就转发给死信队列 params.put("x-message-ttl", 10000); return QueueBuilder.durable(NORMAL_QUEUE2).withArguments(params).build(); } @Bean public Binding binding2() { return BindingBuilder.bind(queue2()).to(exchange()).with(NORMAL_KEY2); } }
生产者,指定路由键是 NORMAL_KEY2
package com.yzm.rabbitmq03.service; import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Slf4j @Component public class DeadLetterSenderService { private final RabbitTemplate rabbitTemplate; private int count = 1; public DeadLetterSenderService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } // @Scheduled(fixedDelay = 1000, initialDelay = 10000) public void send() { if (count <= 1) { String message = "Hello.........." + count++; log.info(" [ 生产者 ] Sent ==> '" + message + "'"); rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY, message); } } @Scheduled(fixedDelay = 1000, initialDelay = 10000) public void send2() { if (count <= 1) { String message = "Hello.........." + count++; log.info(" [ 生产者 ] Sent ==> '" + message + "'"); rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY2, message); } } }
消费者,不需要对普通队列NORMAL_QUEUE2进行处理
package com.yzm.rabbitmq03.service; import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class DeadLetterReceiverService { private int count = 1; @RabbitListener(queues = DeadLetterRabbitConfig.NORMAL_QUEUE) public void normal(Message message) { log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody())); log.info("当前执行次数:{}", count++); int i = 1 / 0; log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody())); } @RabbitListener(queues = DeadLetterRabbitConfig.DL_QUEUE) public void dl(Message message) { log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody())); } }
启动项目,运行结果如下:
设置过期时间10秒,没有任何消费者对normal-queue2队列处理,10秒过后,消息转发到死信队列处理
现在这种过期时间设定就针对整个normal-queue2队列的所有消息的,还可以更细粒度的针对每一条消息设置过期时间。
实现:使用normal-queue作为普通队列,之前已经绑定死信队列了
生产者,向normal-queue发布消息
package com.yzm.rabbitmq03.service; import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; @Slf4j @Component public class DeadLetterSenderService { private final RabbitTemplate rabbitTemplate; private int count = 1; public DeadLetterSenderService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } // @Scheduled(fixedDelay = 1000, initialDelay = 10000) public void send() { if (count <= 1) { String message = "Hello.........." + count++; log.info(" [ 生产者 ] Sent ==> '" + message + "'"); rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY, message); } } // @Scheduled(fixedDelay = 1000, initialDelay = 10000) public void send2() { if (count <= 1) { String message = "Hello.........." + count++; log.info(" [ 生产者 ] Sent ==> '" + message + "'"); rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY2, message); } } @Scheduled(fixedDelay = 1000, initialDelay = 10000) public void send3() { if (count <= 1) { String s = "Hello.........." + count++; log.info(" [ 生产者 ] Sent ==> '" + s + "'"); //设置过期时间 MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("12000"); Message message = new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties); rabbitTemplate.convertAndSend(DeadLetterRabbitConfig.NORMAL_EXCHANGE, DeadLetterRabbitConfig.NORMAL_KEY, message); } } }
消费者,注释掉normal-queue队列的消费者
package com.yzm.rabbitmq03.service; import com.yzm.rabbitmq03.config.DeadLetterRabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Slf4j @Component public class DeadLetterReceiverService { private int count = 1; // @RabbitListener(queues = DeadLetterRabbitConfig.NORMAL_QUEUE) public void normal(Message message) { log.info(" [ 消费者@A号 ] 接收到消息 ==> '" + new String(message.getBody())); log.info("当前执行次数:{}", count++); int i = 1 / 0; log.info(" [ 消费者@A号 ] 消费了消息 ==> '" + new String(message.getBody())); } @RabbitListener(queues = DeadLetterRabbitConfig.DL_QUEUE) public void dl(Message message) { log.info(" [ 消费者@死信号 ] 接收到消息 ==> '" + new String(message.getBody())); } }
启动项目
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)