1. 可靠性投递分析在使用MQ实现异步通信的过程中,有消息丢了怎么办?或者MQ消息重复了怎么办?
这个问题就需要来到本文的主题——RabbitMQ的可靠性投递。当然,RabbitMQ在设计的时候就考虑了这一点,提供 了很多保证消息可靠投递的机制。这个可以说是RabbitMQ比较突出的一个特性。
下面从RabbitMQ的工作模型分析一下可靠性措施。
在我们使用RabbitMQ发送消息的时候,有几个主要环节:
-
①代表消息从生产者发送到Broker
生产者把消息发到Broker之后,怎么知道自己的消息有没有被Broker成功接受?如果Broker不给应答,生产者不断的发送,则消息全部丢失。
-
②代表消息从Exchange路由到Queue
Exchange是一个绑定列表,他的职责是分化消息。如果他没有办法旅行他的职责怎么办?也就是说,找不到队列该怎么办?
-
③代表消息在Queue中存储
队列有自己的数据库(Mnesia),他是正真用来存储信息的。如果没有消费者消费,那么消息要一直存储在队列中。但是如果数据库有相关问题,那么如何确保数据在队列中稳定存储?
-
④代表消费者订阅消息
队列的消息是一条一条的投递的,所以只有上一条消息被消费者接受以后,才能把这条消息从数据库删除,继续投递下一条数据。
或者反过来说,如果消费者不签收,我是不能去派送下一个的,总不能丢在门口直接走吧?
下面我们就从这四个环节入手,分析如何保证消息的可靠性。
1.1 消息发送到RabbitMQ服务器第一个环节是生产者发送消息到Broker。先来说一下什么情况才会丢失?
可能是因为网络连接或者Broker的问题导致消息发送失败,生产者不能确定Broker有没有正确接受。
如果我们去设计,肯定要给生产者发送消息的接口一个应答,生产者才能知道消息有没有发送成功。我们能想到的,RabbitMQ肯定也实现了。
在RabbitMQ里面提供了两种机制服务端确认机制,也就是在生产者发送消息给RabbitMQ的服务端的时候,服务端会通过某一种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。
第一种是Transaction事务模式,第二种是/confirm/i确认模式。
1.1.1 Transaction事务模式事务模式怎么用呢?在创建channel的时候,可以把信道设置成事务模式,然后就可以发布消息给RabbitMQ了。如果channel.txCommit();的方法调用成功,就说明事务提交成功,则消息一定到达了RabbitMQ中。
try { //设置事务模式 channel.txSelect(); // 发送消息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); //提交事务 channel.txCommit(); int i = 1 / 0; channel.txCommit(); channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); channel.txCommit(); System.out.println("消息发送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("消息已经回滚"); }
如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,然后进行回滚。
在事务模式里面,只有收到了服务的的Commit-ok的指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,他是阻塞的,一条消息没有发送完毕,不能发送下一条消息,他会榨干RabbitMQ服务器的性能。所以不建议在生产环境使用。
spring boot 中的设置:
rabbitTemplate.setChannelTransacted(true);1.1.2 /confirm/i确认模式
确认模式有三种,第一种是普通模式。
1.1.2.1 普通模式在生产者这边通过调用channel.confirmSelect()方法将信道设置为/confirm/i模式,然后发送消息。一旦消息被投递到交换机之后,RabbitMQ就会发送一个确认(Basic.ACK)给生产者,也就是调用channel.waitForConfirms()返回true,这样生产者就知道消息被服务端接受了。
如果网络错误,会抛出连接异常。如果交换机不存在,会抛出404错误。
// 开启发送方确认模式 channel./confirm/iSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); // 普通/confirm/i,发送一条,确认一条 if (channel.waitFor/confirm/is()) { System.out.println("消息发送成功"); } else { System.out.println("消息发送失败"); }
这种发送一条确认一条的方式效率还不是太高,所以我们还有一种批量确认的方式。
1.1.2.2 批量确认模式批量确认,就是在开启/confirm/i模式后,先发送一批消息。
try { channel./confirm/iSelect(); for (int i = 0; i < 5; i++) { // 发送消息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes()); } // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了 // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到) // 直到所有信息都发布,只要有一个未被Broker确认就会IOException channel.waitFor/confirm/isOrDie(); System.out.println("消息发送完毕,批量确认成功"); } catch (Exception e) { // 发生异常,可能需要对所有消息进行重发 e.printStackTrace(); }
只要channel.waitForConfirmsOrDie();方法没有抛出异常,就代表消息被服务端接受了。
批量确认的方式比单条确认的方式效率要高,但是也有两个问题:
首先就是批量的数量确定。对于不同的业务,到底发送多少条消息确认一次?数量太少,效率提升不上去。数量多的话,又会带来另外一个问题:比如我们发1000条消息才确认一次,如果前面999条消息都被接受了,但是最后一条失败了,那么前面的所有数据都需要重发。
有没有一种方式,可以一边发送一遍确认?这就是异步确认模式。
1.1.2.3 异步确认模式异步确认模式需要添加一个/confirm/iListener,并且用一个SortedSet来维护一个批次中没有被确认的消息。
// 用来维护未确认消息的deliveryTag final SortedSetconfirmSet = Collections.synchronizedSortedSet(new TreeSet ()); // 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条 // 异步监听确认和未确认的消息 // 如果要重复运行,先停掉之前的生产者,清空队列 channel.add/confirm/iListener(new /confirm/iListener() { public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Broker未确认消息,标识:" + deliveryTag); if (multiple) { // headSet表示后面参数之前的所有元素,全部删除 /confirm/iSet.headSet(deliveryTag + 1L).clear(); } else { /confirm/iSet.remove(deliveryTag); } // 这里添加重发的方法 } public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认 System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple)); if (multiple) { // headSet表示后面参数之前的所有元素,全部删除 /confirm/iSet.headSet(deliveryTag + 1L).clear(); } else { // 只移除一个元素 /confirm/iSet.remove(deliveryTag); } System.out.println("未确认的消息:" + /confirm/iSet); } }); // 开启发送方确认模式 channel./confirm/iSelect(); for (int i = 0; i < 10; i++) { long nextSeqNo = channel.getNextPublishSeqNo(); // 发送消息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes()); /confirm/iSet.add(nextSeqNo); } System.out.println("所有消息:" + /confirm/iSet);
Spring boot :
/confirm/i模式实在channel中开启的,RabbitTemplate对channel进行了封装。
rabbitTemplate.set/confirm/iCallback()1.2 消息从交换机路由到队列
在什么情况下,消息会无法路由到正确的队列呢?
可以由于routinKey错误,或者队列不存在。
我们有两种处理无法路由的消息,一种就是让服务端重发给生产者,一种是让交换机路由到另一个备份的交换机。
1.2.1 消息回发channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("=========监听器收到了无法路由,被返回的消息============"); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); System.out.println("message:"+new String(body)); } });
Spring boot 消息回发的方式:
rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback();1.2.2 备份交换机
在创建交换机的时候,从属性中指定备份交换机即可:
// 在声明交换机的时候指定备份交换机 Map1.3 消息在队列存储arguments = new HashMap (); arguments.put("alternate-exchange", "ALTERNATE_EXCHANGE"); channel.exchangeDeclare("TEST_EXCHANGE", "topic", false, false, false, arguments);
第三个环节是消息在队列中存储,如果没有消费的话,则队列一直存在于数据库中。
如果RabbitMQ的服务或者硬件发生故障,比如系统宕机、重启、关闭等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定信息)都保存到磁盘中。
1.3.1 队列持久化声明队列的时候需要设置相关属性:
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments channel.queueDeclare(QUEUE_NAME, false, false, false, null);
durable:没有持久化的队列,保存在内存中,服务重启后队列和消息都会消失。
autoDelete:没有消费者连接的时候,自动删除。
exclusive:排他性队列的特点是:
- 只对首次声明他的连接可见
- 会在其连接断开的时候自动删除
// String exchange, String type, boolean durable, boolean autoDelete, Map1.3.3 消息持久化arguments channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 持久化消息 .contentEncoding("UTF-8") .expiration("10000") // TTL .build();
如果消息没有持久化,保存在内存中,队列还在,但是重启后消息会丢失。
1.3.4 集群提高MQ服务的可用性,保障消息的传输,我们需要有多个RabbitMQ的节点进行搭建。
1.4 消息投递到消费者(ACK确认机制) 1.4.1 手动ACK及自动ACK如果消息者收到消息后没来得及处理即发生异常、或者处理过程中发生异常,会导致④失败。服务端应该以某种得知消费者对消息的接受情况,并决定是否重新投递这条消息给消费者。
RabbitMQ提供了消费者的消息确认机制,消费者可以自动或者手动的发送ACK给服务端。
如果没有ACK会怎么办?永远等待下去?也不会。
没有收到ACK消息,消费者断开连接后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。(消费者代码有问题则另算…)
消费者怎么给Broker应答呢?有两种方式。
- 自动ACK
- 手动ACK
自动ACK是默认的情况。也就是我们没有在消费者处编写ACK的代码,消费者会在收到消息的时候就自动发送ACK,而不是在方法执行完毕的时候发送ACK。(并不关心消息是否正常)
如果想要等消息消费完毕或者方法执行完毕才发送ACK,需要先把自动ACK设置成手动ACK。把autoAck设置成false。
// String queue, boolean autoAck, Consumer callback channel.basicConsume(QUEUE_NAME, false, consumer);
这个时候RabbitMQ会等待消费者显示地回复ACK后才从队列中移除消息。
channel.basicAck(envelope.getDeliveryTag(), true);1.4.2 Spring boot 配置
spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual1.4.3 配置文件处理
在SimpleRabbitListenerContainer或者SimpleRabbitListenerContainerFactory中配置
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); //设置ACK factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setAutoStartup(true); return factory; }
参数区别
NONE:自动ACK
MANUAL:手动ACK
AUTO:如果方法未抛异常,则发送ack;如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队列;如果抛出异常是AmqpRejectAndDontRequeueException则发送nack且不会重新入队列。
1.4.4 拒绝消息如果消费出了问题,确实是不能发送ACK告诉服务端成功消费了怎么办?当然也有办法拒绝消息的指令,而且还可以让消息重新入队给其他消费者消费。
如果消息无法处理或者消费失败,也有两种拒绝的方式。
- basicReject()拒绝单条
- basicNack()批量拒绝
if (msg.contains("拒收")) { // 拒绝消息 // requeue:是否重新入队列,true:是;false:直接丢弃,相当于告诉队列可以直接删除掉 // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费 channel.basicReject(envelope.getDeliveryTag(), false); } else if (msg.contains("异常")) { // 批量拒绝 // requeue:是否重新入队列 // TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费 channel.basicNack(envelope.getDeliveryTag(), true, false); } else { // 手工应答 // 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费 channel.basicAck(envelope.getDeliveryTag(), true); }
如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(只有一个消费者的时候,这种方式可能会出现无线循环重复消费的情况,可以投递到新的队列或者直接打印异常信息)。
1.4.5 总结从生产者到Broker,交换机到队列,队列本身,队列到消费者,我们都有相应的方法知道消费有没有正常流转,或者说当消息没有正常流转的时候采取相关措施。
1.5 消费者回调场景:
服务端收到了ACK或者NACK,生产者会知道吗?即使消费者没有接收到消息,或者消费出现异常,生产者也是完全不知情的。
这是一个很正常的情况,但是如果现在为了保证数据一致性,生产者必须知道消费者有没有消费成功,应该怎么 *** 作?
这种情景有两种解决方法:
- 消费者收到消息,处理完毕后,调用生产者的API
- 消费者收到消息,处理完毕后,发送一条响应消息给生产者。
如果消费者状态是正常的,每一条消息都可以处理。只是在响应或者调用API的时候出现了问题,会不会出现消息的重复处理?例如:存款1000元,ATM重发了三次存款消息,核心系统一共处理了四次,我们账户余额直接增加4000元。
所以,为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ服务端是没有这种控制的(同一批的消息有个递增的DeliveryTag),他并不知道对于你的业务来说什么才是重复的消息。所以这个只能在消费端控制。
如何避免重复消费:
消息出现重复可能会有如下几个原因:
- 生产者的问题,环节①重复发送消息。比如在开启了/confirm/i模式但未收到确认,消费重复投递。
- 环节②出现了问题,由于消费者未发送ACK或者其他原因,消息重复消费。
- 生产者代码或者网络问题
对于重复的消息,可以对每一条消息生成一个唯一的业务ID,通过日志或者消费落库来做重复控制。
可以参考微信支付,有的时候会提示可能是重复支付。他根据业务要素一致性来进行判断(付款人ID,商户ID,金额,地点,时间等),从而判断可能是同一笔交易。
1.7 消息顺序性在RabbitMQ中,一个队列有多个消费者,由于不同的消费者消费消息的速度是不一样的,顺序是无法得到保证的。所以只有一个队列仅有一个消费者的情况下才能确保顺序消费。(不同的业务消息发送到不同的专用队列中)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)