RabbitMQ消息可靠性投递详解

RabbitMQ消息可靠性投递详解,第1张

RabbitMQ消息可靠性投递详解

在使用MQ实现异步通信的过程中,有消息丢了怎么办?或者MQ消息重复了怎么办?

1. 可靠性投递分析

这个问题就需要来到本文的主题——RabbitMQ的可靠性投递。当然,RabbitMQ在设计的时候就考虑了这一点,提供 了很多保证消息可靠投递的机制。这个可以说是RabbitMQ比较突出的一个特性。

下面从RabbitMQ的工作模型分析一下可靠性措施。

在我们使用RabbitMQ发送消息的时候,有几个主要环节:

  • ①代表消息从生产者发送到Broker

    生产者把消息发到Broker之后,怎么知道自己的消息有没有被Broker成功接受?如果Broker不给应答,生产者不断的发送,则消息全部丢失。

  • ②代表消息从Exchange路由到Queue

    Exchange是一个绑定列表,他的职责是分化消息。如果他没有办法旅行他的职责怎么办?也就是说,找不到队列该怎么办?

  • ③代表消息在Queue中存储

    队列有自己的数据库(Mnesia),他是正真用来存储信息的。如果没有消费者消费,那么消息要一直存储在队列中。但是如果数据库有相关问题,那么如何确保数据在队列中稳定存储?

  • ④代表消费者订阅消息

    队列的消息是一条一条的投递的,所以只有上一条消息被消费者接受以后,才能把这条消息从数据库删除,继续投递下一条数据。

    或者反过来说,如果消费者不签收,我是不能去派送下一个的,总不能丢在门口直接走吧?

下面我们就从这四个环节入手,分析如何保证消息的可靠性。

1.1 消息发送到RabbitMQ服务器

第一个环节是生产者发送消息到Broker。先来说一下什么情况才会丢失?

可能是因为网络连接或者Broker的问题导致消息发送失败,生产者不能确定Broker有没有正确接受。

如果我们去设计,肯定要给生产者发送消息的接口一个应答,生产者才能知道消息有没有发送成功。我们能想到的,RabbitMQ肯定也实现了。

在RabbitMQ里面提供了两种机制服务端确认机制,也就是在生产者发送消息给RabbitMQ的服务端的时候,服务端会通过某一种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。

第一种是Transaction事务模式,第二种是/confirm/i确认模式。

1.1.1 Transaction事务模式

事务模式怎么用呢?在创建channel的时候,可以把信道设置成事务模式,然后就可以发布消息给RabbitMQ了。如果channel.txCommit();的方法调用成功,就说明事务提交成功,则消息一定到达了RabbitMQ中。

try {
    //设置事务模式
    channel.txSelect();
    // 发送消息
    // String exchange, String routingKey, BasicProperties props, byte[] body
    channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
    //提交事务
    channel.txCommit();
    int i = 1 / 0;
    channel.txCommit();
    channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
    channel.txCommit();
    System.out.println("消息发送成功");
} catch (Exception e) {
    channel.txRollback();
    System.out.println("消息已经回滚");
}

如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,然后进行回滚。

在事务模式里面,只有收到了服务的的Commit-ok的指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,他是阻塞的,一条消息没有发送完毕,不能发送下一条消息,他会榨干RabbitMQ服务器的性能。所以不建议在生产环境使用。

spring boot 中的设置:

rabbitTemplate.setChannelTransacted(true);
1.1.2 /confirm/i确认模式

确认模式有三种,第一种是普通模式。

1.1.2.1 普通模式

在生产者这边通过调用channel.confirmSelect()方法将信道设置为/confirm/i模式,然后发送消息。一旦消息被投递到交换机之后,RabbitMQ就会发送一个确认(Basic.ACK)给生产者,也就是调用channel.waitForConfirms()返回true,这样生产者就知道消息被服务端接受了。

如果网络错误,会抛出连接异常。如果交换机不存在,会抛出404错误。

// 开启发送方确认模式
channel./confirm/iSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 普通/confirm/i,发送一条,确认一条
if (channel.waitFor/confirm/is()) {
    System.out.println("消息发送成功");
} else {
    System.out.println("消息发送失败");
}

这种发送一条确认一条的方式效率还不是太高,所以我们还有一种批量确认的方式。

1.1.2.2 批量确认模式

批量确认,就是在开启/confirm/i模式后,先发送一批消息。

try {
    channel./confirm/iSelect();
    for (int i = 0; i < 5; i++) {
        // 发送消息
        // String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
    }
    // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了
    // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)
    // 直到所有信息都发布,只要有一个未被Broker确认就会IOException
    channel.waitFor/confirm/isOrDie();
    System.out.println("消息发送完毕,批量确认成功");
} catch (Exception e) {
    // 发生异常,可能需要对所有消息进行重发
    e.printStackTrace();
}

