源码解析RocketMQ优化(一)——内存预映射机制

源码解析RocketMQ优化(一)——内存预映射机制,第1张

系列文章:

源码解析RocketMQ优化(一)——内存预映射机制
源码解析RocketMQ优化(二)——文件预热

参考:
RocketMQ源码解析-零拷贝 - 掘金 (juejin.cn)
rocketMQ零拷贝+kafka零拷贝+netty零拷贝分析 - 知乎 (zhihu.com)

文章目录
  • 具体实现
  • 源码

经过之前的博客RocketMQ原理详解(一)——零拷贝机制,已知 RocketMQ 使用 mmap 方式对 page cache 进行了内存拷贝。但是当遇到OS进行脏页回写,内存回收,内存swap等情况时,就会引起较大的消息读写延迟。这又该怎么办呢?

其实 RocketMQ 结合 mmap 和 page cache 进行了一些优化,这篇就先说说内存预映射机制
内存预映射机制 ,就是 RocketMQ 预分配 MappedFile,这样在下次获取时候直接返回 MappedFile 实例而不用等待 MappedFile 创建分配。

具体实现
  1. RocketMQ 会维护一个 MappedFileQueue 队列,在消息写入过程中(调用 CommitLog 的putMessage()方法),CommitLog 会先从 MappedFileQueue 中获取队列中最后一个 MappedFile,如果没有则新建一个。

  2. 这里 MappedFile 的创建过程是:将构建好的一个 AllocateRequest 请求(具体做法是,将下一个文件的路径、下下个文件的路径、文件大小为参数封装为 AllocateRequest 对象)添加至队列中,后台运行的 AllocateMappedFileService 服务线程(在Broker启动时,该线程就会创建并运行),会不停地run,只要请求队列里存在请求,就会去执行MappedFile映射文件的创建和预分配工作,分配的时候有两种策略(具体采用哪种策略,也与刷盘的方式有关):

    a. 使用 Mmap 的方式来构建 MappedFile 实例
    b. 从 TransientStorePool 堆外内存池中获取相应的 DirectByteBuffer 来构建 MappedFile

这样,在建分配完下个MappedFile后,AllocateMappedFileService 还会将下下个 MappedFile 预先创建并保存至请求队列中等待下次获取时直接返回。下次获取时候直接返回就可以不用等待 MappedFile 创建分配所产生的时间延迟。

源码

CommitLog 会先从 MappedFileQueue 中获取队列中最后一个 MappedFile

org/apache/rocketmq/store/CommitLog #asyncPutMessages

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
 	......
    try {
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
       ......
    } 
   ......
}

org/apache/rocketmq/store/MappedFileQueue#getLastMappedFile

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    MappedFile mappedFileLast = getLastMappedFile();

    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    if (createOffset != -1 && needCreate) {
        // 如果没有则新建一个
        return tryCreateMappedFile(createOffset);
    }

    return mappedFileLast;
}

 public MappedFile getLastMappedFile() {
     MappedFile mappedFileLast = null;

     while (!this.mappedFiles.isEmpty()) {
         try {
             mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
             break;
         } catch (IndexOutOfBoundsException e) {
             //continue;
         } catch (Exception e) {
             log.error("getLastMappedFile has exception.", e);
             break;
         }
     }

     return mappedFileLast;
 }

新建 下个MappedFile 和 下下个 MappedFile

org/apache/rocketmq/store/MappedFileQueue#tryCreateMappedFile

protected MappedFile tryCreateMappedFile(long createOffset) {
    // 生成下一个文件的路径、下下个文件的路径
    String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
            + this.mappedFileSize);
    // 创建这两个路径对应的MappedFile
    return doCreateMappedFile(nextFilePath, nextNextFilePath);
}

protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
    MappedFile mappedFile = null;

    if (this.allocateMappedFileService != null) {
        // 调用allocateMappedFileService去创建 下个MappedFile 和 下下个MappedFile 
        mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
    } else {
        try {
            mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    }

    if (mappedFile != null) {
        if (this.mappedFiles.isEmpty()) {
            mappedFile.setFirstCreateInQueue(true);
        }
        this.mappedFiles.add(mappedFile);
    }

    return mappedFile;
}

将构建好的一个 AllocateRequest 请求(具体做法是,将下一个文件的路径、下下个文件的路径、文件大小为参数封装为 AllocateRequest 对象)添加至队列中

org/apache/rocketmq/store/AllocateMappedFileService#putRequestAndReturnMappedFile

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    int canSubmitRequests = 2;
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
            canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
        }
    }
    
	//创建 新建下一个MappedFile 的AllocateRequest请求
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

    if (nextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        //添加进AllocateRequest请求队列
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        canSubmitRequests--;
    }
    
	// 创建 新建下下个MappedFile 的AllocateRequest请求
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
            this.requestTable.remove(nextNextFilePath);
        } else {
            //添加进AllocateRequest请求队列
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }

    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }

    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }

    return null;
}

后台运行的 AllocateMappedFileService 服务线程(在Broker启动时,该线程就会创建并运行),会不停地run,只要请求队列里存在请求,就会去执行MappedFile映射文件的创建和预分配工作。

org/apache/rocketmq/store/AllocateMappedFileService#run

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped() && this.mmapOperation()) {

    }
    log.info(this.getServiceName() + " service end");
}

org/apache/rocketmq/store/AllocateMappedFileService#mmapOperation

/**
 * Only interrupted by the external thread, will return false
 */
private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        // 从队列里取出AllocateRequest请求
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();
			
            //新建mappedFile
            MappedFile mappedFile;
            //分配的时候有两种策略
            //1.从 TransientStorePool 堆外内存池中获取相应的 DirectByteBuffer 来构建 MappedFile 
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                //2.使用 Mmap 的方式来构建 MappedFile 实例
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }

            long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
            if (elapsedTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }

            // pre write mappedFile
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMappedFileSizeCommitLog()
                &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }

            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true;
        if (null != req) {
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}

这里有一段pre write mappedFile的代码就是下一篇要讲的另一个RocketMQ优化 *** 作——文件预热了。

👇指路:
源码解析RocketMQ优化(二)——文件预热

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/870573.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存