RabbitMq-进阶-07-DLX

RabbitMq-进阶-07-DLX,第1张

RabbitMq-进阶-07-DLX
1、1 简介
死信队列,英文缩写:DLX  。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

1、2 消息成为死信的三种情况:
		队列消息长度到达限制;
		消费者拒接消费消息,并且不重回队列;
		原队列存在消息过期设置,消息到达超时时间未被消费;
2、1 代码实现

    // DLX
    // 先定义正常的队列和交换机
    public static final String TEST_DLX_EXCHANGE_NAME = "test_dlx_exchange";
    public static final String TEST_DLX_QUEUE_NAME = "test_dlx_queue";
    // 定义死信队列
    public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
    public static final String DLX_QUEUE_NAME = "dlx_queue";
    
    @Bean("SpringBootDLXExchange")
    public Exchange DLXExchange(){
        return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).durable(true).build();
    }

    
    @Bean("SpringBootDLXQueue")
    public Queue DLXQueue() {
        return QueueBuilder.durable(DLX_QUEUE_NAME).build();
    }

    
    @Bean
    public Binding bindDLXQueueTOExchange(@Qualifier("SpringBootDLXQueue") Queue queue, @Qualifier("SpringBootDLXExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }


    
    @Bean("SpringBootDLXTestExchange")
    public Exchange DLXTestExchange(){
        return ExchangeBuilder.topicExchange(TEST_DLX_EXCHANGE_NAME).durable(true).build();
    }

    
    @Bean("SpringBootDLXTestQueue")
    public Queue DLXTestQueue() {
        Map map = new HashMap<>();
        // 设置队列的过期时间
        map.put("x-message-ttl",10000);
        // 设置队列的长度限制
        map.put("x-max-length",10);
        // x-dead-letter-exchange:死信交换机名称
        map.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
        // x-dead-letter-routing-key:发送给死信交换机的routingkey
        map.put("x-dead-letter-routing-key","dlx.hello");
        return QueueBuilder.durable(TEST_DLX_QUEUE_NAME).withArguments(map).build();
    }

    
    @Bean
    public Binding bindDLXTestQueueTOExchange(@Qualifier("SpringBootDLXTestQueue") Queue queue, @Qualifier("SpringBootDLXTestExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
    }



2、2 消费者代码:

@Component
public class DLXListener {
    @RabbitListener(queues = "test_dlx_queue")
    public void TopicQueueListener(Message msg,Channel channel) throws Exception {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        try {
            Thread.sleep(2000);
            // 1、接受转换的消息
            System.out.println(new String(msg.getBody()));
            // 2、业务逻辑处理
            // 手动异常
            int i = 3 / 0;
            System.out.println("业务逻辑处理中-------------");
            // 3、手动签收
            
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            // 4、接受失败策略
            
            channel.basicNack(deliveryTag,true,false);
        }
    }
}


2、3 测试:


@SpringBootTest
@RunWith(SpringRunner.class)
public class DLXProducerTest {

    // 1、注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @Test
    public void testDlx(){
        // 1. 测试过期时间,死信消息
        // rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.hello","我是一条消息,我会死吗?");

        // 2. 测试长度限制后,消息死信
//        for (int i = 0; i < 20; i++) {
//            rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.hello","我是一条消息,我会死吗?");
//        }

        // 3. 测试消息拒收
        rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.haha","我是一条消息,我会死吗?");
    }
}

效果如下:

1、测试过期时间

2、测试长度限制后,消息死信

3、测试消息拒收

项目代码链接:https://github.com/Mbm7280/rabbitmq_demo

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5696460.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存