从上面的图可以看到,消息的投递有三个对象参与:生产者、broker、消费者
生产者发送消息到broker时,要保证消息的可靠性,主要的方案有以下2种
- 发送发确认失败通知
在不做任何配置的情况下,生产者是不知道消息是否真正到达RabbitMQ,也就是说消息发布 *** 作不返回任何消息给生产者。
失败通知如果出现消息无法投递到队列会出现失败通知
那么怎么保证我们消息发布的可靠性?这里我们就可以启动失败通知,在原生编程中在发送消息时设置 mandatory标志,即可开启故障检测模式。
注意:它只会让 RabbitMQ 向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,投递到队列的消息可能会没来得及持久化就宕机丢失了。
实现方式
spring配置:
关键代码,注意需要发送者实现 ReturnCallback 接口方可实现失败通知
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //消息体为空直接返回 if (null == message) { return; } TaxiBO taxiBO = JSON.parseObject(message.getBody(), TaxiBO.class); if (null != taxiBO) { //删除rediskey redisHelper.handelAccountTaxi(taxiBO.getAccountId()); //记录错误日志 recordErrorMessage(taxiBO, replyText, exchange, routingKey, message, replyCode); } }
遇到的问题
如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,我们可以使用RabbitMQ的发送方确认来实现,它不仅仅在路由失败的时候给我们发送消息,并且能够在消息路由成功的时候也给我们发送消息。
发送方确认是指生产者投递消息后,如果 Broker 接收到消息,则会给生产者一个应答。生产者进行接收应答,用来确认这条消息是否正常的发送到 Broker,这种方式也是消息可靠性投递的核心保障
rabbitmq消息发送分为两个阶段:
- 将消息发送到broker,即发送到exchage交换机消息通过交换机exchange被路由到队列queue
一旦消息投递到队列,队列则会向生产者发送一个通知,如果设置了消息持久化到磁盘,则会等待消息持久化到磁盘之后再发送通知
注意:发送发确认只有出现RabbitMQ内部错误无法投递才会出现发送发确认失败。
发送方确认模式需要分两种情况下列来看
①不可路由
当前消息到达交换器后对于发送者确认是成功的
首先当RabbitMQ交换器不可路由时,消息也根本不会投递到队列中,所以这里只管到交换器的路径,当消息成功送到交换器后,就会进行确认 *** 作。
另外在这过程中,生产者收到了确认消息后,那么因为消息无法路由,所以该消息也是无效的,无法投递到队列,所以一般情况下这里会结合失败通知来一同使用,这里一般会进行设置 mandatory模式,失败则会调用addReturnListener监听器来进行处理。
②可以路由
只要消息能够到达队列即可进行确认,一般是RabbitMQ发生内部错误才会出现失败
可以路由的消息,要等到消息被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。
使用方式spring配置
spring: rabbitmq: # 开启消息确认机制 publisher-/confirm/i-type: correlated
关键代码,注意需要发送者实现 ConfirmCallback 接口方可实现失败通知
@Override public void /confirm/i(CorrelationData correlationData, Boolean ack, String cause) { //只有异常的数据才需要处理 if (!ack) { //关联数据为空直接返回 if (correlationData == null) { return; } //检查返回消息是否为null if (null != correlationData.getReturnedMessage()) { TaxiBO taxiBO = JSON.parseObject(correlationData.getReturnedMessage().getBody(), TaxiBO.class); //处理消息还原用户未打车状态 redisHelper.handelAccountTaxi(taxiBO.getAccountId()); //获取交换器 String exchange = correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_EXCH ANGE"); //获取队列信息 String routingKey = correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_ROUT ING_KEY"); //获取当前的消息体 Message message = correlationData.getReturnedMessage(); //记录错误日志 recordErrorMessage(taxiBO, cause, exchange, routingKey, message, -1); } } }Broker丢失消息
前面我们从生产者的角度分析了消息可靠性传输的原理和实现,这一部分我们从broker的角度来看一下如何能保证消息的可靠性传输。
假设有现在一种情况,生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到了队列中,但是在消费者还未进行消费时,mq挂掉了,那么重启mq之后消息还会存在吗?如果消息不存在,那就造成了消息的丢失,也就不能保证消息的可靠性传输了。
也就是现在的问题变成了如何在mq挂掉重启之后还能保证消息是存在的?
开启RabbitMQ的持久化,也即消息写入后会持久化到磁盘,此时即使mq挂掉了,重启之后也会自动读取之前存储的额数据
①持久化队列:
@Bean public Queue queue(){ return new Queue(queueName,true); }
②持久化交换器:
@Bean DirectExchange directExchange() { return new DirectExchange(exchangeName,true,false); }
③发送持久化消息
发送消息时,设置消息的deliveryMode=2。如果使用SpringBoot的话,发送消息时自动设置deliveryMode=2,不需要人工再去设置
Broker总结:
通过以上方式,可以保证大部分消息在broker不会丢失,但是还是有很小的概率会丢失消息,什么情况下会丢失呢?
假如消息到达队列之后,还未保存到磁盘mq就挂掉了,此时还是有很小的几率会导致消息丢失的。
这就要mq的持久化和前面的/confirm/i进行配合使用,只有当消息写入磁盘后才返回ack,那么就是在持久化之前mq挂掉了,但是由于生产者没有接收到ack信号,此时可以进行消息重发。
消费者保证 消费者手动确认消费者接收到消息,但是还未处理或者还未处理完,此时消费者进程挂掉了,比如重启或者异常断电等,此时mq认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。
那该如何避免这种情况呢?这就要用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。
配置文件:
参数介绍
acknowledge-mode: manual就表示开启手动ack,该配置项的其他两个值分别是none和auto
auto:消费者根据程序执行正常或者抛出异常来决定是提交ack或者nack,不要把none和auto搞混了manual: 手动ack,用户必须手动提交ack或者nacknone: 没有ack机制
消费者实现
@RabbitListener( bindings = { @QueueBinding(value = @Queue(value = RabbitConfig.TAXI_DEAD_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitConfig.TAXI_DEAD_QUEUE_EXCHANGE), key = RabbitConfig.TAXI_DEAD_KEY) } ) @RabbitHandler public void processOrder(Message massage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { TaxiBO taxiBO = JSON.parseObject(massage.getBody(), TaxiBO.class); try { //开始处理订单 logger.info("处理超时订单,订单详细信息:" + taxiBO.toString()); taxiService.taxiTimeout(taxiBO); //手动确认机制 channel.basicAck(tag, false); } catch (Exception e) { e.printStackTrace(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)