- 消息存储机制
- 1.前言
- 2.核心存储类:DefaultMessageStore
- 3.消息存储流程
- 4.消息存储文件
- 5.存储文件内存映射
- 5.1.MappedFileQueue
- 5.2.MappedFile
- 5.2.1.commit
- 5.2.2.flush
- 5.3.TransientStorePool
- 6.刷盘机制
- 6.1.同步刷盘
- 6.2.异步刷盘
本文主要讲解内容是Broker接收到消息生产者发送的消息之后,如何将消息持久化存储在Broker中。
2.核心存储类:DefaultMessageStore
private final MessageStoreConfig messageStoreConfig; //消息配置属性 private final CommitLog commitLog; //CommitLog文件存储的实现类->消息存储在commitLog中 private final ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> consumeQueueTable; //消息队列存储缓存表,按照消息主题分组 private final FlushConsumeQueueService flushConsumeQueueService; //消息队列文件刷盘服务线程 private final CleanCommitLogService cleanCommitLogService; //过期CommitLog文件删除服务 private final CleanConsumeQueueService cleanConsumeQueueService; //过期ConsumerQueue队列文件删除服务 private final IndexService indexService; //索引服务 private final AllocateMappedFileService allocateMappedFileService; //MappedFile分配服务->内存映射处理commitLog、consumerQueue文件 private final ReputMessageService reputMessageService;//CommitLog消息分发,根据CommitLog文件构建ConsumerQueue、IndexFile文件 private final HAService haService; //消息主从同步实现服务 private final ScheduleMessageService scheduleMessageService; //消息服务调度服务 private final StoreStatsService storeStatsService; //消息存储服务 private final MessageArrivingListener messageArrivingListener; //消息到达监听器 private final TransientStorePool transientStorePool; //消息堆外内存缓存 private final BrokerStatsManager brokerStatsManager; //Broker状态管理器 private final MessageArrivingListener messageArrivingListener; //消息拉取长轮询模式消息到达监听器 private final BrokerConfig brokerConfig; //Broker配置类 private StoreCheckpoint storeCheckpoint; //文件刷盘监测点 private final LinkedList<CommitLogDispatcher> dispatcherList; //CommitLog文件转发请求
以上属性是消息存储的核心,需要重点关注每个属性的具体作用。
3.消息存储流程
消息存储时序图如下:
消息存储入口:DefaultMessageStore#putMessage
//检查Broker是否是Slave || 判断当前写入状态如果是正在写入,则不能继续 PutMessageStatus checkStoreStatus = this.checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } //检查消息主题和消息体长度是否合法 PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } //记录开始写入时间 long beginTime = this.getSystemClock().now(); //写入消息 CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch); resultFuture.thenAccept((result) -> { long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); } //记录相关统计信息 this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); //存储失败 if (null == result || !result.isOk()) { //存储状态服务->消息存储失败次数自增 this.storeStatsService.getPutMessageFailedTimes().add(1); } }); return resultFuture;
DefaultMessageStore#checkStoreStatus
//存储服务已停止 if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return PutMessageStatus.SERVICE_NOT_AVAILABLE; } //Broker为Slave->不可写入 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("broke role is slave, so putMessage is forbidden"); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } //不可写入->broker磁盘已满/写入逻辑队列错误/写入索引文件错误 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("the message store is not writable. It may be caused by one of the following reasons: " + "the broker's disk is full, write to logic queue error, write to index file error, etc"); } return PutMessageStatus.SERVICE_NOT_AVAILABLE; } else { this.printTimes.set(0); } //操作系统页写入是否繁忙 if (this.isOSPageCacheBusy()) { return PutMessageStatus.OS_PAGECACHE_BUSY; } return PutMessageStatus.PUT_OK;
CommitLog#asyncPutMessages
//记录消息存储时间 messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); AppendMessageResult result; StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); //消息类型是否合法 if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); } //.... //获取上一个MapperFile对象->内存映射的具体实现 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); //追加消息需要加锁->串行化处理 putMessageLock.lock(); try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; //记录消息存储时间->保证消息的有序性 messageExtBatch.setStoreTimestamp(beginLockTimestamp); //判断如果mappedFile如果为空或者已满,创建新的mappedFile文件 if (null == mappedFile || mappedFile.isFull()) { mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } //如果创建失败,直接返回 if (null == mappedFile) { log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } //!!!写入消息到mappedFile中!!! 
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); //根据写入结果做不同的处理 switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); case UNKNOWN_ERROR: default: beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); } elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { putMessageLock.unlock(); } if (elapsedTimeInLock > 500) { log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result); } if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { this.defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum()); storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes()); //根据刷盘策略进行刷盘 CompletableFutureflushOKFuture = submitFlushRequest(result, 
messageExtBatch); //主从同步 CompletableFuture replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
MappedFile#appendMessagesInner
assert messageExt != null; assert cb != null; //获取写指针/写入位置 int currentPos = this.wrotePosition.get(); //写指针偏移量小于文件指定大小 if (currentPos < this.fileSize) { //写入缓冲区 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; //根据消息类型->批量/单个->进行不同处理 if (messageExt instanceof MessageExtBrokerInner) { //单个消息 //调用回调方法写入磁盘->CommitLog#doAppend result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt, putMessageContext); } else if (messageExt instanceof MessageExtBatch) { //批量消息 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt, putMessageContext); } else { //未知消息->返回异常结果 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } //更新写指针 this.wrotePosition.addAndGet(result.getWroteBytes()); //更新写入时间戳 this.storeTimestamp = result.getStoreTimestamp(); //返回写入结果->成功 return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
CommitLog#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, //文件序列偏移量 final ByteBuffer byteBuffer, //NIO字节容器 final int maxBlank, //最大可写入字节数 final MessageExtBrokerInner msgInner, //消息封装实体 PutMessageContext putMessageContext) { //文件写入偏移量 long wroteOffset = fileFromOffset + byteBuffer.position(); //构建msgId SuppliermsgIdSupplier = () -> { //系统标识 int sysflag = msgInner.getSysFlag(); //msgId底层存储由16个字节组成 int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; //分配16个字节的存储空间 ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); //8个字节->ip、host各占用4个字节 MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer); //清除缓冲区->因为接下来需要翻转缓冲区 msgIdBuffer.clear(); //剩下的8个字节用来存储commitLog偏移量-wroteOffset msgIdBuffer.putLong(msgIdLen - 8, wroteOffset); return UtilAll.bytes2string(msgIdBuffer.array()); }; //获取当前主题消息队列唯一key String key = putMessageContext.getTopicQueueTableKey(); //根据key获取消息存储偏移量 Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); } // Transaction messages that require special handling final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queuec case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default: break; } ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); //计算消息存储长度 final int msgLen = preEncodeBuffer.getInt(0); // Determines whether there is sufficient free space //消息是如果没有足够的存储空间则新创建CommitLog文件 if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.msgStoreItemMemory.clear(); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The 
remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } int pos = 4 + 4 + 4 + 4 + 4; // 6 QUEUEOFFSET preEncodeBuffer.putLong(pos, queueOffset); pos += 8; // 7 PHYSICALOFFSET preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position()); int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP pos += 8 + 4 + 8 + ipLen; // refresh store time stamp in lock preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer //将消息存储到byteBuffer中 byteBuffer.put(preEncodeBuffer); msgInner.setEncodedBuff(null); //返回AppendMessageResult AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); break; default: break; } return result; }
AppendMessageResult
public class AppendMessageResult { private AppendMessageStatus status; //消息追加结果 private long wroteOffset; //消息写入偏移量 private int wroteBytes; //消息待写入字节 private String msgId; //消息ID private SuppliermsgIdSupplier; //消息ID private long storeTimestamp; //消息写入时间戳 private long logicsOffset; //消息队列偏移量 private long pagecacheRT = 0; //消息开始写入时间戳 }
返回消息写入结果,回到CommitLog#asyncPutMessages
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); switch (result.getStatus()) { case PUT_OK: break; } //释放锁 putMessageLock.unlock(); //存储数据统计 storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum()); storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes()); //根据刷盘策略进行刷盘 CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch); //消息主从同步 CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
4.消息存储文件
- commitLog:消息存储目录
- config:配置信息
- consumerqueue:消息队列存储目录
- index:消息索引文件存储目录
- abort:Broker异常关闭时信息记录
- checkpoint:文件监测点,存储commitlog、consumerqueue、index文件最后一次刷盘时间戳。
RocketMQ通过使用内存映射文件提高IO访问性能,无论是CommitLog、ConsumerQueue还是IndexFile,单个文件都被设计为固定长度。
如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量,如下图所示。
5.1.MappedFileQueue
//存储目录 private final String storePath; //单个文件大小 protected final int mappedFileSize; //MappedFile文件集合 protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>(); //映射文件MappedFile分配服务线程 private final AllocateMappedFileService allocateMappedFileService; //刷盘指针 protected long flushedWhere = 0; //当前数据提交指针 private long committedWhere = 0;
根据存储时间获取对应的MappedFile
public MappedFile getMappedFileByTime(final long timestamp) { //拷贝映射文件 Object[] mfs = this.copyMappedFiles(0); if (null == mfs) { return null; } //遍历映射文件数组 for (int i = 0; i < mfs.length; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; //MappedFile的最后修改时间大于指定时间戳->返回该文件 if (mappedFile.getLastModifiedTimestamp() >= timestamp) { return mappedFile; } } return (MappedFile) mfs[mfs.length - 1]; }
根据消息存储偏移量查找MappedFile
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { //分别获取第一个和最后一个映射文件 MappedFile firstMappedFile = this.getFirstMappedFile(); MappedFile lastMappedFile = this.getLastMappedFile(); //第一个文件和最后一个文件均不为空,则进行处理 if (firstMappedFile != null && lastMappedFile != null) { if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, this.mappedFiles.size()); } else { //获得文件索引 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); //目标映射文件 MappedFile targetFile = null; try { //根据文件索引查找目标文件 targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } //对获取到的映射文件进行检查-判空-偏移量是否合法 if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } //继续选择映射文件 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } } } //返回第一个映射文件 if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); } return null; }
获取存储文件最小偏移量
public long getMinOffset() { if (!this.mappedFiles.isEmpty()) { try { return this.mappedFiles.get(0).getFileFromOffset(); } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error("getMinOffset has exception.", e); } } return -1; }
获取存储文件最大偏移量
public long getMaxOffset() { //最后一个映射文件 MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { return mappedFile.getFileFromOffset() + mappedFile.getReadPosition(); } return 0; }
获取存储文件当前写指针
public long getMaxWrotePosition() { MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); } return 0; }
5.2.MappedFile
//操作系统每页刷写大小,默认4K public static final int OS_PAGE_SIZE = 1024 * 4; //当前JVM实例中MappedFile虚拟内存 private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0); //当前JVM实例中MappedFile对象个数 private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0); //当前文件的写指针 protected final AtomicInteger wrotePosition = new AtomicInteger(0); //当前文件的提交指针 protected final AtomicInteger committedPosition = new AtomicInteger(0); //刷盘指针 private final AtomicInteger flushedPosition = new AtomicInteger(0); //文件大小 protected int fileSize; //文件通道 protected FileChannel fileChannel; protected ByteBuffer writeBuffer = null; //堆外内存池 protected TransientStorePool transientStorePool = null; //文件名称 private String fileName; //该文件的处理偏移量 private long fileFromOffset; //物理文件 private File file; //文件映射缓冲区 private MappedByteBuffer mappedByteBuffer; //存储时间戳 private volatile long storeTimestamp = 0; //是否是初次创建 private boolean firstCreateInQueue = false;
MappedFile初始化
private void init(final String fileName, final int fileSize) throws IOException { this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; //确保文件目录正确 ensureDirOK(this.file.getParent()); try { this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("Failed to create file " + this.fileName, e); throw e; } catch (IOException e) { log.error("Failed to map file " + this.fileName, e); throw e; } finally { if (!ok && this.fileChannel != null) { this.fileChannel.close(); } } }
值得注意的是MappedFile还有一个属性值transientStorePoolEnable,当这个属性值为true时,数据会先存储到堆外内存,然后通过commit线程将数据提交到内存映射buffer中,最后通过flush线程将内存映射刷写到磁盘中。
开启transientStorePoolEnable
public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize); //初始化堆外内存缓冲区 this.writeBuffer = transientStorePool.borrowBuffer(); this.transientStorePool = transientStorePool; }
5.2.1.commit
刷盘文件提交流程大致如下:
DefaultMessageStore#flush→CommitLog→MappedFileQueue→MappedFile
//DefaultMessageStore public long flush() { return this.commitLog.flush(); } //CommitLog public long flush() { //----------↓----------- this.mappedFileQueue.commit(0); this.mappedFileQueue.flush(0); return this.mappedFileQueue.getFlushedWhere(); } //MappedFileQueue public boolean commit(final int commitLeastPages) { boolean result = true; MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); if (mappedFile != null) { //----------↓----------- int offset = mappedFile.commit(commitLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.committedWhere; this.committedWhere = where; } return result; }
最后进入MappedFile进行数据刷写提交:
MappedFile#commit
public int commit(final int commitLeastPages) { //如果为空->说明没有开启transientStorePoolEnable->无需向文件通道fileChannel提交数据 //将wrotePosition视为committedPosition并返回->然后直接进行flush *** 作 if (writeBuffer == null) { return this.wrotePosition.get(); } //提交数据页数大于commitLeastPages if (this.isAbleToCommit(commitLeastPages)) { //MappedFile是否被销毁 //hold()->isAvailable()->MappedFile.available<属性继承于ReferenceResource> //文件如何被摧毁可见下文中的shutdown() if (this.hold()) { //--↓-- commit0(); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. // 所有数据提交后,清空缓冲区 if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } return this.committedPosition.get(); }
MappedFile#isAbleToCommit
//已提交刷盘的指针 int flush = this.committedPosition.get(); //文件写指针 int write = this.wrotePosition.get(); //刷盘已写满 if (this.isFull()) { return true; } if (commitLeastPages > 0) { //文件内容达到commitLeastPages->进行刷盘 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; } return write > flush;
MappedFile#commit0
//写指针 int writePos = this.wrotePosition.get(); //上次提交指针 int lastCommittedPosition = this.committedPosition.get(); //写指针一定要大于上次提交指针 if (writePos - lastCommittedPosition > 0) { try { //复制共享内存区域 ByteBuffer byteBuffer = writeBuffer.slice(); //设置提交位置是上次提交位置 byteBuffer.position(lastCommittedPosition); //最大提交数量 byteBuffer.limit(writePos); //设置fileChannel位置是上次提交位置 this.fileChannel.position(lastCommittedPosition); //将lastCommittedPosition到writePos的数据复制到FileChannel中 this.fileChannel.write(byteBuffer); //重置提交位置为writePos->以此反复避免提交重复数据 this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } }
5.2.2.flush
刷写磁盘,直接调用MappedByteBuffer或fileChannel的force方法将内存中的数据持久化到磁盘,那么flushedPosition应该等于MappedByteBuffer中的写指针;
- 如果writeBuffer不为空,则flushPosition应该等于上一次的commit指针;因为上一次提交的数据就是进入到MappedByteBuffer中的数据;
- 如果writeBuffer为空,数据是直接进入到MappedByteBuffer,wrotePosition代表的是MappedByteBuffer中的指针,故设置flushPosition为wrotePosition。
提交数据到fileChannel后开始刷盘,步骤如下:
CommitLog#flush→MappedFileQueue#flush→MappedFile#flush
MappedFile#flush
//达到刷盘条件 if (this.isAbleToFlush(flushLeastPages)) { //加锁,同步刷盘 if (this.hold()) { //读指针 int value = getReadPosition(); try { //开启TransientStorePool->fileChannel //关闭TransientStorePool->mappedByteBuffer //We only append data to fileChannel or mappedByteBuffer, never both. //数据从writeBuffer提交数据到fileChannel->force if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } //数据直接传到mappedByteBuffer->force else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } //更新刷盘位置 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition();
MappedFile#getReadPosition
public int getReadPosition() { //如果writeBuffer为空直接返回当前的写指针,否则返回上次提交的指针 //在MappedFile中,只有提交了的数据(写入到MappedByteBuffer或FileChannel中的数据)才是安全的数据 return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); }
MappedFile#shutdown
MappedFile文件销毁的入口方法为public boolean destroy(final long intervalForcibly),其内部调用继承自ReferenceResource的shutdown(final long intervalForcibly)(即下面的代码);intervalForcibly表示拒绝被销毁的最大存活时间。
if (this.available) { //关闭MappedFile this.available = false; //设置关闭时间戳 this.firstShutdownTimestamp = System.currentTimeMillis(); //释放资源 this.release(); } else if (this.getRefCount() > 0) { if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { this.refCount.set(-1000 - this.getRefCount()); this.release(); } }
5.3.TransientStorePool
用于短时间存储数据的存储池。RocketMQ单独创建ByteBuffer堆外内存缓冲区,用来临时存储数据,数据先写入该堆外内存缓冲区,然后由commit线程将数据复制到目标物理文件所对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被操作系统交换到磁盘。
private final int poolSize; //availableBuffers个数 private final int fileSize; //每个ByteBuffer大小 private final Deque<ByteBuffer> availableBuffers; //双端队列-存储可用缓冲区的容器 private final MessageStoreConfig storeConfig; //消息存储配置
初始化:
public void init() { //创建poolSize个堆外内存区 for (int i = 0; i < poolSize; i++) { //分配内存 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize); //内存地址 final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer(address); //使用com.sun.jna.Library类库将该批内存锁定,避免被置换到交换区,提高存储性能 LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); availableBuffers.offer(byteBuffer); } }
6.刷盘机制
6.1.同步刷盘
CommitLog#submitFlushRequest
//同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { //刷写CommitLog服务线程 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; //需要等待消息存储结果 if (messageExt.isWaitStoreMsgOK()) { //封装刷盘请求 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); //将request放入刷写磁盘服务线程中 //--------↓-------- service.putRequest(request); //等待写入结果返回 return request.future(); } else { //唤醒同步刷盘线程 service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } else { //异步刷盘.... }
GroupCommitRequest
public static class GroupCommitRequest { private final long nextOffset; private CompletableFutureflushOKFuture = new CompletableFuture<>(); private final long startTimestamp = System.currentTimeMillis(); private long timeoutMillis = Long.MAX_VALUE; }
GroupCommitService
class GroupCommitService extends FlushCommitLogService { //分别存储写请求和读请求的容器 private volatile linkedListrequestsWrite = new linkedList (); private volatile linkedList requestsRead = new linkedList (); //消息存储自旋锁-保护以上容器线程安全 private final PutMessageSpinLock lock = new PutMessageSpinLock(); }
GroupCommitService#putRequest
//加上自旋锁 lock.lock(); try { //将写请求放入容器 this.requestsWrite.add(request); } finally { lock.unlock(); } //唤醒线程 this.wakeup();
GroupCommitService#run
CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { //等待线程10ms this.waitForRunning(10); //执行提交任务 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end");
GroupCommitService#doCommit
if (!this.requestsRead.isEmpty()) { //遍历requestsRead for (GroupCommitRequest req : this.requestsRead) { //刷盘后指针位置大于请求指针偏移量则代表已经刷盘成功 //下一个文件中可能有消息,所以最多两次flush boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); for (int i = 0; i < 2 && !flushOK; i++) { CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } //唤醒发送消息客户端 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } //更新刷盘监测点 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } //清空任务容器 this.requestsRead = new LinkedList<>(); } else { //因为个别消息设置为异步flush,所以会走到这个过程 CommitLog.this.mappedFileQueue.flush(0); }
6.2.异步刷盘
在消息追加到内存后,立即返回给消息发送端。如果开启transientStorePoolEnable,RocketMQ会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。
开启transientStorePoolEnable后异步刷盘步骤:
- 将消息直接追加到ByteBuffer堆外内存
- CommitRealTimeService线程每隔200ms将ByteBuffer中的消息提交到fileChannel
- commit操作成功,将committedPosition向后移动
- FlushRealTimeService线程每隔500ms将fileChannel的数据刷写到磁盘
// Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { ... } // Asynchronous flush else { //未开启TransientStorePoolEnable if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { //唤醒flushCommitLogService服务线程 flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); }
CommitRealTimeService#run
提交线程工作机制:
//间隔时间:200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); //一次提交的最少页数:4 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); //两次提交的最大间隔:200ms int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); //上次提交间隔超过commitDataThoroughInterval,则忽略提交commitDataLeastPages参数,直接提交 long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; //忽略提交页数要求 commitDataLeastPages = 0; } try { //执行提交操作,将待提交数据提交到物理文件的内存映射区并返回提交结果 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); //提交成功 if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. //唤醒刷盘线程FlushRealTimeService(FlushCommitLogService的子类) flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } }
FlushCommitLogService#run
刷盘线程工作机制:
//线程不停止 while (!this.isStopped()) { //线程执行间隔:500ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); //一次刷盘任务最少包含页数:4 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); //两次刷盘任务最大间隔:10s int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress long currentTimeMillis = System.currentTimeMillis(); //如果当前时间戳大于上次刷盘时间+最大刷盘任务间隔 则本次刷盘任务忽略flushPhysicQueueLeastPages(设置为0) 直接提交刷盘任务 if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } try { if (flushCommitLogTimed) { //线程执行间隔-500m Thread.sleep(interval); } else { this.waitForRunning(interval); } if (printFlushProgress) { this.printFlushProgress(); } long begin = System.currentTimeMillis(); //刷写磁盘 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); //更新存储监测点文件的时间戳 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } }
本文仅作为个人学习使用,如有不足或错误请指正!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)