只要channel.waitForConfirmsOrDie();方法没有抛出异常,就代表消息被服务端接受了。

批量确认的方式比单条确认的方式效率要高,但是也有两个问题:

首先就是批量的数量确定。对于不同的业务,到底发送多少条消息确认一次?数量太少,效率提升不上去。数量多的话,又会带来另外一个问题:比如我们发1000条消息才确认一次,如果前面999条消息都被接受了,但是最后一条失败了,那么前面的所有数据都需要重发。

有没有一种方式,可以一边发送一遍确认?这就是异步确认模式。

1.1.2.3 异步确认模式

异步确认模式需要添加一个/confirm/iListener,并且用一个SortedSet来维护一个批次中没有被确认的消息。

// 用来维护未确认消息的deliveryTag
final SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet());

// 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
// 异步监听确认和未确认的消息
// 如果要重复运行,先停掉之前的生产者,清空队列
channel.add/confirm/iListener(new /confirm/iListener() {
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Broker未确认消息,标识:" + deliveryTag);
        if (multiple) {
            // headSet表示后面参数之前的所有元素,全部删除
            /confirm/iSet.headSet(deliveryTag + 1L).clear();
        } else {
            /confirm/iSet.remove(deliveryTag);
        }
        // 这里添加重发的方法
    }

    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认
        System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
        if (multiple) {
            // headSet表示后面参数之前的所有元素,全部删除
            /confirm/iSet.headSet(deliveryTag + 1L).clear();
        } else {
            // 只移除一个元素
            /confirm/iSet.remove(deliveryTag);
        }
        System.out.println("未确认的消息:" + /confirm/iSet);
    }
});

// 开启发送方确认模式
channel./confirm/iSelect();
for (int i = 0; i < 10; i++) {
    long nextSeqNo = channel.getNextPublishSeqNo();
    // 发送消息
    // String exchange, String routingKey, BasicProperties props, byte[] body
    channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
    /confirm/iSet.add(nextSeqNo);
}
System.out.println("所有消息:" + /confirm/iSet);

Spring boot :

/confirm/i模式实在channel中开启的,RabbitTemplate对channel进行了封装。

rabbitTemplate.set/confirm/iCallback()
1.2 消息从交换机路由到队列

在什么情况下,消息会无法路由到正确的队列呢?

可以由于routinKey错误,或者队列不存在。

我们有两种处理无法路由的消息,一种就是让服务端重发给生产者,一种是让交换机路由到另一个备份的交换机。

1.2.1 消息回发
channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode,
                             String replyText,
                             String exchange,
                             String routingKey,
                             AMQP.BasicProperties properties,
                             byte[] body)
        throws IOException {
        System.out.println("=========监听器收到了无法路由,被返回的消息============");
        System.out.println("replyText:"+replyText);
        System.out.println("exchange:"+exchange);
        System.out.println("routingKey:"+routingKey);
        System.out.println("message:"+new String(body));
    }
});

Spring boot 消息回发的方式:

rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback();
1.2.2 备份交换机

在创建交换机的时候,从属性中指定备份交换机即可:

// 在声明交换机的时候指定备份交换机
Map arguments = new HashMap();
arguments.put("alternate-exchange", "ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE", "topic", false, false, false, arguments);
1.3 消息在队列存储

第三个环节是消息在队列中存储,如果没有消费的话,则队列一直存在于数据库中。

如果RabbitMQ的服务或者硬件发生故障,比如系统宕机、重启、关闭等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定信息)都保存到磁盘中。

1.3.1 队列持久化

声明队列的时候需要设置相关属性:

// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

durable:没有持久化的队列,保存在内存中,服务重启后队列和消息都会消失。

autoDelete:没有消费者连接的时候,自动删除。

exclusive:排他性队列的特点是:

  • 只对首次声明他的连接可见
  • 会在其连接断开的时候自动删除
1.3.2 交换机持久
// String exchange, String type, boolean durable, boolean autoDelete, Map arguments
channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
1.3.3 消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2) // 持久化消息
        .contentEncoding("UTF-8")
        .expiration("10000") // TTL
        .build();

如果消息没有持久化,保存在内存中,队列还在,但是重启后消息会丢失。

1.3.4 集群

提高MQ服务的可用性,保障消息的传输,我们需要有多个RabbitMQ的节点进行搭建。

1.4 消息投递到消费者(ACK确认机制) 1.4.1 手动ACK及自动ACK

