rocketmq核心源码分析第二十篇一消息消费五步曲一消费失败并发重试与顺序阻塞

rocketmq核心源码分析第二十篇一消息消费五步曲一消费失败并发重试与顺序阻塞,第1张

rocketmq核心源码分析第二十篇一消息消费五步曲一消费失败并发重试与顺序阻塞

文章目录

并发消费一失败重试顺序消费一失败阻塞总结

并发消费一失败重试

不管消费成功与否 都会更新消费进度 【对于broker来说 没有失败 消息都会消费成功,其实就是修改消费偏移量,consume端消费失败的会在重试主题创建新的消息】

计算ackIndex根据ack index 来决定是否发送到重试队列 %RETRY%+consumeGroup消费失败需要重发消息如果发送到重试队列失败 则需要兜底重新消费[重试次数也增加]移除ProcessQueue中处理过的消息

public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;
    处理统计信息 以及计算ackIndex
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    核心: 重试逻辑:  如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            根据ack index 来决定是否发送到重试队列 %RETRY%+consumeGroup
            List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                集群模式消费失败需要重发消息【先remotingClient发送,失败采用内部defaultProducer发送】
                boolean result = this.sendMessageBack(msg, context);
                如果发送到重试队列失败 则需要兜底重新消费
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            进行兜底消费
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                // 这里需要注意 如果sendMessageBack 发送失败 则会尝试[延迟5s]重新消费
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    移除ProcessQueue中处理过的消息
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    不管消费成功与否 都会更新消费进度 【对于broker来说 没有失败 消息都会消费成功,其实就是修改消费偏移量,consume端消费失败的会在重试主题创建新的消息】
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
顺序消费一失败阻塞

失败时自旋阻塞消费失败消费间隔1s失败消费次数16次超过16次消费失败,回发重试队列,不在进行消费重试队列最多16次则按照重试时间间隔进行重试队列16次均失败则存储死信队列

 public boolean processConsumeResult(
            final List msgs,
            final ConsumeOrderlyStatus status,
            final ConsumeOrderlyContext context,
            final ConsumeRequest consumeRequest
    ) {
    boolean continueConsume = true;
    long commitOffset = -1L;
    // 默认自动提交
    if (context.isAutoCommit()) {
        switch (status) {
            case COMMIT:
            case ROLLBACK:
            case SUCCESS:
                commitOffset = consumeRequest.getProcessQueue().commit();
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                超过16次则发送到重试队列
                if (checkReconsumeTimes(msgs)) {
                    将consumingMsgOrderlyTreeMap正在消费的消息重新添加到msgTreeMap
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    一秒后重新消费
                    this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                } else {
                    commitOffset = consumeRequest.getProcessQueue().commit();
                }
                break;
            default:
                break;
        }
        ......删除非自动提交代码
    根据消费进度
    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
    }

    return continueConsume;
}
总结

消费失败均会回传重试队列回传是构建新的消息顺序消费回发重试队列前 会自旋阻塞消费消费失败处理完毕,需更新offsetStore消费进度管理

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5722492.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存