1、1 简介 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 1、2 消息成为死信的三种情况: 队列消息长度到达限制; 消费者拒接消费消息,并且不重回队列; 原队列存在消息过期设置,消息到达超时时间未被消费;
2、1 代码实现 // DLX // 先定义正常的队列和交换机 public static final String TEST_DLX_EXCHANGE_NAME = "test_dlx_exchange"; public static final String TEST_DLX_QUEUE_NAME = "test_dlx_queue"; // 定义死信队列 public static final String DLX_EXCHANGE_NAME = "dlx_exchange"; public static final String DLX_QUEUE_NAME = "dlx_queue"; @Bean("SpringBootDLXExchange") public Exchange DLXExchange(){ return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).durable(true).build(); } @Bean("SpringBootDLXQueue") public Queue DLXQueue() { return QueueBuilder.durable(DLX_QUEUE_NAME).build(); } @Bean public Binding bindDLXQueueTOExchange(@Qualifier("SpringBootDLXQueue") Queue queue, @Qualifier("SpringBootDLXExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs(); } @Bean("SpringBootDLXTestExchange") public Exchange DLXTestExchange(){ return ExchangeBuilder.topicExchange(TEST_DLX_EXCHANGE_NAME).durable(true).build(); } @Bean("SpringBootDLXTestQueue") public Queue DLXTestQueue() { Mapmap = new HashMap<>(); // 设置队列的过期时间 map.put("x-message-ttl",10000); // 设置队列的长度限制 map.put("x-max-length",10); // x-dead-letter-exchange:死信交换机名称 map.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME); // x-dead-letter-routing-key:发送给死信交换机的routingkey map.put("x-dead-letter-routing-key","dlx.hello"); return QueueBuilder.durable(TEST_DLX_QUEUE_NAME).withArguments(map).build(); } @Bean public Binding bindDLXTestQueueTOExchange(@Qualifier("SpringBootDLXTestQueue") Queue queue, @Qualifier("SpringBootDLXTestExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs(); } 2、2 消费者代码: @Component public class DLXListener { @RabbitListener(queues = "test_dlx_queue") public void TopicQueueListener(Message msg,Channel channel) throws Exception { long deliveryTag = msg.getMessageProperties().getDeliveryTag(); try { Thread.sleep(2000); // 1、接受转换的消息 System.out.println(new String(msg.getBody())); // 2、业务逻辑处理 // 手动异常 int i = 3 / 0; System.out.println("业务逻辑处理中-------------"); // 3、手动签收 channel.basicAck(deliveryTag,true); } catch (Exception e) { // 4、接受失败策略 channel.basicNack(deliveryTag,true,false); } } } 2、3 测试: @SpringBootTest @RunWith(SpringRunner.class) public class DLXProducerTest { // 1、注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; @Test public void testDlx(){ // 1. 测试过期时间,死信消息 // rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.hello","我是一条消息,我会死吗?"); // 2. 测试长度限制后,消息死信 // for (int i = 0; i < 20; i++) { // rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.hello","我是一条消息,我会死吗?"); // } // 3. 测试消息拒收 rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.haha","我是一条消息,我会死吗?"); } }
效果如下:
1、测试过期时间
2、测试长度限制后,消息死信
3、测试消息拒收
项目代码链接:https://github.com/Mbm7280/rabbitmq_demo
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)