并发消费一失败重试顺序消费一失败阻塞总结
并发消费一失败重试不管消费成功与否 都会更新消费进度 【对于broker来说 没有失败 消息都会消费成功,其实就是修改消费偏移量,consume端消费失败的会在重试主题创建新的消息】
计算ackIndex根据ack index 来决定是否发送到重试队列 %RETRY%+consumeGroup消费失败需要重发消息如果发送到重试队列失败 则需要兜底重新消费[重试次数也增加]移除ProcessQueue中处理过的消息
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; 处理统计信息 以及计算ackIndex switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } 核心: 重试逻辑: 如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: 根据ack index 来决定是否发送到重试队列 %RETRY%+consumeGroup List顺序消费一失败阻塞msgBackFailed = new ArrayList (consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); 集群模式消费失败需要重发消息【先remotingClient发送,失败采用内部defaultProducer发送】 boolean result = this.sendMessageBack(msg, context); 如果发送到重试队列失败 则需要兜底重新消费 if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } 进行兜底消费 if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); // 这里需要注意 如果sendMessageBack 发送失败 则会尝试[延迟5s]重新消费 this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } 移除ProcessQueue中处理过的消息 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); 不管消费成功与否 都会更新消费进度 【对于broker来说 没有失败 消息都会消费成功,其实就是修改消费偏移量,consume端消费失败的会在重试主题创建新的消息】 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
失败时自旋阻塞消费失败消费间隔1s失败消费次数16次超过16次消费失败,回发重试队列,不在进行消费重试队列最多16次则按照重试时间间隔进行重试队列16次均失败则存储死信队列
public boolean processConsumeResult( final List总结msgs, final ConsumeOrderlyStatus status, final ConsumeOrderlyContext context, final ConsumeRequest consumeRequest ) { boolean continueConsume = true; long commitOffset = -1L; // 默认自动提交 if (context.isAutoCommit()) { switch (status) { case COMMIT: case ROLLBACK: case SUCCESS: commitOffset = consumeRequest.getProcessQueue().commit(); this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); break; case SUSPEND_CURRENT_QUEUE_A_MOMENT: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); 超过16次则发送到重试队列 if (checkReconsumeTimes(msgs)) { 将consumingMsgOrderlyTreeMap正在消费的消息重新添加到msgTreeMap consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); 一秒后重新消费 this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } else { commitOffset = consumeRequest.getProcessQueue().commit(); } break; default: break; } ......删除非自动提交代码 根据消费进度 if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); } return continueConsume; }
消费失败均会回传重试队列回传是构建新的消息顺序消费回发重试队列前 会自旋阻塞消费消费失败处理完毕,需更新offsetStore消费进度管理
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)