消息队列 RabbitMQ[十] SpringBoot 死信队列与延迟队列实现思路

消息队列 RabbitMQ[十] SpringBoot 死信队列与延迟队列实现思路,第1张

目录

消息队列 RabbitMQ[一] RabbitMQ的下载与安装
消息队列 RabbitMQ[二] RabbitMQ可视化管控台创建用户并为用户分配虚拟机
消息队列 RabbitMQ[三] RabbitMQ的HelloWorld工作模式(SpringBoot方式与amqp-client方式)
消息队列 RabbitMQ[四] RabbitMQ的Publish/Subscribe(发布/订阅)工作模式(SpringBoot方式与amqp-client方式)
消息队列 RabbitMQ[五] RabbitMQ的Routing工作模式(SpringBoot方式与amqp-client方式)
消息队列 RabbitMQ[六] RabbitMQ的Topics工作模式(SpringBoot方式与amqp-client方式)
消息队列 RabbitMQ[七] RabbitMQ保证消息的可靠性传递(Confirm Return Ack)
消息队列 RabbitMQ[八] SpringBoot Consumer 限流机制
消息队列 RabbitMQ[九] SpringBoot 设置消息过期时间TTL
消息队列 RabbitMQ[十] SpringBoot 死信队列与延迟队列实现思路


消息成为死信消息的三种情况
  1. 队列消息长度达到限制,多余的消息将成为死信消息
  2. 消费者拒绝接收消息,并不把该消息返回到队列中
  3. 消息达到过期时间未被消费者消费
消息成为死信消息之后会发送到另一个交换机,这个交换机称为死信交换机 和死信交换机绑定的队列就是死信队列

整体架构是这样的👇👇

配置文件
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "topic_exchange";
    public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    public static final String QUEUE1_NAME = "topic_queue";
    public static final String QUEUE2_NAME = "dead_queue";

    // 1. 创建Exchange交换机
    @Bean("topic_exchange")
    public Exchange createExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    @Bean("dead_exchange")
    public Exchange createDeadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).durable(true).build();
    }

    // 2. 创建Queue队列
    @Bean("topic_queue")
    public Queue createQueue1(){
        return QueueBuilder
                .durable(QUEUE1_NAME)
                 // 这里就是绑定死信交换机
                .withArgument("x-dead-letter-exchange", "dead_exchange")
                 // 绑定死信routingkey 只有routingkey为#.dead的消息才会路由到死信交换机
                .withArgument("x-dead-letter-routing-key", "#.dead") 
                .build();
    }
    @Bean("dead_queue")
    public Queue createQueue2(){
        return QueueBuilder.durable(QUEUE2_NAME).build();
    }

    // 3. 创建绑定交换机与队列
    @Bean
    public Binding createBindingTopic(@Qualifier("topic_queue") Queue queue, @Qualifier("topic_exchange") Exchange exchange){
    	// 最好是设置一样的routingkey,因为这个消息既要路由到topic_queue还要能路由到dead_queue
        return BindingBuilder.bind(queue).to(exchange).with("#.dead").noargs();
    }
    @Bean
    public Binding createBindingDead(@Qualifier("dead_queue") Queue queue, @Qualifier("dead_exchange") Exchange exchange){		
    	// 这里的.with("#.dead")其实写什么都行,因为它会根据上边的
    	// .withArgument("x-dead-letter-routing-key", "#.dead")来路由
        return BindingBuilder.bind(queue).to(exchange).with("#.dead").noargs();
    }
}

生产者代码
import com.zdy.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTopicsSend(){
    	// 这里就用消息时间过期来作为案例测试死信队列
        MessageProperties messageProperties = new MessageProperties();
        // 设置过期时间
        messageProperties.setExpiration("5000");
        CorrelationData correlationData = new CorrelationData();
        Message message = new Message("hello rabbitmq".getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "asd.dead", message);
    }
}

效果

5秒前🙈 ,消息在topic_queue中

5秒后🙉,消息过期成为死信消息并成功路由到dead_queue中

延迟队列实现思路

rabbitmq中并没有给出延迟队列的实现方式,但我们可以通过设置过期时间TTL+死信队列来实现延时队列🤠

以新用户注册七天发送慰问短信为例,我们可以将用户的注册信息存储到正常的队列中,设置7天的过期时间,发送短信的服务订阅死信队列,当七天之后用户的信息就会路由到死信队列中,发送短信服务就会读取用户信息,发送慰问短信✉

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/872101.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存