CommitLog中的消息转发到ConsumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。
投递时间=消息存储时间(storeTimestamp) + 延迟级别对应的时间
需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到ConsumeQueue中
相关源码参见:
org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean)
/**
 * Convenience overload that decodes one message from the buffer with body reading enabled.
 *
 * @param byteBuffer buffer to decode from; forwarded unchanged to the three-argument overload
 * @param checkCRC   forwarded unchanged to the three-argument overload
 * @return whatever the three-argument overload returns when called with {@code readBody = true}
 */
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC) {
    // This overload always asks for the body to be read.
    final boolean readBody = true;
    return this.checkMessageAndReturnSize(byteBuffer, checkCRC, readBody);
}
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { try { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); // 2 MAGIC CODE int magicCode = byteBuffer.getInt(); switch (magicCode) { case MESSAGE_MAGIC_CODE: break; case BLANK_MAGIC_CODE: return new DispatchRequest(0, true ); default: log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode)); return new DispatchRequest(-1, false ); } byte[] bytesContent = new byte[totalSize]; int bodyCRC = byteBuffer.getInt(); int queueId = byteBuffer.getInt(); int flag = byteBuffer.getInt(); long queueOffset = byteBuffer.getLong(); long physicOffset = byteBuffer.getLong(); int sysFlag = byteBuffer.getInt(); long bornTimeStamp = byteBuffer.getLong(); ByteBuffer byteBuffer1; if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) { byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4); } else { byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4); } long storeTimestamp = byteBuffer.getLong(); ByteBuffer byteBuffer2; if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) { byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4); } else { byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4); } int reconsumeTimes = byteBuffer.getInt(); long preparedTransactionOffset = byteBuffer.getLong(); int bodyLen = byteBuffer.getInt(); if (bodyLen > 0) { if (readBody) { byteBuffer.get(bytesContent, 0, bodyLen); if (checkCRC) { int crc = UtilAll.crc32(bytesContent, 0, bodyLen); if (crc != bodyCRC) { log.warn("CRC check failed. 
bodyCRC={}, currentCRC={}", crc, bodyCRC); return new DispatchRequest(-1, false); } } } else { byteBuffer.position(byteBuffer.position() + bodyLen); } } byte topicLen = byteBuffer.get(); byteBuffer.get(bytesContent, 0, topicLen); String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8); long tagsCode = 0; String keys = ""; String uniqKey = null; short propertiesLength = byteBuffer.getShort(); MappropertiesMap = null; if (propertiesLength > 0) { byteBuffer.get(bytesContent, 0, propertiesLength); String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8); propertiesMap = MessageDecoder.string2messageProperties(properties); keys = propertiesMap.get(MessageConst.PROPERTY_KEYS); uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS); if (tags != null && tags.length() > 0) { tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags); } // Timing message processing { String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) { int delayLevel = Integer.parseInt(t); if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); } if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp); } } } } int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength); if (totalSize != readLength) { doNothingForDeadCode(reconsumeTimes); doNothingForDeadCode(flag); doNothingForDeadCode(bornTimeStamp); doNothingForDeadCode(byteBuffer1); doNothingForDeadCode(byteBuffer2); log.error( "[BUG]read total count not equals msg total size. 
totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}", totalSize, readLength, bodyLen, topicLen, propertiesLength); return new DispatchRequest(totalSize, false); } return new DispatchRequest( topic, queueId, physicOffset, totalSize, tagsCode, storeTimestamp, queueOffset, keys, uniqKey, sysFlag, preparedTransactionOffset, propertiesMap ); } catch (Exception e) { } return new DispatchRequest(-1, false ); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)