消息顺序写入缓存
消息存储——asyncPutMessage;commitLog.asyncPutMessage;MappedFile.appendMessagesInner 添加消息到内存;异步构建 consumequeue 和 index 文件
异步自旋处理;doReput——构建索引以及触发消息到达事件
CommitLogDispatcherBuildConsumeQueue.dispatch 构建消费队列;CommitLogDispatcherBuildIndex.dispatch 构建索引;总结
消息顺序写入缓存 消息存储——asyncPutMessage:commitLog 完成消息存储;storeStatsService 完成写入耗时与失败次数统计
public CompletableFuturecommitLog.asyncPutMessageasyncPutMessage(MessageExtBrokerInner msg) { 储存检查 PutMessageStatus checkStoreStatus = this.checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } 消息检查 PutMessageStatus msgCheckStatus = this.checkMessage(msg); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } long beginTime = this.getSystemClock().now(); commitlog完成文件存储[内存存储,并无刷盘] CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg); putResultFuture.thenAccept((result) -> { long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } }); return putResultFuture; }
定时消息则 topic 替换成定时消息 topic;定时消息队列处理:queueId = DelayTimeLevel - 1(延迟级别 [1~18] 对应队列 [0~17]);Backup real topic, queueId:原来的 topic 可能是业务 topic,也有可能是和消费者组挂钩的消费失败 topic;追加消息到 commitlog 文件对应的内存;当前文件空间不足则新建文件,并追加消息到新的 mmap 内存;统计;刷盘;主从复制
public CompletableFutureMappedFile.appendMessagesInner添加消息到内存asyncPutMessage(final MessageExtBrokerInner msg) { // 设置存储时间 msg.setStoreTimestamp(System.currentTimeMillis()); // 设置消息体crc msg.setBodyCRC(UtilAll.crc32(msg.getBody())); AppendMessageResult result = null; // 获取统计服务 StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); String topic = msg.getTopic(); int queueId = msg.getQueueId(); // final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 定时消息 则topic替换成定时消息topic topic = ScheduleMessageService.SCHEDULE_TOPIC; // queueID = DelayTimeLevel - 1 [1~18] [0~17] queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId 原来的topic可能是业务topic也有可能是和消费者组挂钩的消费失败topic MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; // Here settings are stored timestamp, in order to ensure an orderly // global msg.setStoreTimestamp(beginLockTimestamp); if (null == mappedFile || mappedFile.isFull()) { mappedFile = 
this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } // 追加消息到commitlog的文件对应内存 result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: // 当前文件不足则新建文件 unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } // 消息刷到内存 result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); case UNKNOWN_ERROR: beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); default: beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); } elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { putMessageLock.unlock(); } if (elapsedTimeInLock > 500) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result); } if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { 
this.defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // t统计 storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); // 刷盘 CompletableFuture flushResultFuture = submitFlushRequest(result, putMessageResult, msg); // 主从复制 CompletableFuture replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg); // 两者都完成后才会返回结果 return flushResultFuture.thenCombine(replicaResultFuture, new BiFunction () { @Override public PutMessageResult apply(PutMessageStatus flushStatus, PutMessageStatus replicaStatus) { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; } }); }
可能添加到堆外内存[开启堆外内存+异步刷盘];也可能添加到 mmap 文件内存;通过 AppendMessageCallback.doAppend 将消息添加到 ByteBuffer 缓存区[二进制字节处理]
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; 获取当前文件写入位置 int currentPos = this.wrotePosition.get(); if (currentPos < this.fileSize) { buffer和同步刷盘还是异步刷盘机制相关 broker.conf配置 writeBuffer只有异步刷盘外加堆外内存机制开启才可使用 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; 单独消息 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { 批量消息添加到缓存 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } 处理写完后的指针 this.wrotePosition.addAndGet(result.getWroteBytes()); 处理写入mmap的时间点 this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }异步构建consumequeue和index文件 异步自旋处理
当 commitlog 写入并刷盘后,会异步构建相关索引信息;默认每 1 毫秒构建一次
public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); // index // 写完commitlog后---> // consumequeue // while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); }doReput一构建索引以及触发消息到达事件
遍历 dispatcherList 构建索引文件;若开启长轮询机制,则触发消息到达事件[参见消息消费]
private void doReput() { ...... 删除其他代码 for (boolean donext = true; this.isCommitLogAvailable() && doNext; ) { ...... 删除其他代码 当前的构建进度 this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { if (size > 0) { 核心构建consumequeue和index DefaultMessageStore.this.doDispatch(dispatchRequest); 如果broker允许长轮询则触发消息到达 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } ..... 删除统计代码 } } } }CommitLogDispatcherBuildConsumeQueue.dispatch构建消费队列
通过putMessagePositionInfo完成写入
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 构建核心 写入 consumequeue文件,真正的写入到 ConsumeQueue 逻辑如下。 DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } }
初始化 ConsumeQueue;将 commitlog 的信息放置到 consumeQueue
public void putMessagePositionInfo(DispatchRequest dispatchRequest) { 初始化ConsumeQueue ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); 将commitlog的信息放置consumeQueue cq.putMessagePositionInfoWrapper(dispatchRequest); }CommitLogDispatcherBuildIndex.dispatch构建消费队列
通过indexservice完成构建
/**
 * Commit-log dispatcher that feeds the index service.
 *
 * <p>The request is forwarded to {@code indexService.buildIndex} only when the
 * store configuration reports that message indexing is enabled.
 */
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    /**
     * Builds index entries for the given request if indexing is switched on.
     *
     * @param request the dispatch request to index
     */
    @Override
    public void dispatch(DispatchRequest request) {
        final boolean indexingEnabled =
            DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable();
        if (!indexingEnabled) {
            return;
        }
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}
需要注意msgId和消息key都加入了indexFile
public void buildIndex(DispatchRequest req) { // 获取索引文件 对应一个mapperfile IndexFile indexFile = retryGetAndCreateIndexFile(); ...... 删除其他代码 消息唯一id if (req.getUniqKey() != null) { // 将 getUniqKey 放到 indexFile indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey())); if (indexFile == null) { log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey()); return; } } 消息key加入索引文件 if (keys != null && keys.length() > 0) { String[] keyset = keys.split(MessageConst.KEY_SEPARATOR); for (int i = 0; i < keyset.length; i++) { String key = keyset[i]; if (key.length() > 0) { indexFile = putKey(indexFile, msg, buildKey(topic, key)); if (indexFile == null) { log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey()); return; } } } } }总结
消息写入缓冲区,消息跨文件时需要两次写入缓冲区;ReputMessageService 异步构建消息到 indexFile 和 consumeQueue 内存;需要注意 msgId 和 msg key 都加入了 indexFile;还涉及其他知识点,比如 mmap 顺序写与 mlock 预热、堆外内存开启的特点、消息长轮询机制、commitlog 数据协议、消息刷盘、队列名称的转换细节等
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)