RocketMQ源码解析:RocketMQ是如何存储消息的?

RocketMQ源码解析:RocketMQ是如何存储消息的?,第1张

RocketMQ源码解析:RocketMQ是如何存储消息的?

Broker端使用到的文件


我们先来看一下RocketMQ的消息存储流程,当消息发送到RocketMQ上时,会被顺序写入CommitLog文件,这样能保证消息存储的高性能和高吞吐量。

但是消息是按照Topic来消费的,如果消费时从CommitLog上查找对应的消息时,会比较慢。为了提高消息消费的效率,RocketMQ会将Topic一样的消息放在ConsumerQueue中,每个ConsumerQueue又分为几个写队列,一个队列一个文件。

假如创建一个名为TopicTest的topic,并创建4个写队列。那么在RocketMQ是通过如下形式存储的

需要注意的是,CommitLog和ComsumerQueue并不是将相同的消息存储了2份。CommitLog存储了消息原始的内容,而ComsumerQueue主要存储了消息在CommitLog中的偏移量,具体的消息格式看下图

borker端存储的消息格式如下

内容解释长度TOTALSIZE消息总长度4字节MAGICCODE魔术,固定值Oxdaa320a74字节BODYCRC消息crc校验码4字节QUEUEID消息队列id4字节FLAG消息flag,供应用程序使用4字节QUEUEOFFSET消息在消费队列的偏移量8字节PHYSICALOFFSET消息在CommitLog文件中的偏移量8字节SYSFLAG消息系统flag,例如是否压缩,是否是事务消息等4字节BORNTIMESTAMP生产者调用消息发送API的时间戳8字节BORNHOST消息发送者ip,端口号8字节STORETIMESTAMP消息存储时间戳8字节STOREHOSTADDRESSBroker服务器ip+端口号8字节RECONSUMETIMES消息重试次数4字节Prepared Transaction Offset事务消息物理偏移量8字节BodyLength消息体长度4字节Body消息体内容BodyLength字节TopicLengthtopic长度,1字节,即主题名称不能超过255个字符1字节Topic主题TopicLength字节PropertiesLength消息属性长度2字节Properties消息属性PropertiesLength字节

ConsumerQueue中消息的格式如下

根据commitlog offset 和 size 就能从IndexFile中获取到具体的消息内容,而 tag hashcode 用来根据topic+tag消费时过滤消息

从存储图看到还有一个IndexFile和CommitLog也有关系

IndexFile的主要作用就是用来根据Message Key和Unique Key查找对应的消息

IndexFile文件结构如下所示


从图中可以看出,IndexFile主要分为如下3部分,IndexHead,Hash槽,Index条目

IndexHead的格式如下

字段解释beginTimestamp消息的最小存储时间endTimestamp消息的最大存储时间beginPhyOffset消息的最小偏移量(commitLog文件中的偏移量)endPhyOffset消息的最大偏移量(commitLog文件中的偏移量)hashSlotCounthash槽个数indexCountindex条目当前已使用的个数

Hash槽存储的内容为落在该Hash槽内的Index的索引(看后面图示你就会很清楚了)

每个Index条目的格式如下

字段解释hashcodekey的hashcodephyoffset消息的偏移量(commitLog文件中的偏移量)timedif该消息存储时间与第一条消息的时间戳的差值,小于0该消息无效pre index no该条目的前一条记录的Index索引,当hash冲突时,用来构建链表

key的组成有如下两种形式

  1. Topic#Unique Key
  2. Topic#Message Key

Unique Key是在producer端发送消息生成的

// DefaultMQProducerImpl#sendKernelImpl
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}

Message Key是我们在发送消息的时候设置的哈,通常具有业务意义,方便我们快速查找消息

// 指定 topicName,tagName,MessageKey,消息内容,然后发送消息
String messageKey = UUID.randomUUID().toString();
Message message = new Message(TOPIC_NAME, TAG_NAME, messageKey,
        ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);

IndexFile构成过程比较麻烦,画图演示一下把,你可以把IndexFile想成基于文件实现的HashMap。

假如说往数组长度为10的HashMap依次放入3条key为11,34,21的数据,HashMap的结构如下

将key为11,34,21的数据放到IndexFile中的过程如下(假如hash槽的数量为10)

具体的过程为

  1. 将消息顺序放到Index条目中,将11放到index=1的位置(用index[1]表示哈),11%1=1,算出hash槽的位置是1,存的值是0(刚开始都是0,用hash[0]表示),将index[1].preIndexNo=hash[0]=0,hash[0]=1(1为index数组下标哈)
  2. 将34放到index[2],34%10=4,index[2].preIndexNo=hash[0]=0
  3. 将21放到index[3],21%10=1,index[3].preIndexNo=hash[1]=1

从图中可以看出来,当发生hash冲突时Index条目的preIndexNo属性充当了链表的作用。查找的过程和HashMap基本类似,先定位到槽的位置,然后顺着链表找就行了。

对具体算法感兴趣的可以看看源码,我就不贴代码了,有点多

// IndexFile的构建过程
org.apache.rocketmq.store.index.IndexFile#putKey

// IndexFile的查找过程
org.apache.rocketmq.store.index.IndexFile#selectPhyOffset
其他文件

除了上述三种文件外,在rocketmq store文件夹下还有如下几种其他文件

lock:有时候一台机器上会起多个broker,如果数据文件放在一个目录,这时候可以通过锁来提示你使用另一个目录,防止冲突
checkpoint:文件检查点,存储commitLog最后一次刷盘时间戳,consumeQueue最后一次刷盘时间戳,IndexFile最后一次刷盘时间戳
config:运行期间一些配置信息
abort:如果存在abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出时删除

源码解析 参考博客

[1]https://itzones.cn/2019/07/07/RocketMQ%E5%AD%98%E5%82%A8%E6%96%87%E4%BB%B6/

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5479494.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存