RabbitMQ消息可靠性保障

RabbitMQ消息可靠性保障,第1张

RabbitMQ消息可靠性保障


从上面的图可以看到,消息的投递有三个对象参与:生产者、broker、消费者

生产者保证

生产者发送消息到broker时,要保证消息的可靠性,主要的方案有以下2种

    发送发确认失败通知

在不做任何配置的情况下,生产者是不知道消息是否真正到达RabbitMQ,也就是说消息发布 *** 作不返回任何消息给生产者。

失败通知

如果出现消息无法投递到队列会出现失败通知

那么怎么保证我们消息发布的可靠性?这里我们就可以启动失败通知,在原生编程中在发送消息时设置 mandatory标志,即可开启故障检测模式。

注意:它只会让 RabbitMQ 向你通知失败,而不会通知成功。如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,投递到队列的消息可能会没来得及持久化就宕机丢失了。

实现方式
spring配置:

关键代码,注意需要发送者实现 ReturnCallback 接口方可实现失败通知

@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
	//消息体为空直接返回
	if (null == message) {
		return;
	}
	TaxiBO taxiBO = JSON.parseObject(message.getBody(), TaxiBO.class);
	if (null != taxiBO) {
		//删除rediskey
		redisHelper.handelAccountTaxi(taxiBO.getAccountId());
		//记录错误日志
		recordErrorMessage(taxiBO, replyText, exchange, routingKey, message,
		replyCode);
	}
}

遇到的问题
如果消息正确路由到队列,则发布者不会受到任何通知。带来的问题是无法确保发布消息一定是成功的,我们可以使用RabbitMQ的发送方确认来实现,它不仅仅在路由失败的时候给我们发送消息,并且能够在消息路由成功的时候也给我们发送消息。

发送方确认

发送方确认是指生产者投递消息后,如果 Broker 接收到消息,则会给生产者一个应答。生产者进行接收应答,用来确认这条消息是否正常的发送到 Broker,这种方式也是消息可靠性投递的核心保障
rabbitmq消息发送分为两个阶段:

    将消息发送到broker,即发送到exchage交换机消息通过交换机exchange被路由到队列queue

一旦消息投递到队列,队列则会向生产者发送一个通知,如果设置了消息持久化到磁盘,则会等待消息持久化到磁盘之后再发送通知

注意:发送发确认只有出现RabbitMQ内部错误无法投递才会出现发送发确认失败。

发送方确认模式需要分两种情况下列来看
①不可路由
当前消息到达交换器后对于发送者确认是成功的

首先当RabbitMQ交换器不可路由时,消息也根本不会投递到队列中,所以这里只管到交换器的路径,当消息成功送到交换器后,就会进行确认 *** 作。

另外在这过程中,生产者收到了确认消息后,那么因为消息无法路由,所以该消息也是无效的,无法投递到队列,所以一般情况下这里会结合失败通知来一同使用,这里一般会进行设置 mandatory模式,失败则会调用addReturnListener监听器来进行处理。

②可以路由
只要消息能够到达队列即可进行确认,一般是RabbitMQ发生内部错误才会出现失败

可以路由的消息,要等到消息被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。

使用方式

spring配置

spring:
	rabbitmq:
		# 开启消息确认机制
		publisher-/confirm/i-type: correlated

关键代码,注意需要发送者实现 ConfirmCallback 接口方可实现失败通知

@Override
public void /confirm/i(CorrelationData correlationData, Boolean ack, String cause)
{
	//只有异常的数据才需要处理
	if (!ack) {
		//关联数据为空直接返回
		if (correlationData == null) {
			return;
		}
		//检查返回消息是否为null
		if (null != correlationData.getReturnedMessage()) {
			TaxiBO taxiBO =
			JSON.parseObject(correlationData.getReturnedMessage().getBody(), TaxiBO.class);
			//处理消息还原用户未打车状态
			redisHelper.handelAccountTaxi(taxiBO.getAccountId());
			//获取交换器
			String exchange =
			correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_EXCH
ANGE");
			//获取队列信息
			String routingKey =
			correlationData.getReturnedMessage().getMessageProperties().getHeader("SEND_ROUT
ING_KEY");
			//获取当前的消息体
			Message message = correlationData.getReturnedMessage();
			//记录错误日志
			recordErrorMessage(taxiBO, cause, exchange, routingKey, message,
			-1);
		}
	}
}
Broker丢失消息

前面我们从生产者的角度分析了消息可靠性传输的原理和实现,这一部分我们从broker的角度来看一下如何能保证消息的可靠性传输。

假设有现在一种情况,生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到了队列中,但是在消费者还未进行消费时,mq挂掉了,那么重启mq之后消息还会存在吗?如果消息不存在,那就造成了消息的丢失,也就不能保证消息的可靠性传输了。

也就是现在的问题变成了如何在mq挂掉重启之后还能保证消息是存在的?

开启RabbitMQ的持久化,也即消息写入后会持久化到磁盘,此时即使mq挂掉了,重启之后也会自动读取之前存储的额数据

①持久化队列:

@Bean
public Queue queue(){
	return new Queue(queueName,true);
}

②持久化交换器:

@Bean
DirectExchange directExchange() {
	return new DirectExchange(exchangeName,true,false);
}

③发送持久化消息
发送消息时,设置消息的deliveryMode=2。如果使用SpringBoot的话,发送消息时自动设置deliveryMode=2,不需要人工再去设置

Broker总结:
通过以上方式,可以保证大部分消息在broker不会丢失,但是还是有很小的概率会丢失消息,什么情况下会丢失呢?

假如消息到达队列之后,还未保存到磁盘mq就挂掉了,此时还是有很小的几率会导致消息丢失的。

这就要mq的持久化和前面的/confirm/i进行配合使用,只有当消息写入磁盘后才返回ack,那么就是在持久化之前mq挂掉了,但是由于生产者没有接收到ack信号,此时可以进行消息重发。

消费者保证 消费者手动确认

消费者接收到消息,但是还未处理或者还未处理完,此时消费者进程挂掉了,比如重启或者异常断电等,此时mq认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。

那该如何避免这种情况呢?这就要用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也即自己的程序确定消息已经处理完成后,手动提交ack,此时如果再遇到消息未处理进程就挂掉的情况,由于没有提交ack,RabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息是不会丢的。

配置文件:

参数介绍
acknowledge-mode: manual就表示开启手动ack,该配置项的其他两个值分别是none和auto

auto:消费者根据程序执行正常或者抛出异常来决定是提交ack或者nack,不要把none和auto搞混了manual: 手动ack,用户必须手动提交ack或者nacknone: 没有ack机制

消费者实现

@RabbitListener(
bindings =
{
@QueueBinding(value = @Queue(value =
RabbitConfig.TAXI_DEAD_QUEUE, durable = "true"),
exchange = @Exchange(value =
RabbitConfig.TAXI_DEAD_QUEUE_EXCHANGE), key = RabbitConfig.TAXI_DEAD_KEY)
}
)
@RabbitHandler
public void processOrder(Message massage, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
TaxiBO taxiBO = JSON.parseObject(massage.getBody(), TaxiBO.class);
try {
	//开始处理订单
	logger.info("处理超时订单,订单详细信息:" + taxiBO.toString());
	taxiService.taxiTimeout(taxiBO);
	//手动确认机制
	channel.basicAck(tag, false);
}
catch (Exception e) {
	e.printStackTrace();
}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5710197.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存