上一章我们讲解了rabbitmq的四种交换机类型、七种通讯方式。本章我们将整合springboot来向大家完整演示rabbitmq的使用,并说明如何保证消息的可靠性。
1. RabbitMQ的安装这里为了快速部署,我们通过docker来安装,如果需要其他安装方式的可以去rabbitmq官网或者github下载对应系统安装包来安装
1、下载镜像
docker pull rabbitmq
本文书写时,其版本为rabbitmq3.8.9
2、安装镜像
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
3、开启远程管理,否则通过15672无法登陆管理页面
进入到docker容器中执行:
# 查看容器id
docker ps -a
# 进入容器
docker exec -it 容器id /bin/bssh
# 容器内执行
rabbitmq-plugins enable rabbitmq_management
4、访问ip:15672。如果是在虚拟机中安装的,记得开通15672,5672端口
1、创建springboot项目
2、在项目中引入amqp
依赖,这里的版本与springboot保持一致
org.springframework.boot
spring-boot-starter-amqp
2.3.7.RELEASE
2、配置文件中添加rabbitmq的配置
spring:
# rabbitmq配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
3、声明及创建交换机、队列,如果不知道交换机、队列、routingKey概念的,可以先查看上一篇rabbitmq博客
这里我们创建一个队列和直接交换机来示例
(1)创建队列
@Bean
public Queue routingQueueA(){
return new Queue(RabbitConstant.ROUTING_QUEUE_A);
}
(2)创建交换机
@Bean
public DirectExchange testExchange(){
return new DirectExchange("test.exchange");
}
(3)绑定队列和交换机,并且设置routingKey
@Bean
public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
}
完整代码:
注意以上代码是写在一个配置类中的,目的是为了在项目启动时能够加载该类,并且创建对应的Bean,即队列、交换机和绑定关系
@Configuration
public class RabbitMqConfig {
@Bean
public Queue testQueue(){
return new Queue("test.queue");
// 另一种创建队列的方法
// return QueueBuilder.durable("test.queue").build();
}
@Bean
public DirectExchange testExchange(){
return new DirectExchange("test.exchange");
// 另一种创建交换机的方法
// return ExchangeBuilder.directExchange("test.exchange").build();
}
@Bean
public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
}
}
这里我们只声明了一个直接交换机,单个队列,rabbitmq的其他消息模型和交换机类型大家可以到上一篇中查看,这里不再累叙
4、生产者发送消息
(1)创建消息对象,这是我们要发送到消息队列中的自定义的消息对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MyMessage implements Serializable {
private Long id;
private String title;
private String body;
private Date createDate;
}
(1)创建发送方法
public class QueueController {
private final RabbitTemplate rabbitTemplate;
@GetMapping("sendTestQueue")
public String sendTestQueue(){
MyMessage message = new MyMessage(1L,"物流提醒","到达装货区域,注意上传凭证",new Date());
rabbitTemplate.convertAndSend("test.exchange","test.routing.key", message);
return "发送成功";
}
}
5、创建消费者
@Component
public class QueueListener {
@RabbitListener(queues = "test.queue")
public void handler(MyMessage messageInfo, Message message, Channel channel) {
System.out.println("接收的消息:"+messageInfo);
}
}
6、测试调用我们的消息发送方法
7、可以看到结果中显示了我们刚刚发送的消息
我们在上一章没有讲解的Publisher Confirms模式就是用来保证消息可靠性的。下面我们来看看实现消息可靠性的具体代码,也就是实现Publisher Confirms模式。
2.3.1 哪些环节会导致消息丢失首先我们要明白消息可靠性也就是保证消息不丢失,那么就要先理解消息会在哪些环节丢失,我们通过一张图来表述可能会导致消息丢失的环节
(1)生产者到交换机的过程中,如果生产者将消息发送出去了,但是还没送达之前,rabbitmq宕机了,或者因为网络原因消息在传输过程中丢失了,但生产者又不知道交换机没有收到,就会导致消息的丢失
(2)因为rabbitmq是基于内存运行的,当rabbitmq宕机或者重启,内存被初始化,就会导致消息丢失。
(3)交换机到队列的过程中,消息还没到达队列时,rabbitmq宕机了,就会导致消息丢失
(4)同2
(5)队列发送消息到消费者的过程中,当队列把消息发送出去了,在发送途中,因为网络波动或者消息者宕机导致消费者没有收到消息,但是队列并不知道消费者没有收到消息,就会导致消息丢失
(6)消费者接收到消息之后,还没有来得及处理消息,消费者就宕机了,也会导致消息丢失。
下面我们来针对这六个环节来谈谈如何保证消息不丢失
2.3.2 保证消息一定发送到交换机利用消息队列的confirm机制可以保证消息发送到交换机的可靠性
-
思路:
所谓confirm机制就是:交换机收到消息后会发送一个ack回执给生产者,接收成功ack=true,接收失败ack=false。那么我们就可以通过设置一个回调函数来监听这个ack,如果接收失败就叫消息重发或者存到数据库中后续补发,如果没有收到ack就说明消息在传输中丢失了,那么也进行补发在开始讲解代码实现之前,要向大家普通,rabbtimq中confirm机制提供了三种类型:
SIMPLE:,发送的消息到达生产者后会触发waitForConfirms回调方法,为同步方式
CORRELATED:发送的消息到达生产者后会触发回调方法,为异步方式,相比较于SIMPLE效率更高
NONE:禁用发布确认类型,是默认值 -
代码实现:
(1)首先将confirm机制设置为correlated
可以通过两种方式设置:一是在配置文件中设置,二是通过setPublisherConfirmType方法设置
spring:
rabbitmq:
publisher-confirm-type: correlated
(2)实现confirmCallback方法。这里的重试机制可以设置为重新发送,或者将消息存放到数据库后续再发送
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("消息发送交换机{}:correlationData({}),ack({}),cause({})",ack ? "成功":"失败", correlationData, ack, cause)
if(!ack){
// 发送失败进行重试机制
}
});
return rabbitTemplate;
}
confirmCallback方法中的三个参数分别为
2.3.3 实现消息持久化correlationData:相关数据
ack:消息是否到达交换机,true是,false否
cause:发送失败原因
我们上述已经说明,消息存在exchange和queue时可能也会导致消息丢失,那么我们如何保证消息不丢失呢?
这里想象一下redis也是基于内存的,它怎么防止数据丢失呢?
那就是做持久化,所谓持久化,就是把数据保存到磁盘,rabbitmq中怎么实现呢,rabbitmq接收到消息后先存储到内存,然后再存储到磁盘,只有当磁盘保存完毕后,才发送回执给生产者,这样即使rabbitmq宕机了也不会导致消息丢失
- Exchange,Queue持久化
存储消息的地方有Exchange和Queue,那么我们就要在这两个地方实现持久化
我们查看之前创建交换机和队列的源码,其实会发现,里面有一个durable属性,就是用来声明是否持久化的,我们创建时如果不声明就默认为true了。
但是需要注意的是,这里的持久化只是用来控制交换机和队列是否持久化的。当durable=false时,只要rabbitmq重启,当没有消费者监听该交换机或者队列时,该交换机或队列就会被删除。常常用在临时队列中。durable=true时,交换机和队列就会被保存至磁盘,重启后会从磁盘读取到内存。
- 消息持久化
消息的载体是交换机和队列我们要先实现他们的持久化,然后再实现消息本身的持久化
原生的做法,是通过设置BasicProperties的deliveryMode为2来声明其消息实现持久化,如下所示:
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.deliveryMode(2)
.build();
但我们现在的演示都是整合了springboot的,我们来看看其发送消息的方法的源码
(1)我们的消息发送是通过RabbitTemplate.convertAndSend
方法实现的。该方法中调用了this.convertMessageIfNecessary(object)
方法将消息进行了转换
(2)我们打开convertMessageIfNecessary
方法,方法中新建了一个MessageProperties
对象。咱们上述原生的声明不就是通过一个Properties对象来实现的吗,于是我们点击进该对象查看源码
(3)会发现该对象的无参构造方法中,声明了一个deliveryMode
属性,其值为常量
DEFAULT_DELIVERY_MODE
(4)继续追踪该常量的值,会发现其定义就是一个枚举类MessageDeliveryMode.PERSISTENT
,其命名为持久化,通过其名称我们已经能够联想到什么了。
(5)为求真相,打开该枚举类,于是乎我们终于找到了我们想要的东西,其值就是2,与原生的设置异曲同工,这说明,amqp中默认就将消息设置为持久化的了。
所以呢,也不需要我们配置了,可能有同学会疑惑,都不用配置的,你讲他干嘛,这不浪费时间吗?
学习,讲究知其然,知其所以然。如果你抱有不求甚解的态度去学习,那么你能学到的永远是皮毛,经不起考究。
2.3.4 保证消息一定路由到队列我们上述所说的confirm机制,只能保证消息发送到Exchange,并不能保证Exchange一定能将消息路由到Queue
我们就需要Return机制来保证消息能够路由到队列
-
思路:
Return机制,就是当消息进入从交换机转发到消息队列,但消息队列未收到时调用回调函数,可以在回调函数中通过实现我们的重试机制来实现消息不丢失。这里需要注意的是,return机制提供了两种模式,通过Mandatory属性来设置
(1)Mandatory=true,消息通过交换机无法匹配到队列时会返回给生产者,并触发returnCallback
(2)Mandatory=false,消息通过交换机无法匹配到队列时会直接丢弃消息,默认配置 -
代码:
(1)开启return机制
方式一:setPublisherReturns
connectionFactory.setPublisherReturns(true);
方式二:setMandatory,Mandatory为true时会自动开启return机制
rabbitTemplate.setMandatory(true);
(3)声明returnCallback方法:重试机制中可以重新发送消息,或者存储数据库后续重发
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// connectionFactory.setPublisherReturns(true);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
// 路由失败,进行重试机制
});
return rabbitTemplate;
}
returnCallback方法参数
2.3.5 保证消息一定被消费者消费者message: 消息
replyCode:回应码
replyText:回应信息
exchange:交换机
routingKey:路由键
想要保证消息一定被消费者消费,我们可以通过手动ACK的形式,我们上述讲到了消息接收后会发送一个ACK回执,通过该回执来确定消息是否达到
- 思路:
同理,针对发送给消费者,我们也可以通过手动ACK的形式,所谓手动ACK,就是消费者自己确定什么时候发送这个ACK回执过去,于是可以等到消息消费完毕后再发送这个回执回去,这样就能确保消息已经被消费,消息队列收到ACK后才将消息删除,如果这中间被中断,那么就不会有ACK回执,那消息队列中的消息就不会被删除 - 代码:
(1)开启手动ack
spring:
rabbitmq:
# 手动提交消息
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
acknowledge-mode提供了三种模式:
NONE:自动模式,默认配置,只要有消费者接受到消息,无论消费成功都认为消费成功
MANUAL:手动模式,消费者自己控制什么时候返回ACK
AUTO:自动模式,但会根据报错来决定是否删除队列中的消息,具体规则如下
如果成功消费,没有抛出异常,则自动确认,删除队列中的消息
如果抛出AmqpRejectAndDontRequeueException异常,拒绝确认,不删除队列中的消息
如果抛出ImmediateAcknowledgeAmqpException异常,自动确认,删除队列汇总消息
如果抛出其他异常,则拒绝确认,不会删除队列中的消息
(2)消费者发送ack
@RabbitListener(queues = "test.queue")
public void handler(MyMessage messageInfo, Message message, Channel channel) {
try{
System.out.println("接收的消息:"+messageInfo.toString());
// 返回ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (IOException e){
try {
channel.basicRecover();
} catch (IOException ex) {
ex.printStackTrace();
log.error("消息处理失败:{}",e.getMessage());
}
}
}
配置文件中完整代码
@Configuration
@Slf4j
public class RabbitMqConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("消息发送交换机{}:correlationData({}),ack({}),cause({})",ack ? "成功":"失败", correlationData, ack, cause);
if(!ack){
// 发送失败进行重试机制
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
// 路由失败,进行重试机制
});
return rabbitTemplate;
}
@Bean
public Queue testQueue(){
return new Queue("test.queue");
}
@Bean
public DirectExchange testExchange(){
return new DirectExchange("test.exchange");
}
@Bean
public Binding testQueueBing(Queue testQueue, DirectExchange testExchange){
return BindingBuilder.bind(testQueue).to(testExchange).with("test.routing.key");
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)