Series:
RocketMQ Optimizations in the Source Code (1): Memory Pre-mapping
RocketMQ Optimizations in the Source Code (2): File Warm-up
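My earlier post, RocketMQ Principles Explained (1): Zero-Copy, showed that RocketMQ uses mmap to map its files into memory. Reads and writes therefore go through the page cache without an extra copy between kernel space and user space. However, message reads and writes can see significant latency when the OS writes dirty pages back to disk, reclaims memory, or swaps memory out. What can be done about that?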
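The mmap step itself can be shown with a minimal, JDK-only sketch that maps a fixed-size file and writes into it. This assumes a MappedFile-style class maps its file with FileChannel.map in READ_WRITE mode, which is, to my understanding, what RocketMQ's MappedFile does. MmapSketch and mapAndWrite are hypothetical names and not RocketMQ API.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not RocketMQ API): maps a fixed-size file with mmap and writes into it.
final class MmapSketch {

    static MappedByteBuffer mapAndWrite(File file, int fileSize, byte[] payload) {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            raf.setLength(fileSize); // fix the file size before mapping it
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            buffer.put(payload);     // the write lands in the page cache through the mapping
            buffer.force();          // optional: flush dirty pages to disk right away
            return buffer;           // the mapping stays valid after the channel is closed
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("commitlog-sketch", ".dat");
        f.deleteOnExit();
        mapAndWrite(f, 4096, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(f.length()); // 4096
    }
}
```

Creating and mapping a large file is not free, and the page faults it triggers are part of the latency that the optimizations in this series try to move off the write path.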
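RocketMQ applies several optimizations on top of mmap and the page cache. This article covers the first one: memory pre-mapping.
With memory pre-mapping, RocketMQ allocates MappedFile instances ahead of time. The next request for a MappedFile can then return a ready instance immediately instead of waiting for one to be created and allocated.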
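Implementation
- RocketMQ maintains a MappedFileQueue. A message write goes through CommitLog's putMessage() method; the source below shows the asynchronous variant, asyncPutMessage(). During the write, CommitLog first takes the last MappedFile in the MappedFileQueue. If there is none, or the last one is full, it creates a new one.
- A new MappedFile is created as follows. Two AllocateRequest objects are built, one for the next file and one for the file after it. Each holds a file path and the file size, and both are added to a request queue. The AllocateMappedFileService thread is created and started when the Broker starts, and then runs continuously in the background. Whenever the request queue holds a request, the thread creates and pre-allocates the corresponding MappedFile. There are two allocation strategies. The choice depends on isTransientStorePoolEnable(), and the transient store pool only takes effect with asynchronous flushing:
  a. Build the MappedFile instance with mmap.
  b. Build the MappedFile with a DirectByteBuffer taken from the off-heap TransientStorePool.
The caller only waits for the next MappedFile. AllocateMappedFileService also creates the MappedFile after it in advance and keeps it ready through its AllocateRequest in requestTable. The next lookup then returns it immediately, without the delay of creating and allocating a MappedFile.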
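The flow just described can be modeled in a few dozen lines: register the "next" and "next-next" requests, wait only for "next", and let a background thread do the work. This is a simplified sketch, not RocketMQ's code. PreallocSketch, Request and getNext are hypothetical names, and building a string stands in for the expensive mmap. A BlockingQueue plus a ConcurrentHashMap of requests with a CountDownLatch each mirrors the requestQueue/requestTable pair in the source below.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of pre-allocation; the "file" is a String standing in for a MappedFile.
final class PreallocSketch {

    static final class Request {
        final String path;
        final CountDownLatch done = new CountDownLatch(1);
        volatile String file; // set by the worker before the latch is released
        Request(String path) { this.path = path; }
    }

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    private final Map<String, Request> table = new ConcurrentHashMap<>();
    private final Thread worker;

    PreallocSketch() {
        worker = new Thread(this::runLoop, "alloc-worker");
        worker.setDaemon(true);
        worker.start();
    }

    // Background loop: take a request, skip it if it is no longer the registered one, create the file.
    private void runLoop() {
        try {
            while (true) {
                Request req = queue.take();
                if (table.get(req.path) != req) continue; // expired or replaced request
                req.file = "mapped:" + req.path;          // expensive mmap (+ warm-up) would go here
                req.done.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown
        }
    }

    /** Registers both requests, then waits only for the next file. Returns null on timeout. */
    String getNext(String nextPath, String nextNextPath, long timeoutMs) throws InterruptedException {
        submit(nextPath);
        submit(nextNextPath); // pre-allocated in the background, not awaited here
        Request req = table.get(nextPath);
        if (req == null || !req.done.await(timeoutMs, TimeUnit.MILLISECONDS)) return null;
        table.remove(nextPath);
        return req.file;
    }

    // putIfAbsent: a file already pending or pre-allocated is not requested twice
    private void submit(String path) {
        Request req = new Request(path);
        if (table.putIfAbsent(path, req) == null) queue.offer(req);
    }

    void shutdown() { worker.interrupt(); }

    public static void main(String[] args) throws InterruptedException {
        PreallocSketch s = new PreallocSketch();
        System.out.println(s.getNext("f0", "f1", 1000)); // mapped:f0
        System.out.println(s.getNext("f1", "f2", 1000)); // mapped:f1 (queued by the first call)
        s.shutdown();
    }
}
```

On the second call, "f1" is already registered, so no new request is made for it. The caller just picks up the file the worker has produced, or is about to produce, in the background.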
Source code
CommitLog first takes the last MappedFile from the MappedFileQueue:
org/apache/rocketmq/store/CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
......
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
......
}
......
}
org/apache/rocketmq/store/MappedFileQueue#getLastMappedFile
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
// No last file, or the last one is full: create a new one
return tryCreateMappedFile(createOffset);
}
return mappedFileLast;
}
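The offset arithmetic in getLastMappedFile(startOffset, needCreate) can be isolated as a small pure function. Each file is identified by its starting offset, and -1 means "no new file needed". CreateOffsetSketch is a hypothetical name, and the nullable Long stands in for "the queue is empty".

```java
// Hypothetical helper reproducing the createOffset logic of getLastMappedFile(startOffset, needCreate).
final class CreateOffsetSketch {

    /**
     * @param lastFileFromOffset starting offset of the last file, or null if the queue is empty
     * @param lastIsFull         whether that last file is full
     */
    static long createOffset(long startOffset, Long lastFileFromOffset, boolean lastIsFull, int mappedFileSize) {
        if (lastFileFromOffset == null) {
            // Empty queue: align startOffset down to a file boundary
            return startOffset - (startOffset % mappedFileSize);
        }
        if (lastIsFull) {
            // Last file is full: the new file starts right after it
            return lastFileFromOffset + mappedFileSize;
        }
        return -1; // keep writing into the last file
    }

    public static void main(String[] args) {
        int size = 1024 * 1024 * 1024; // 1 GiB, the default CommitLog file size
        System.out.println(createOffset(0L, null, false, size));        // 0
        System.out.println(createOffset(1500L, null, false, 1024));     // 1024
        System.out.println(createOffset(0L, 1073741824L, true, size));  // 2147483648
        System.out.println(createOffset(0L, 1073741824L, false, size)); // -1
    }
}
```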
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty()) {
try {
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
Creating the next MappedFile and the one after it
org/apache/rocketmq/store/MappedFileQueue#tryCreateMappedFile
protected MappedFile tryCreateMappedFile(long createOffset) {
// Build the paths of the next file and the file after it
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
// Create the MappedFile for the next path and pre-allocate the one after it
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
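The two paths built in tryCreateMappedFile differ only in the offset used for the file name. A minimal sketch, assuming UtilAll.offset2FileName zero-pads the offset to 20 digits (to my understanding it does this with a NumberFormat; String.format is swapped in here). FileNameSketch and nextPaths are hypothetical names.

```java
import java.io.File;
import java.util.Locale;

// Hypothetical helper mirroring the path arithmetic of tryCreateMappedFile.
// Assumption: file names are the starting offset zero-padded to 20 digits.
final class FileNameSketch {

    static String offset2FileName(long offset) {
        return String.format(Locale.ROOT, "%020d", offset);
    }

    /** Returns {nextFilePath, nextNextFilePath} for a file starting at createOffset. */
    static String[] nextPaths(String storePath, long createOffset, int mappedFileSize) {
        String next = storePath + File.separator + offset2FileName(createOffset);
        String nextNext = storePath + File.separator + offset2FileName(createOffset + mappedFileSize);
        return new String[] {next, nextNext};
    }

    public static void main(String[] args) {
        int size = 1024 * 1024 * 1024; // 1 GiB
        for (String p : nextPaths("commitlog", 0L, size)) {
            System.out.println(p);
        }
        // On Unix-like systems:
        // commitlog/00000000000000000000
        // commitlog/00000000001073741824
    }
}
```

Because the name encodes the starting offset, a consumer can find the file holding a given physical offset by arithmetic alone.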
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
// Let AllocateMappedFileService create the next MappedFile and pre-allocate the one after it
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
Build the AllocateRequests and add them to the queue. There is one request for the next file and one for the file after it, each holding a file path and the file size:
org/apache/rocketmq/store/AllocateMappedFileService#putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
}
}
// Build the AllocateRequest for the next MappedFile
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextFilePath);
return null;
}
// Add it to the AllocateRequest queue
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
canSubmitRequests--;
}
// Build the AllocateRequest for the MappedFile after next
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextNextFilePath);
} else {
// Add it to the AllocateRequest queue
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
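The canSubmitRequests budget at the top of putRequestAndReturnMappedFile decides what happens when the TransientStorePool runs low. The sketch below assumes neither request is already registered in requestTable; if one is, the original code does not spend budget on it. SubmitBudgetSketch, decide and Outcome are hypothetical names.

```java
// Hypothetical helper modeling canSubmitRequests, assuming both requests are new.
final class SubmitBudgetSketch {

    enum Outcome { FAIL, NEXT_ONLY, BOTH }

    static Outcome decide(boolean poolEnabled, boolean fastFail, boolean isSlave,
                          int availableBuffers, int queuedRequests) {
        int canSubmit = 2; // default budget: the next file and the one after it
        if (poolEnabled && fastFail && !isSlave) {
            // every request still in the queue will consume one pooled buffer
            canSubmit = availableBuffers - queuedRequests;
        }
        if (canSubmit <= 0) {
            return Outcome.FAIL; // the next file cannot be created: the caller gets null
        }
        canSubmit--; // the next file takes one slot
        // no budget for the file after next only skips pre-allocation, it is not an error
        return canSubmit > 0 ? Outcome.BOTH : Outcome.NEXT_ONLY;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, false, 0, 0)); // BOTH
        System.out.println(decide(true, true, false, 1, 0));   // NEXT_ONLY
        System.out.println(decide(true, true, false, 3, 3));   // FAIL
        System.out.println(decide(true, true, true, 0, 5));    // BOTH (a slave never fails fast)
    }
}
```

The asymmetry is deliberate in the original: failing to get the next file blocks writing, while skipping the file after next only loses the latency benefit.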
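The AllocateMappedFileService thread runs in the background; it is created and started when the Broker starts. It loops continuously, and whenever the request queue holds a request, it creates and pre-allocates the MappedFile.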
org/apache/rocketmq/store/AllocateMappedFileService#run
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
org/apache/rocketmq/store/AllocateMappedFileService#mmapOperation
/**
* Only interrupted by the external thread, will return false
*/
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
// Take an AllocateRequest from the queue (blocks until one is available)
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
// Create the MappedFile
MappedFile mappedFile;
// Two allocation strategies:
// 1. Build the MappedFile with a DirectByteBuffer from the off-heap TransientStorePool
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 2. Build the MappedFile instance with mmap
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
if (elapsedTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// pre write mappedFile
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}
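Strategy 1 in mmapOperation gives each new MappedFile a pre-allocated off-heap buffer instead of writing only through the mapping. The pooling part of that idea is sketched below with JDK classes only. DirectBufferPoolSketch, borrow and giveBack are hypothetical names, and this is not RocketMQ's TransientStorePool. To my understanding, the real pool also locks its memory so it cannot be swapped out; that step is omitted here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified off-heap buffer pool (not RocketMQ's TransientStorePool).
final class DirectBufferPoolSketch {
    private final Deque<ByteBuffer> available = new ArrayDeque<>();

    DirectBufferPoolSketch(int poolSize, int bufferSize) {
        for (int i = 0; i < poolSize; i++) {
            // allocated once, up front, outside the Java heap
            available.offerLast(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Lends a buffer to a new file, or returns null when the pool is exhausted. */
    synchronized ByteBuffer borrow() {
        return available.pollFirst();
    }

    /** In this sketch, the caller returns a buffer once its data has been written to the file. */
    synchronized void giveBack(ByteBuffer buf) {
        buf.clear(); // reset position and limit for the next file
        available.offerFirst(buf);
    }

    synchronized int availableBufferNums() {
        return available.size();
    }

    public static void main(String[] args) {
        DirectBufferPoolSketch pool = new DirectBufferPoolSketch(2, 4096);
        ByteBuffer buf = pool.borrow();
        System.out.println(buf.isDirect() + " " + pool.availableBufferNums()); // true 1
        pool.giveBack(buf);
        System.out.println(pool.availableBufferNums()); // 2
    }
}
```

availableBufferNums() plays the same role as the pool size consulted by canSubmitRequests in putRequestAndReturnMappedFile.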
The code under the "pre write mappedFile" comment is where another RocketMQ optimization comes in: file warm-up, the topic of the next article.
👇 Read next:
RocketMQ Optimizations in the Source Code (2): File Warm-up
Feel free to share; please credit the source when reposting: 内存溢出