全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十六)延时队列

全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十六)延时队列,第1张

全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十六)延时队列

第二步:转发消息延迟主题的CosumeQueue中

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递

投递时间=消息存储时间(storeTimestamp) + 延迟级别对应的时间

需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中

相关源码参见:

org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean)

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC) {
    return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
}
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
    final boolean readBody) {
    try {
        // 1 TOTAL SIZE
        int totalSize = byteBuffer.getInt();

        // 2 MAGIC CODE
        int magicCode = byteBuffer.getInt();
        switch (magicCode) {
            case MESSAGE_MAGIC_CODE:
                break;
            case BLANK_MAGIC_CODE:
                return new DispatchRequest(0, true );
            default:
                log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
                return new DispatchRequest(-1, false );
        }

        byte[] bytesContent = new byte[totalSize];

        int bodyCRC = byteBuffer.getInt();

        int queueId = byteBuffer.getInt();

        int flag = byteBuffer.getInt();

        long queueOffset = byteBuffer.getLong();

        long physicOffset = byteBuffer.getLong();

        int sysFlag = byteBuffer.getInt();

        long bornTimeStamp = byteBuffer.getLong();

        ByteBuffer byteBuffer1;
        if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }

        long storeTimestamp = byteBuffer.getLong();

        ByteBuffer byteBuffer2;
        if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }

        int reconsumeTimes = byteBuffer.getInt();

        long preparedTransactionOffset = byteBuffer.getLong();

        int bodyLen = byteBuffer.getInt();
        if (bodyLen > 0) {
            if (readBody) {
                byteBuffer.get(bytesContent, 0, bodyLen);

                if (checkCRC) {
                    int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
                    if (crc != bodyCRC) {
                        log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
                        return new DispatchRequest(-1, false);
                    }
                }
            } else {
                byteBuffer.position(byteBuffer.position() + bodyLen);
            }
        }

        byte topicLen = byteBuffer.get();
        byteBuffer.get(bytesContent, 0, topicLen);
        String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);

        long tagsCode = 0;
        String keys = "";
        String uniqKey = null;

        short propertiesLength = byteBuffer.getShort();
        Map propertiesMap = null;
        if (propertiesLength > 0) {
            byteBuffer.get(bytesContent, 0, propertiesLength);
            String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
            propertiesMap = MessageDecoder.string2messageProperties(properties);

            keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);

            uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

            String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
            if (tags != null && tags.length() > 0) {
                tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
            }

            // Timing message processing
            {
                String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);

                    if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                    }

                    if (delayLevel > 0) {
                        tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                            storeTimestamp);
                    }
                }
            }
        }

        int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
        if (totalSize != readLength) {
            doNothingForDeadCode(reconsumeTimes);
            doNothingForDeadCode(flag);
            doNothingForDeadCode(bornTimeStamp);
            doNothingForDeadCode(byteBuffer1);
            doNothingForDeadCode(byteBuffer2);
            log.error(
                "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
                totalSize, readLength, bodyLen, topicLen, propertiesLength);
            return new DispatchRequest(totalSize, false);
        }

        return new DispatchRequest(
            topic,
            queueId,
            physicOffset,
            totalSize,
            tagsCode,
            storeTimestamp,
            queueOffset,
            keys,
            uniqKey,
            sysFlag,
            preparedTransactionOffset,
            propertiesMap
        );
    } catch (Exception e) {
    }

    return new DispatchRequest(-1, false );
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5716960.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存