RocketMQ Core Source Code Analysis, Part 10: Message Handling (Part 4): Adding Messages to the Cache on the Broker and Building the consumequeue and index Files

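Contents

Sequential message write to the cache
    Message storage: asyncPutMessage
    commitLog.asyncPutMessage
    MappedFile.appendMessagesInner: adding the message to memory
Asynchronously building the consumequeue and index files
    Asynchronous polling loop
    doReput: building the indexes and triggering the message-arrival event
    CommitLogDispatcherBuildConsumeQueue.dispatch: building the consume queue
    CommitLogDispatcherBuildIndex.dispatch: building the index file
Summary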

Sequential message write to the cache

Message storage: asyncPutMessage

1. commitLog stores the message.
2. storeStatsService records the statistics (maximum elapsed time and failure count).

public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // Check the state of the store
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    // Check the message
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }

    long beginTime = this.getSystemClock().now();
    // commitLog stores the message [written to memory only, not flushed to disk here]
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    putResultFuture.thenAccept((result) -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
    });

    return putResultFuture;
}
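
The callback pattern above (attach a statistics callback with thenAccept, then return the same future to the caller) can be tried in isolation. The following is a minimal sketch, not RocketMQ code: PutStatsSketch, its fields, and the Boolean result type are hypothetical stand-ins for StoreStatsService and PutMessageResult.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the broker statistics; not RocketMQ's StoreStatsService.
final class PutStatsSketch {
    final AtomicLong maxElapsedMillis = new AtomicLong();
    final AtomicLong failedTimes = new AtomicLong();

    // Attach a statistics callback and hand the very same future back to the caller.
    // The Boolean result (true = stored) is an assumption made for this sketch.
    CompletableFuture<Boolean> track(CompletableFuture<Boolean> putFuture, long beginMillis) {
        putFuture.thenAccept(ok -> {
            long elapsed = System.currentTimeMillis() - beginMillis;
            // Keep the largest elapsed time seen so far
            maxElapsedMillis.accumulateAndGet(elapsed, Math::max);
            if (ok == null || !ok) {
                failedTimes.incrementAndGet();
            }
        });
        return putFuture;
    }

    public static void main(String[] args) {
        PutStatsSketch stats = new PutStatsSketch();
        CompletableFuture<Boolean> pending = new CompletableFuture<>();
        CompletableFuture<Boolean> returned = stats.track(pending, System.currentTimeMillis());
        pending.complete(false); // simulate a failed store
        System.out.println("same future returned: " + (returned == pending));
        System.out.println("failed puts: " + stats.failedTimes.get());
    }
}
```

The caller receives the original future, so the statistics callback never delays or alters the result that the caller sees.
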
commitLog.asyncPutMessage

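1. For a delayed (scheduled) message, replace the topic with the schedule topic and remap the queue: queueId = DelayTimeLevel - 1, so levels [1~18] map to queues [0~17].
2. Back up the real topic and queueId in the message properties. The original topic may be a business topic or a consumer-group retry topic used for failed consumption.
3. Append the message to the memory backing the current commitlog file.
4. If the current file does not have enough space, create a new file and append the message to the new mmap memory.
5. Record statistics.
6. Submit the flush request.
7. Submit the master-slave replication request.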
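
The topic rewrite for delayed messages described above can be sketched on its own. This is a simplified illustration under stated assumptions: the Msg class is hypothetical, and the string values used for the schedule topic and the two property names are what I understand the RocketMQ constants to be, so treat them as assumptions rather than as the library's API.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayRewriteSketch {
    // Assumed values of the corresponding RocketMQ constants
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";

    // Hypothetical, heavily simplified message
    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    // Levels [1~18] map to queues [0~17]
    static int delayLevelToQueueId(int delayLevel) {
        return delayLevel - 1;
    }

    static void rewriteIfDelayed(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, leave it untouched
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // clamp to the highest configured level
        }
        // Back up the real destination before overwriting it
        msg.properties.put(PROPERTY_REAL_TOPIC, msg.topic);
        msg.properties.put(PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevelToQueueId(msg.delayLevel);
    }

    public static void main(String[] args) {
        Msg msg = new Msg("OrderTopic", 3, 20);
        rewriteIfDelayed(msg, 18);
        System.out.println(msg.topic + " queue " + msg.queueId + " " + msg.properties);
    }
}
```

Because the real topic and queueId travel with the message as properties, the message can be restored to its original destination when the delay expires.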

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage timestamp
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the CRC of the message body
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    AppendMessageResult result = null;
    // Get the statistics service
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    // Transaction type encoded in the message's sysFlag
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Delayed message: replace the topic with the schedule topic
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // queueId = DelayTimeLevel - 1, so levels [1~18] map to queues [0~17]
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId. The original topic may be a business topic
            // or a consumer-group retry topic used for failed consumption
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // Append the message to the memory backing the commitlog file
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                // Not enough space left in the current file, so create a new one
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                // Append the message to the new file's memory
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            default:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // Flush to disk
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    // Master-slave replication
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
    // The result is returned only after both have completed
    return flushResultFuture.thenCombine(replicaResultFuture, new BiFunction<PutMessageStatus, PutMessageStatus, PutMessageResult>() {
        @Override
        public PutMessageResult apply(PutMessageStatus flushStatus, PutMessageStatus replicaStatus) {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        }
    });
}
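
The last step of the method, waiting for both the flush and the replication result before answering, can be reproduced with plain JDK futures. The sketch below is an illustration only: CombineSketch and its Status enum are hypothetical, and it mirrors the precedence seen in the code above, where a replication failure overrides a flush failure.

```java
import java.util.concurrent.CompletableFuture;

final class CombineSketch {
    // Hypothetical status values, loosely modeled on PutMessageStatus
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT }

    // The combined future completes only after BOTH inputs have completed.
    static CompletableFuture<Status> combine(CompletableFuture<Status> flush,
                                             CompletableFuture<Status> replica) {
        return flush.thenCombine(replica, (flushStatus, replicaStatus) -> {
            Status finalStatus = Status.PUT_OK;
            if (flushStatus != Status.PUT_OK) {
                finalStatus = Status.FLUSH_DISK_TIMEOUT;
            }
            if (replicaStatus != Status.PUT_OK) {
                finalStatus = replicaStatus; // checked last, so it wins over a flush failure
            }
            return finalStatus;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Status> flush = new CompletableFuture<>();
        CompletableFuture<Status> replica = CompletableFuture.completedFuture(Status.PUT_OK);
        CompletableFuture<Status> both = combine(flush, replica);
        System.out.println("done before flush finishes: " + both.isDone());
        flush.complete(Status.PUT_OK);
        System.out.println("final status: " + both.join());
    }
}
```
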
MappedFile.appendMessagesInner: adding the message to memory

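1. The message may be written to off-heap memory [off-heap memory enabled + asynchronous flush].
2. Otherwise it is written to the mmap file memory.
3. The doAppend method of the AppendMessageCallback writes the message into the buffer [binary byte handling].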

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    // Current write position in this file
    int currentPos = this.wrotePosition.get();
    if (currentPos < this.fileSize) {
        // Which buffer is used depends on the flush mode (sync or async) configured in broker.conf.
        // writeBuffer is available only when async flush and the off-heap memory pool are both enabled
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // Single message
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            // Batch message: append it to the buffer
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // Advance the write pointer
        this.wrotePosition.addAndGet(result.getWroteBytes());
        // Record the time of the write into the buffer
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
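
The slice-and-position technique used above can be shown with a heap buffer standing in for the mmap or off-heap buffer. This is a sketch under assumptions: AppendSketch and its length-prefixed record layout are invented for illustration and are not the commitlog format; returning -1 plays the role of END_OF_FILE, after which the caller would roll over to a new file and append again.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

final class AppendSketch {
    private final ByteBuffer file; // stands in for the mmap or off-heap buffer
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendSketch(int fileSize) {
        this.file = ByteBuffer.allocate(fileSize);
    }

    // Returns the number of bytes written, or -1 when the remaining space is too small.
    int append(byte[] body) {
        int currentPos = wrotePosition.get();
        int needed = Integer.BYTES + body.length; // hypothetical layout: length prefix + body
        if (needed > file.capacity() - currentPos) {
            return -1;
        }
        // slice() shares the content but has its own position, so the
        // original buffer's position is never disturbed by the write
        ByteBuffer slice = file.slice();
        slice.position(currentPos);
        slice.putInt(body.length);
        slice.put(body);
        wrotePosition.addAndGet(needed);
        return needed;
    }

    int getWrotePosition() {
        return wrotePosition.get();
    }

    public static void main(String[] args) {
        AppendSketch mapped = new AppendSketch(16);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println("first append: " + mapped.append(body));
        System.out.println("second append: " + mapped.append(body));
        System.out.println("write position: " + mapped.getWrotePosition());
    }
}
```
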
Asynchronously building the consumequeue and index files

Asynchronous polling loop

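After a message has been written to the commitlog, the related index information is built asynchronously. By default a build pass runs every 1 millisecond.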

public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    //                                     index
    // after the commitlog is written --->
    //                                     consumequeue
    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

doReput: building the indexes and triggering the message-arrival event

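1. Iterate over dispatcherList to build the index files.
2. If long polling is enabled, trigger the message-arrival notification [see the message consumption article].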

private void doReput() {
    // ...... other code omitted
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        // ...... other code omitted
        // Current build progress
        this.reputFromOffset = result.getStartOffset();
        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
            if (size > 0) {
                // Core step: build the consumequeue and index
                DefaultMessageStore.this.doDispatch(dispatchRequest);
                // If the broker allows long polling, signal that a message has arrived
                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                }
                // ..... statistics code omitted
            }
        }
    }
}
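
The shape of doReput, which walks the log from the last processed offset and hands every new entry to each registered dispatcher, can be sketched without any RocketMQ classes. Everything below is hypothetical: ReputSketch, Entry, and the Dispatcher interface only imitate the roles of ReputMessageService, DispatchRequest, and CommitLogDispatcher.

```java
import java.util.ArrayList;
import java.util.List;

final class ReputSketch {
    // Plays the role of CommitLogDispatcher in this sketch
    interface Dispatcher {
        void dispatch(String topic, long commitLogOffset, int size);
    }

    // Hypothetical log entry: only the fields this sketch needs
    static final class Entry {
        final String topic;
        final int size;

        Entry(String topic, int size) {
            this.topic = topic;
            this.size = size;
        }
    }

    private final List<Dispatcher> dispatchers = new ArrayList<>();
    private long reputFromOffset = 0; // byte offset of the next entry to dispatch
    private int nextIndex = 0;        // position of that entry in the list

    void addDispatcher(Dispatcher dispatcher) {
        dispatchers.add(dispatcher);
    }

    // One pass: dispatch everything appended since the previous pass.
    int doReput(List<Entry> log) {
        int dispatched = 0;
        while (nextIndex < log.size()) {
            Entry entry = log.get(nextIndex);
            for (Dispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(entry.topic, reputFromOffset, entry.size);
            }
            reputFromOffset += entry.size;
            nextIndex++;
            dispatched++;
        }
        return dispatched;
    }

    long getReputFromOffset() {
        return reputFromOffset;
    }

    public static void main(String[] args) {
        ReputSketch reput = new ReputSketch();
        reput.addDispatcher((topic, offset, size) -> System.out.println("consumequeue <- " + topic + "@" + offset));
        reput.addDispatcher((topic, offset, size) -> System.out.println("index <- " + topic + "@" + offset));
        List<Entry> log = new ArrayList<>();
        log.add(new Entry("OrderTopic", 100));
        log.add(new Entry("PayTopic", 50));
        reput.doReput(log);
    }
}
```

Keeping the progress offset inside the service is what lets each pass resume where the previous one stopped, so a pass with nothing new to read does no work.
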
CommitLogDispatcherBuildConsumeQueue.dispatch: building the consume queue

The write is done by putMessagePositionInfo.

    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // Core step: the actual write into the consumequeue file happens here
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

1. Initialize the ConsumeQueue.
2. Put the commitlog position information into the ConsumeQueue.

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // Find the ConsumeQueue for this topic and queue, initializing it if needed
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // Put the commitlog position information into the ConsumeQueue
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}
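
To make "position information" concrete: as far as I know, each ConsumeQueue entry is a fixed 20-byte unit holding the commitlog offset (8 bytes), the message size (4 bytes), and the tags hash code (8 bytes). The article itself does not state this layout, so the sketch below treats it as an assumption, and ConsumeQueueUnitSketch is a hypothetical class.

```java
import java.nio.ByteBuffer;

final class ConsumeQueueUnitSketch {
    // Assumed fixed unit size: 8 (offset) + 4 (size) + 8 (tags code)
    static final int UNIT_SIZE = 20;

    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buffer = ByteBuffer.allocate(UNIT_SIZE);
        buffer.putLong(commitLogOffset);
        buffer.putInt(size);
        buffer.putLong(tagsCode);
        return buffer.array();
    }

    static long decodeCommitLogOffset(byte[] unit) {
        return ByteBuffer.wrap(unit).getLong(0);
    }

    static int decodeSize(byte[] unit) {
        return ByteBuffer.wrap(unit).getInt(8);
    }

    static long decodeTagsCode(byte[] unit) {
        return ByteBuffer.wrap(unit).getLong(12);
    }

    // Fixed-size units make the lookup a multiplication: logical offset n starts at byte n * UNIT_SIZE.
    static long filePosition(long consumeQueueOffset) {
        return consumeQueueOffset * UNIT_SIZE;
    }

    public static void main(String[] args) {
        byte[] unit = encode(1024L, 200, "TagA".hashCode());
        System.out.println("unit length: " + unit.length);
        System.out.println("commitlog offset: " + decodeCommitLogOffset(unit));
        System.out.println("message size: " + decodeSize(unit));
    }
}
```
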
CommitLogDispatcherBuildIndex.dispatch: building the index file

The build is done by indexService.

  class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }

Note that both the msgId (the unique key) and the message keys are added to the indexFile.

public void buildIndex(DispatchRequest req) {
    // Get the index file; each one is backed by a MappedFile
    IndexFile indexFile = retryGetAndCreateIndexFile();
    // ...... other code omitted
    // Unique message id
    if (req.getUniqKey() != null) {
        // Put the unique key into the indexFile
        indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
        if (indexFile == null) {
            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
            return;
        }
    }
    // Add the message keys to the index file
    if (keys != null && keys.length() > 0) {
        String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
        for (int i = 0; i < keyset.length; i++) {
            String key = keyset[i];
            if (key.length() > 0) {
                indexFile = putKey(indexFile, msg, buildKey(topic, key));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }
        }
    }
}
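
The key handling in buildIndex can be sketched as follows. Several details here are assumptions not shown in the article: that buildKey joins topic and key with "#", that MessageConst.KEY_SEPARATOR is a single space, and that the index file picks a hash slot from the absolute hash code modulo the slot count. IndexKeySketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class IndexKeySketch {
    static final String KEY_SEPARATOR = " "; // assumed value of MessageConst.KEY_SEPARATOR

    // Assumed format: the topic scopes the key so equal keys in different topics do not collide
    static String buildKey(String topic, String key) {
        return topic + "#" + key;
    }

    // Assumed slot selection: non-negative hash modulo the number of slots
    static int slotOf(String indexKey, int hashSlotNum) {
        int hash = Math.abs(indexKey.hashCode());
        if (hash < 0) {
            hash = 0; // Math.abs(Integer.MIN_VALUE) is still negative
        }
        return hash % hashSlotNum;
    }

    // Collect every index key for one message: the unique id first, then each user key.
    static List<String> indexKeysFor(String topic, String uniqKey, String keys) {
        List<String> result = new ArrayList<>();
        if (uniqKey != null) {
            result.add(buildKey(topic, uniqKey));
        }
        if (keys != null && keys.length() > 0) {
            for (String key : keys.split(KEY_SEPARATOR)) {
                if (key.length() > 0) { // skip empty tokens produced by repeated separators
                    result.add(buildKey(topic, key));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (String indexKey : indexKeysFor("OrderTopic", "ID-1", "order42 user7")) {
            System.out.println(indexKey + " -> slot " + slotOf(indexKey, 1000));
        }
    }
}
```

A message with one unique id and two keys therefore produces three index entries, which is why a lookup by either the msgId or any key can find it.
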
Summary

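1. The message is written to a buffer. A message that does not fit in the current file needs two writes: one attempt on the old file and one on the new file.
2. ReputMessageService asynchronously builds the indexFile and consumeQueue entries in memory.
3. Note that both the msgId and the message keys are added to the indexFile.
4. Other topics are involved as well, such as mmap sequential writes and mlock warm-up, the characteristics of enabling off-heap memory, the long-polling mechanism, the commitlog data format, flushing to disk, and the details of queue name conversion.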

Original article: http://outofmemory.cn/zaji/5715480.html
