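Contents
Schematic: obtaining a mapped file
Source analysis: creating a new file
Source analysis: getLastMappedFile
Source analysis: putRequestAndReturnMappedFile
Source analysis: AllocateMappedFileService.run
Source analysis: mmapOperation
Source analysis: warmMappedFile
mlock
Summary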
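
Schematic: obtaining a mapped file

1. When the current file is full, or its remaining space is smaller than the length of the current message, a new commitlog file [1 GB] has to be created.
2. The file is created through allocateMappedFileService.
3. Two requests are built each time [two commitlog files are created] and added to the task pool.
4. The thread that appends messages blocks on the request in the task pool [using a thread latch].
5. allocateMappedFileService takes tasks from the pool asynchronously and in order.
6. The file is created.
7. The file is filled with dummy values to warm it up [loading it from disk into memory].
8. The memory is locked [so that swapping cannot cause page faults].
9. Once creation is finished, CountDownLatch.countDown() wakes the thread that appends messages.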
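The handoff in steps 3 to 9 can be sketched in a few lines of plain Java. This is a minimal illustration, not the RocketMQ implementation: the class LatchHandoffSketch, its Request type, and the string that stands in for the created file are all hypothetical, and a plain FIFO queue is used in place of the real task pool.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the appender/allocator handoff; not RocketMQ code. */
final class LatchHandoffSketch {
    /** One file-creation request; the latch is released when the file is ready. */
    static final class Request {
        final String fileName;
        final CountDownLatch done = new CountDownLatch(1);
        volatile String result; // stands in for the created mapped file

        Request(String fileName) {
            this.fileName = fileName;
        }
    }

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    /** Starts a daemon worker that handles one request at a time, in order. */
    void startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request req = queue.take(); // blocks while the queue is empty
                    // Creation, warm-up and memory locking would happen here.
                    req.result = "created:" + req.fileName;
                    req.done.countDown(); // wake the waiting appender
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "allocator-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    /** Submits two requests but waits only for the first; returns null on timeout. */
    String createAndWait(String next, String nextNext, long timeoutMillis) {
        Request first = new Request(next);
        Request second = new Request(nextNext);
        queue.offer(first);
        queue.offer(second); // created in the background for the next caller
        try {
            boolean ok = first.done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return ok ? first.result : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        LatchHandoffSketch sketch = new LatchHandoffSketch();
        sketch.startWorker();
        System.out.println(sketch.createAndWait("file-0", "file-1", 5000));
    }
}
```

The appender pays only for the first file; the second request keeps running after createAndWait returns, which is the point of submitting two at once.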
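
Source analysis: creating a new file

commitlog.asyncPutMessage, which appends a message to the buffer, is the entry point for file creation.

step-1: The buffer is full
step-2: Append the message to the memory backing the commitlog file
step-3: If the current file has too little room, create a new file and then add the message to the buffer

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // step-1: the buffer is full
        if (null == mappedFile || mappedFile.isFull()) {
            // obtain a new mappedFile
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        // append the message to the memory backing the commitlog file
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                // step-2: the message was written to the buffer normally
                break;
            case END_OF_FILE:
                // step-3: the current file has too little room, so create a new one
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                // ... other code omitted
                // write the message into memory
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            // ... other code omitted
            default:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }
    }

Source analysis: getLastMappedFile
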
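step-1: Asynchronous creation: build two file names
step-2: How commitlog files are created: each commitlog file is 1 GB, and two are created asynchronously at a time
step-3: Synchronous creation: how consumequeue and index files are created, by constructing a single MappedFile directly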
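The two file names in step-1 come from consecutive offsets. The sketch below shows the arithmetic; it assumes that UtilAll.offset2FileName produces a 20-digit, zero-padded decimal offset, and the class and method names (CommitLogNameSketch, offsetToFileName, nextTwoFileNames) are hypothetical stand-ins rather than RocketMQ APIs.

```java
import java.util.Locale;

/** Hypothetical sketch of commitlog file naming; not RocketMQ code. */
final class CommitLogNameSketch {
    /** 1 GB, the commitlog file size quoted in the article. */
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    /** Assumed naming scheme: the start offset, zero-padded to 20 digits. */
    static String offsetToFileName(long offset) {
        return String.format(Locale.ROOT, "%020d", offset);
    }

    /** Names of the file needed now and of the one pre-created after it. */
    static String[] nextTwoFileNames(long createOffset, long fileSize) {
        return new String[] {
            offsetToFileName(createOffset),
            offsetToFileName(createOffset + fileSize)
        };
    }

    public static void main(String[] args) {
        for (String name : nextTwoFileNames(0L, FILE_SIZE)) {
            System.out.println(name);
        }
    }
}
```

Because the name is the offset, sorting by file name and sorting by offset give the same order, which matters when the requests are queued.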

    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        // ... other code omitted
        if (createOffset != -1 && needCreate) {
            // step-1: asynchronous creation: build two file names
            // Once the first file is created, the appendMessage thread is woken up.
            // The second file is created asynchronously so that the next request for a file is faster.
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;
            if (this.allocateMappedFileService != null) {
                // commitlog path: each file is 1 GB, two are created asynchronously at a time
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                // consumequeue and index path
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }
            // ... other code omitted
            return mappedFile;
        }
    }

Source analysis: putRequestAndReturnMappedFile

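step-1: Add the two requests to the task pool requestTable; the calling thread later blocks until it is woken up
step-2: Add the two requests to the task queue requestQueue [requests are ordered by commitlog file name, files are named after their offset, and the smaller name is created first]
step-3: The main thread waits for nextFilePath to be created; it does not wait for nextNextFilePath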
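The ordering in step-2 is what a priority queue keyed on the offset gives you. The following is a minimal sketch under the assumption that file names are plain decimal offsets; RequestOrderSketch and its Request type are hypothetical, and the real AllocateRequest comparison may take more fields into account.

```java
import java.util.concurrent.PriorityBlockingQueue;

/** Hypothetical sketch of offset-ordered allocation requests; not RocketMQ code. */
final class RequestOrderSketch {
    static final class Request implements Comparable<Request> {
        final String fileName;

        Request(String fileName) {
            this.fileName = fileName;
        }

        /** The file name is assumed to be the zero-padded start offset. */
        long offset() {
            return Long.parseLong(fileName);
        }

        @Override
        public int compareTo(Request other) {
            // Smaller offset first, so earlier files are created first.
            return Long.compare(offset(), other.offset());
        }
    }

    /** Returns the file name that would be created first, or null if none were given. */
    static String firstToCreate(String... fileNames) {
        PriorityBlockingQueue<Request> queue = new PriorityBlockingQueue<>();
        for (String name : fileNames) {
            queue.offer(new Request(name));
        }
        Request head = queue.poll();
        return head == null ? null : head.fileName;
    }

    public static void main(String[] args) {
        // Offered out of order on purpose; the queue still yields the smaller offset.
        System.out.println(firstToCreate("00000000001073741824", "00000000000000000000"));
    }
}
```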

    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        // by default, two mapped-file creation requests can be handled
        int canSubmitRequests = 2;
        // recompute how many creation requests may be submitted (usually two)
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {
                // if broker is slave, don't fast fail even no buffer in pool
                canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums()
                    - this.requestQueue.size();
            }
        }

        // add to the task pool requestTable; the calling thread blocks later until woken up
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
        if (nextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, "
                    + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(),
                    this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            // add to the task queue requestQueue [ordered by commitlog file name;
            // the smaller name is created first; files are named after their offset]
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            canSubmitRequests--;
        }

        // handle nextNextReq the same way: requestTable, then requestQueue
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, "
                    + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(),
                    this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }

        // the main thread waits for nextFilePath to be created
        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                // the CountDownLatch carries the completion signal between the threads
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
        return null;
    }

Source analysis: AllocateMappedFileService.run

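The DefaultMessageStore constructor starts the AllocateMappedFileService thread, which loops fetching requests: it blocks while there are none and creates the file as soon as one arrives.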

    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped() && this.mmapOperation()) {
        }
        log.info(this.getServiceName() + " service end");
    }

Source analysis: mmapOperation

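step-1: Take a creation request from the priority blocking queue [blocks while the queue is empty]
step-2: Check whether off-heap memory is allowed; this path is taken only with asynchronous flushing and the off-heap pool enabled
step-2.1: Creation backed by off-heap memory
step-2.2: Without off-heap memory, use an mmap memory mapping
step-3: Warm up the memory and call mlock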
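For step-2.2, the underlying JDK mechanism is FileChannel.map. The sketch below maps a small temporary file read-write; it is an illustration of the technique only, and MmapSketch, mapTempFile, and the 8 KB size are hypothetical choices rather than anything taken from the MappedFile source.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of creating a memory-mapped file; not RocketMQ code. */
final class MmapSketch {
    /** Creates a temporary file and maps its first {@code size} bytes read-write. */
    static MappedByteBuffer mapTempFile(int size) {
        try {
            Path file = Files.createTempFile("mmap-sketch", ".dat");
            file.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping past the current end of the file grows the file to `size`.
                // The mapping stays valid after the channel is closed.
                return channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MappedByteBuffer buffer = mapTempFile(8192);
        buffer.put(0, (byte) 1); // writes go to the page cache, not directly to disk
        System.out.println("mapped bytes: " + buffer.capacity());
    }
}
```

Mapping alone does not bring the pages into memory, which is why the warm-up in step-3 exists.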

    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            // step-1: take a creation request from the priority blocking queue [blocks while empty]
            req = this.requestQueue.take();
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            // ... other code omitted
            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();
                MappedFile mappedFile;
                // step-2: is off-heap memory allowed? Requires asynchronous flushing
                // with the off-heap pool enabled
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    // step-2.1: creation backed by off-heap memory
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    // ... other code omitted
                } else {
                    // step-2.2: no off-heap memory, so use an mmap memory mapping
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }
                // ... other code omitted
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMappedFileSizeCommitLog()
                    && this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                    // step-3: warm up the memory and call mlock
                    // Write the dummy value 0 to warm up: the write makes the operating system
                    // hit a page fault on each OS page and read the data from disk into memory.
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }
                // ... other code omitted
            }
            // ... other code omitted
        } finally {
            // wake the thread that appends messages
            if (req != null && isSuccess)
                req.getCountDownLatch().countDown();
        }
        return true;
    }

Source analysis: warmMappedFile

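The 1 GB commitlog file is warmed up by writing dummy values, yielding the CPU from time to time, and the memory is then protected from swapping with mlock.
ps: So far I have not found an official explanation of why the dummy value is 0 rather than 1 or something else. Readers who know are welcome to leave a comment.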
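The warm-up loop itself is small: it writes one byte at the start of every page so that each page is faulted in. The sketch below isolates that loop; WarmUpSketch and touchPages are hypothetical names, the 4 KB page size is an assumption, and a direct buffer stands in for the mapped buffer so that the example runs without a file.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of page-touch warm-up; not RocketMQ code. */
final class WarmUpSketch {
    /** Assumed operating-system page size of 4 KB. */
    static final int PAGE_SIZE = 4 * 1024;

    /**
     * Writes a dummy 0 to the first byte of every page in the buffer and
     * returns the number of pages touched. On a memory-mapped buffer each
     * write forces the corresponding page to be brought into memory.
     */
    static int touchPages(ByteBuffer buffer, int pageSize) {
        int touched = 0;
        for (int i = 0; i < buffer.capacity(); i += pageSize) {
            buffer.put(i, (byte) 0); // absolute put; does not move the position
            touched++;
        }
        return touched;
    }

    public static void main(String[] args) {
        // A direct buffer stands in for the mapped commitlog buffer here.
        ByteBuffer buffer = ByteBuffer.allocateDirect(16 * PAGE_SIZE);
        System.out.println("pages touched: " + touchPages(buffer, PAGE_SIZE));
    }
}
```

One write per page is enough because the operating system brings memory in a page at a time; writing every byte would only cost more CPU.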

    public void warmMappedFile(FlushDiskType type, int pages) {
        long beginTime = System.currentTimeMillis();
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        // Warm up the 1 GB commitlog: write dummy values, yield the CPU now and then,
        // then use mlock to prevent swapping.
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            // Allocating memory and calling mlock does not by itself lock that memory for
            // the calling process, because the pages may be copy-on-write. A dummy value
            // should therefore be written into every page. In other words, the memory has
            // only been allocated; the mapping has not actually been carried out yet.
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // voluntarily give up the CPU
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }

        // flush to disk
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        // use mlock so that the operating system does not swap the memory out
        this.mlock();
    }

mlock

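The C function mlock is called to lock the memory. This keeps the pages from being swapped out when memory runs short or for other reasons, which would otherwise cause a page fault on the next access and force the operating system to do disk I/O.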

    public void mlock() {
        final long beginTime = System.currentTimeMillis();
        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
        Pointer pointer = new Pointer(address);
        {
            // start address + length
            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret,
                System.currentTimeMillis() - beginTime);
        }

        {
            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
            log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret,
                System.currentTimeMillis() - beginTime);
        }
    }

Summary

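1. A commitlog file is created either as a plain memory mapping or as a memory mapping plus an off-heap buffer [see the chapter on flushing to disk].
2. Commitlog creation aims to create two files at a time.
3. Filling the file with the dummy value 0 completes the memory mapping.
4. mlock locks the memory.
5. All of this work exists so that appendMessage can write to the buffer with high performance.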