如果消息者收到消息后没来得及处理即发生异常、或者处理过程中发生异常,会导致④失败。服务端应该以某种得知消费者对消息的接受情况,并决定是否重新投递这条消息给消费者。

RabbitMQ提供了消费者的消息确认机制,消费者可以自动或者手动的发送ACK给服务端。

如果没有ACK会怎么办?永远等待下去?也不会。

没有收到ACK消息,消费者断开连接后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。(消费者代码有问题则另算…)

消费者怎么给Broker应答呢?有两种方式。

  • 自动ACK
  • 手动ACK

自动ACK是默认的情况。也就是我们没有在消费者处编写ACK的代码,消费者会在收到消息的时候就自动发送ACK,而不是在方法执行完毕的时候发送ACK。(并不关心消息是否正常)

如果想要等消息消费完毕或者方法执行完毕才发送ACK,需要先把自动ACK设置成手动ACK。把autoAck设置成false。

// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);

这个时候RabbitMQ会等待消费者显示地回复ACK后才从队列中移除消息。

channel.basicAck(envelope.getDeliveryTag(), true);
1.4.2 Spring boot 配置
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
1.4.3 配置文件处理

在SimpleRabbitListenerContainer或者SimpleRabbitListenerContainerFactory中配置

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    //设置ACK
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setAutoStartup(true);
    return factory;
}

参数区别

NONE:自动ACK

MANUAL:手动ACK

AUTO:如果方法未抛异常,则发送ack;如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队列;如果抛出异常是AmqpRejectAndDontRequeueException则发送nack且不会重新入队列。

1.4.4 拒绝消息

如果消费出了问题,确实是不能发送ACK告诉服务端成功消费了怎么办?当然也有办法拒绝消息的指令,而且还可以让消息重新入队给其他消费者消费。

如果消息无法处理或者消费失败,也有两种拒绝的方式。

  • basicReject()拒绝单条
  • basicNack()批量拒绝
if (msg.contains("拒收")) {
    // 拒绝消息
    // requeue:是否重新入队列,true:是;false:直接丢弃,相当于告诉队列可以直接删除掉
    // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
    channel.basicReject(envelope.getDeliveryTag(), false);
} else if (msg.contains("异常")) {
    // 批量拒绝
    // requeue:是否重新入队列
    // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费
    channel.basicNack(envelope.getDeliveryTag(), true, false);
} else {
    // 手工应答
    // 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费
    channel.basicAck(envelope.getDeliveryTag(), true);
}

如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(只有一个消费者的时候,这种方式可能会出现无线循环重复消费的情况,可以投递到新的队列或者直接打印异常信息)。

1.4.5 总结

从生产者到Broker,交换机到队列,队列本身,队列到消费者,我们都有相应的方法知道消费有没有正常流转,或者说当消息没有正常流转的时候采取相关措施。

1.5 消费者回调

场景:

服务端收到了ACK或者NACK,生产者会知道吗?即使消费者没有接收到消息,或者消费出现异常,生产者也是完全不知情的。

这是一个很正常的情况,但是如果现在为了保证数据一致性,生产者必须知道消费者有没有消费成功,应该怎么 *** 作?

这种情景有两种解决方法:

  • 消费者收到消息,处理完毕后,调用生产者的API
  • 消费者收到消息,处理完毕后,发送一条响应消息给生产者。
1.6 消息幂等性

如果消费者状态是正常的,每一条消息都可以处理。只是在响应或者调用API的时候出现了问题,会不会出现消息的重复处理?例如:存款1000元,ATM重发了三次存款消息,核心系统一共处理了四次,我们账户余额直接增加4000元。

所以,为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ服务端是没有这种控制的(同一批的消息有个递增的DeliveryTag),他并不知道对于你的业务来说什么才是重复的消息。所以这个只能在消费端控制。

如何避免重复消费:

消息出现重复可能会有如下几个原因:

  • 生产者的问题,环节①重复发送消息。比如在开启了/confirm/i模式但未收到确认,消费重复投递。
  • 环节②出现了问题,由于消费者未发送ACK或者其他原因,消息重复消费。
  • 生产者代码或者网络问题

对于重复的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消费落库来做重复控制。

可以参考微信支付,有的时候会提示可能是重复支付。他根据业务要素一致性来进行判断(付款人ID,商户ID,金额,地点,时间等),从而判断可能是同一笔交易。

1.7 消息顺序性

在RabbitMQ中,一个队列有多个消费者,由于不同的消费者消费消息的速度是不一样的,顺序是无法得到保证的。所以只有一个队列仅有一个消费者的情况下才能确保顺序消费。(不同的业务消息发送到不同的专用队列中)

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5653178.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存