消息队列 RabbitMQ[一] RabbitMQ的下载与安装
消息队列 RabbitMQ[二] RabbitMQ可视化管控台创建用户并为用户分配虚拟机
消息队列 RabbitMQ[三] RabbitMQ的HelloWorld工作模式(SpringBoot方式与amqp-client方式)
消息队列 RabbitMQ[四] RabbitMQ的Publish/Subscribe(发布/订阅)工作模式(SpringBoot方式与amqp-client方式)
消息队列 RabbitMQ[五] RabbitMQ的Routing工作模式(SpringBoot方式与amqp-client方式)
消息队列 RabbitMQ[六] RabbitMQ的Topics工作模式(SpringBoot方式与amqp-client方式)
消息队列 RabbitMQ[七] RabbitMQ保证消息的可靠性传递(Confirm Return Ack)
消息队列 RabbitMQ[八] SpringBoot Consumer 限流机制
消息队列 RabbitMQ[九] SpringBoot 设置消息过期时间TTL
消息队列 RabbitMQ[十] SpringBoot 死信队列与延迟队列实现思路
消息成为死信消息的三种情况
消息成为死信消息之后会发送到另一个交换机,这个交换机称为死信交换机 和死信交换机绑定的队列就是死信队列
- 队列消息长度达到限制,多余的消息将成为死信消息
- 消费者拒绝接收消息,并不把该消息返回到队列中
- 消息达到过期时间未被消费者消费
配置文件整体架构是这样的👇👇
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "topic_exchange";
public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
public static final String QUEUE1_NAME = "topic_queue";
public static final String QUEUE2_NAME = "dead_queue";
// 1. 创建Exchange交换机
@Bean("topic_exchange")
public Exchange createExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean("dead_exchange")
public Exchange createDeadExchange(){
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).durable(true).build();
}
// 2. 创建Queue队列
@Bean("topic_queue")
public Queue createQueue1(){
return QueueBuilder
.durable(QUEUE1_NAME)
// 这里就是绑定死信交换机
.withArgument("x-dead-letter-exchange", "dead_exchange")
// 绑定死信routingkey 只有routingkey为#.dead的消息才会路由到死信交换机
.withArgument("x-dead-letter-routing-key", "#.dead")
.build();
}
@Bean("dead_queue")
public Queue createQueue2(){
return QueueBuilder.durable(QUEUE2_NAME).build();
}
// 3. 创建绑定交换机与队列
@Bean
public Binding createBindingTopic(@Qualifier("topic_queue") Queue queue, @Qualifier("topic_exchange") Exchange exchange){
// 最好是设置一样的routingkey,因为这个消息既要路由到topic_queue还要能路由到dead_queue
return BindingBuilder.bind(queue).to(exchange).with("#.dead").noargs();
}
@Bean
public Binding createBindingDead(@Qualifier("dead_queue") Queue queue, @Qualifier("dead_exchange") Exchange exchange){
// 这里的.with("#.dead")其实写什么都行,因为它会根据上边的
// .withArgument("x-dead-letter-routing-key", "#.dead")来路由
return BindingBuilder.bind(queue).to(exchange).with("#.dead").noargs();
}
}
生产者代码
import com.zdy.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTopicsSend(){
// 这里就用消息时间过期来作为案例测试死信队列
MessageProperties messageProperties = new MessageProperties();
// 设置过期时间
messageProperties.setExpiration("5000");
CorrelationData correlationData = new CorrelationData();
Message message = new Message("hello rabbitmq".getBytes(), messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "asd.dead", message);
}
}
效果
5秒前🙈 ,消息在topic_queue中
延迟队列实现思路5秒后🙉,消息过期成为死信消息并成功路由到dead_queue中
在rabbitmq中并没有给出延迟队列的实现方式,但我们可以通过设置过期时间TTL+死信队列来实现延时队列🤠
以新用户注册七天发送慰问短信为例,我们可以将用户的注册信息存储到正常的队列中,设置7天的过期时间,发送短信的服务订阅死信队列,当七天之后用户的信息就会路由到死信队列中,发送短信服务就会读取用户信息,发送慰问短信✉
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)