前一段时间在业务上遇到了一个MQ重复消费的问题,排查发现一个老哥在代码里写了个线程睡眠n分钟(n为客户控制)的逻辑(设计方案真是一言难尽…),导致必现消息重复消费的问题;于是接盘在业务上修改了设计方案、做了消息幂等处理。本着知其然知其所以然的原则,今天深入分析一下消息消费超时/失败是怎么重试的?
PS:RocketMQ版本4.8.0,本文中相关源码注释见GitHub中:RocketMQ:release-4.8.0。
二、源码分析在RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?这篇文章我们介绍了Consumer如何从Broker拉取消息的、Consumer如何处理拉取到的消息;
其中在从Broker拉取消息成功之后,会进入到PullCallback#onSuccess()方法,当拉取到消息时,首先将消息全部放入到处理队列ProcessQueue中,然后通知消费消息服务consumeMessageService开始干活。
1、入口接着上面的来看,ConsumeMessageService#submitConsumeRequest()为Consumer开始真正开始消费消息的入口;
而ConsumeMessageService是一个接口,它有两个实现:ConsumeMessageConcurrentlyService、ConsumeMessageOrderlyService,分别表示并发消费模式、顺序消费模式;因此下面我们从并发消费和顺序消费两部分分别研究消息消费超时/失败的重试机制;
我们接着ConsumeMessageConcurrentlyService#submitConsumeRequest()来看,并发消费模式下是如何处理消息消费请求的?
ConsumeMessageConcurrentlyService采用线程池的机制对消息进行分批并发消费,默认一个消息是一批;
从上图我们可以看出,在线程池中执行线程任务时,如果失败,Consumer端会自己延时5s之后重试当前消息消费任务,见ConsumeMessageConcurrentlyService#submitConsumeRequestLater()方法;
ConsumeRequest是ConsumeMessageConcurrentlyService的内部类,它作为一个线程任务,内部封装了消息消息请求的具体执行逻辑;
ConsumeRequest#run()方法主要做四个 *** 作:
- 注册业务系统自定义的消费监听器,负责具体的消息消费;并设置消息的重试Topic;执行消费监听器的consumeMessage()方法,进行真正的消费消息 *** 作;统计消息消费数据;判断消息消息是否超时、出现异常,处理消息消费结果;
@Override public void run() { if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } // todo 1、这里是执行消费前的钩子函数,也就是我们业务系统定义的消费监听器,负责具体消息的消费 MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; // 设置消息的重试topic defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); ConsumeMessageContext consumeMessageContext = null; // 如果消费者注册了消息消费者hook if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setProps(new HashMap()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false); // consumer消费前的钩子函数,类似于Spring中的BeanPostProcessor#postProcessBeforeInitialization()方法 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { // 设置每条消息被消费的时间 MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } // 2、 开始消费消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } // 消息消费总耗时 long consumeRT = System.currentTimeMillis() - beginTimestamp; // 根据是否出现异常等,判断处理结果 if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } // 消费超时,默认15分钟 } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } // 在钩子函数中放入消费结果 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } // 执行后置的钩子函数 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); // 类似于Spring中的BeanPostProcessor#postProcessAfterInitialization()方法 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } // 3、统计消息消费数据 ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); if (!processQueue.isDropped()) { // 4、处理消费结果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
下面我开始本文的正题,在调用消息消费监听器后是如何处理消息消费超时 / 异常的?
2> 执行业务上自定义消费监听器导致的消费异常重试如果在自定义消费监听器中执行业务逻辑出现异常,会将hasException属性设置为true,供后置的钩子函数ConsumeMessageHook使用;并且此时status字段为null,在后面的逻辑中如果发现status字段为null,则会将其设置为RECONSUME_LATER(稍后重新消费);最终在processConsumeResult()中再根据这个status处理消费结果;
在processConsumeResult()方法中,会维护一个变量ackIndex表示当前消费请求中 第一个未ACK的消息 在msgs集合中的下标;如果消息全部消费成功,则ackIndex为msgsSize + 1;如果消息消费失败,则ackIndex为-1,表示该批消息需要全部重新消费,即将消息发送回Broker;如果发送消息回Broker失败,Consumer则延时5s后,重新执行当前消费请求。
发送回Broker的消息延时级别默认0;
我们往上追,最终确定ConsumeConcurrentlyContext的来源为ConsumeRequest#run()方法的开头处,并且后续未对其delayLevelWhenNextConsume属性做任何修改;
这个我们从ConsumeConcurrentlyContext类中也可以看到,其setDelayLevelWhenNextConsume()方法未被使用;
Broker端SendMessageProcessor#asyncConsumerSendMsgBack()方法中会处理backMsg。虽然Consumer没有传延时级别,但Broker会默认将其延时级别设置为3,然后将消息先以延时消息的机制发送到延时队列(SCHEDULE_TOPIC_XXXX)中,最多只会重试16次(可配置);重试超过16次会将消息添加到死信队列(%DLQ%+消费组)中。待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。
Consumer端此时再接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是原始的topic。
Broker端如何处理的?见文章:深度剖析RocketMQ延时消息机制原理/源码;
3> 消费超时重试从ConsumeRequest#run()方法中,我们看不到任何关于消费超时重试的处理,这里只会统计一个消费超时的状态;
可以发现消费超时阈值的获取方式:defaultMQPushConsumer.getConsumeTimeout(),我们看看这个方法还在哪里被用到了;点进去发现在ProcessQueue的cleanExpiredMsg()方法中有调用它,作为判断消息是否过期的阈值(也是并发消费模式下消息过期的阈值)。
ProcessQueue#cleanExpiredMsg()方法中如果判断消息已经过期,会将消息在本地缓存msgTreeMap中清除、并以延时消息(延时级别为3)的方式发送回Broker。
Consumer端此时再接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是开始的那个topic。
Broker端如何处理的?见文章:深度剖析RocketMQ延时消息机制原理/源码;
对于消费超时的消息,首先会以延时消息的机制将其发送到延时队列(SCHEDULE_TOPIC_XXXX)中,待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。这里我们可以注意到延时消息的方式会将消息又被存到CommitLog中 2 * n(重试次数)遍。
从这里我们可以看到,延时级别是3的情况下,理应10s后消息才会从延时队列投递到重试队列中,然而现象确实经过了10ms就投递了;由于消息到期的时间是从ConsumeQueue中每条记录的后64位取的,下一篇我们就研究一下CommitLog中的数据如何同步到ConsumeQueue中的?
3、顺序消费模式核心逻辑顺序消费模式和并发消费模式一样都存在消费异常重试的场景,但是由于顺序消费模式不会清理过期消息,所以不存在消费超时重试的场景。
1> 消费异常重试与并发消费不同的是顺序消费的ConsumeRequest只针对ProcessQueue和MessageQueue,而不是针对消息。其获取消息的逻辑是直接从ProcessQueue中取,一次取consumeMessageBatchMaxSize个(默认一个)。
另外:run()方法中消息消费的逻辑与非顺序消费差不多,但其关键点在于消息的消费/获取的顺序性,所以就不可避免的引入锁机制(加锁范围是针对ProcessQueue,或者说是MessageQueue,所以说RocketMQ无法做到多MessageQueue的全局顺序消费)。
出现异常的具体逻辑也基本和并发消费模式一样:
如果调用自定义消费监听器消费消息异常,则status状态为null,在后面的逻辑中如果status == null,则将SUSPEND_CURRENT_QUEUE_A_MOMENT赋值给status,进而在processConsumeResult()中处理消费结果。
在processConsumeResult()方法中,我们主要看非自动提交ACK的情况;
1)checkReconsumeTimes()首先会检查是否达到最大重试消费次数(默认是Integer.MAX_VALUE);如果没有达到最大重试次数,默认延时1s后再次开始尝试消费消息。
看一下checkReconsumeTimes()方法是如何判断重试次数的?
延时1s后,再次开启当前消费消息任务;
但是其不存在超时消费重试的概念,因为没有清理过期消息这个 *** 作:
在Consumer启动的时候会启动消息消费服务consumeMessageService,对于并发消费模式而言是定期清理过期消息;而对于顺序消费而言则是定时向Broker申请加锁,以确保消息的顺序消费。
消费异常重试机制:
出现异常的两种场景:执行消费请求异常出错、执行指定以消费监听器出错;出现异常之后会发送延时级别为0的消息到Broker,Broker端的SendMessageProcessor#asyncConsumerSendMsgBack()方法中遇到延时级别为0的消息会将其延时级别设置为(3 + 消费重试次数);然后将消息先以延时消息的机制发送到延时队列(SCHEDULE_TOPIC_XXXX)中,最多只会重试16次(可配置);重试超过16次会将消息添加到死信队列(%DLQ%+消费组)中。待消息到达投递时间(到期)后,消息转存到重试队列(%RETRY%consumerGroup)中。**Consumer端此时再次接收到的该消息本质上是源自"%RETRY%+消费组名称"主题,而不是原始的topic。
消费超时重试机制:
主要体现在并发消费模式会周期性清理过期的消息,然后将其发送回Broker,后面的步骤和消费异常重试机制一样;最终当前消费者能再次消费到重试队列(%RETRY%+consumerGroup)中的消息。 2、顺序消费模式
顺序消费模式不存在消费超时重试的机制,对于消费异常重试的逻辑基本和并发消费一样,区别在于,顺序消费模式遇到异常,延时1s后重试(再次消费),重试次数默认为Integer.MAX_VALUE;
这里我们可以发现一个问题,如果一个消息处理的特别慢 或者说消费出现异常在一直重试,那么它后面的消息就会被阻塞;进而导致消息堆积的现象。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)