所谓的死信队列,也就是我们说的延迟队列。其实现方式就是给普通队列绑定一个所谓的死信队列,给消息设置一个过期时间,在该时间内如果消息没有被消费,那么则会进入死信队列。我这里以Direct模式处理,fanout模式也是类似
下面开始整活。(前提是你已经会基本使用springboot+rabbitmq)
org.springframework.boot spring-boot-starter-amqp
spring: rabbitmq: host: 127.0.0.1 #mq服务器ip,默认为localhost port: 5672 #mq服务器port,默认为5672 username: guest #mq服务器username,默认为gust password: guest #mq服务器password,默认为guest virtual-host: / #publisher-/confirm/is: true #消费消息的时候,就必须手动ack确认,不然消息永远还在队列中 listener: simple: acknowledge-mode: manual direct: acknowledge-mode: manual
基本配置
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DeadConfig { private String normalQueue="normal_queue"; private String normalExchange="normal_exchange"; private String normalRoutingKey="normal_routingkey"; private String deadQueue="dead_queue"; private String deadExchange="dead_exchange"; private String deadRoutingKey="dead_routingkey"; @Bean Queue normalQueue(){ // return new Queue(normalQueue); Mapmap = new HashMap<>(); map.put("x-dead-letter-exchange",deadExchange); map.put("x-dead-letter-routing-key", deadRoutingKey); return new Queue(normalQueue,true,false,false,map); } @Bean DirectExchange normalExchange(){ return new DirectExchange(normalExchange); } @Bean Binding normalBindingExchange(){ return BindingBuilder.bind( normalQueue()).to(normalExchange()).with(normalRoutingKey); } @Bean Queue deadQueue(){ return new Queue(deadQueue); } @Bean DirectExchange deadExchange(){ return new DirectExchange(deadExchange); } @Bean Binding deadBindingExchange(){ return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(deadRoutingKey); } }
消息生产者
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; @RestController @Slf4j public class RabbitProduct { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/123") public void mysendDelayMessage() { Listlist = Arrays.asList("123", "456", "789", "899"); log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), list.toString()); this.rabbitTemplate.convertAndSend( "normal_exchange", "normal_routingkey", list, message -> { message.getMessageProperties().setExpiration("3000"); return message; } ); } }
普通队列取消费消息的时候,死信是接收不到的,由于我们这里有对应的队列去消费,所以死信这边是无法获取到消息的
当我们把普通消息队列注释掉之后,也就是不让其消费,等时间到期之后就会转到死信队列中,通过时间差我们就能看出来效果
消费者代码
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.LocalDateTime; import java.util.List; @Component @Slf4j public class RabbitConsumer { @RabbitListener(queues = "normal_queue") public void normal_queue(Listlist, Message message, Channel channel) throws IOException { log.info("正常队列收到消息时间为:{},收到的消息内容为:{}", LocalDateTime.now(), list.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = "dead_queue") public void myDealy(List list, Message message, Channel channel) throws IOException { log.info("死信收到消息时间为:{},收到的消息内容为:{}", LocalDateTime.now(), list.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)