消息队列RocketMQ入门实践--消息存储(五)

消息队列RocketMQ入门实践--消息存储(五),第1张

系列文章目录

消息队列RocketMQ入门实践(一)
消息队列RocketMQ入门实践(二)
消息队列RocketMQ入门实践–关键特性(三)
消息队列RocketMQ入门实践–关键特性(四)


文章目录
  • 系列文章目录
  • 前言
  • 一、消息存储整体架构
  • 二、刷盘策略
    • 2.1 同步刷盘
    • 2.2 异步刷盘
    • 2.3 broker配置文件中指定刷盘方式
  • 三、刷盘原理
    • 3.1 零拷贝
    • 3.2 零拷贝实现的方式
  • 总结


前言

嗨,大家好,我是希留。

消息存储是RocketMQ中最为复杂和最为重要的一部分,本文将从消息存储整体架构以及刷盘策略来聊一聊RocketMQ是如何进行消息存储的。


一、消息存储整体架构

引用官方的一张设计图来说明下MQ的存储设计

消息存储架构图中主要有下面三个跟消息存储相关的文件构成

  • (1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

  • (2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

  • (3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME/store/index{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小:40+500W4+2000W20=420000040个字节大小,约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

上面是官方文档的详细介绍,不是特别好理解。我们可以这样去理解:

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,CommitLog是真正存储数据的文件,ConsumeQueue是索引文件,存储数据指向到物理文件的配置。


如上图所示:

  • 消息主体以及元数据都存储在CommitLog当中
  • Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。
  • 每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去commitLog中拿到消息主体。
二、刷盘策略

RocketMQ 的所有消息都是持久化的,先写入系统 页缓存(PageCache),然后写入磁盘,可以保证内存和磁盘都有一份数据,访问时,直接从内存读取。

为了提高性能, 会尽可能地保证磁盘的顺序写。消息在通过Producer 写入 RocketMQ 的时候,有两种写磁盘方式,分别是同步刷盘与异步刷盘。

2.1 同步刷盘


如上图所示,只有在消息真正持久化至磁盘后,RocketMQ的Broker端才会真
正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

同步刷盘流程如下:

  • (1). 写入 PAGECACHE 后,线程等待,通知刷盘线程刷盘。
  • (2). 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
  • (3). 前端等待线程向用户返回成功。
2.2 异步刷盘


如上图所示,在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写 *** 作的返回快,吞吐量大。当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

这种方式,能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

2.3 broker配置文件中指定刷盘方式
flushDiskType=ASYNC_FLUSH -- 异步
flushDiskType=SYNC_FLUSH -- 同步
三、刷盘原理

RocketMQ实现消息刷盘采用了零拷贝技术。那什么又是零拷贝技术呢?

3.1 零拷贝

零拷贝是指计算机执行IO *** 作时,CPU不需要将数据从一个存储区域复制到另一个存储区域,进而减少上下文切换以及CPU的拷贝时间。它是一种IO *** 作优化技术。

零拷贝主要的任务就是避免CPU将数据从一块存储拷贝到另外一块存储,避免让CPU做大量的数据拷贝任务,减少不必要的拷贝,或者让别的组件来做这一类简单的数据传输任务,让CPU解脱出来专注于别的任务。这样就可以让系统资源的利用更加有效。

做后端开发的小伙伴应该都做过文件下载类的功能,我们以文件下载为例,服务端的主要任务是:将服务端主机磁盘中的文件不做修改地从已连接的socket发出去。 *** 作系统底层I/O过程如下图所示:

从上图可以看出,过程共产生了四次数据拷贝,在此过程中,我们没有对文件内容做任何修改,那么在内核空间和用户空间来回拷贝数据无疑就是一种浪费,而零拷贝主要就是为了解决这种低效性。

3.2 零拷贝实现的方式

零拷贝包含以下两种方式:

  • 使用 mmap + write 方式
    优点:即使频繁调用,使用小块文件传输,效率也很高
    缺点:不能很好地利用 DMA 方式,会比 sendfile 多消耗 CPU,内存安全性控制复杂,需要避免 JVM Crash问题。

  • 使用 sendfile 方式
    优点:可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题。
    缺点:小块文件效率低亍 mmap 方式,只能是 BIO 方式传输,不能使用 NIO。

RocketMQ 选择了第一种方式,mmap+write 方式,因为有小块数据传输的需求,效果会比 sendfile 更好。

实现过程如下图所示:

磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着 *** 作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用write(), *** 作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核态,最后,socket缓冲区再把数据发到网卡去。


总结

感谢大家的阅读,以上就是今天要讲的内容,本文介绍了RocketMQ的消息存储结构以及刷盘方式和实现原理。

若觉得本文对您有帮助的话,帮忙点赞评论关注,支持一波哟~

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/735909.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)

保存