我们先来看一下RocketMQ的消息存储流程,当消息发送到RocketMQ上时,会被顺序写入CommitLog文件,这样能保证消息存储的高性能和高吞吐量。
但是消息是按照Topic来消费的,如果消费时从CommitLog上查找对应的消息时,会比较慢。为了提高消息消费的效率,RocketMQ会将Topic一样的消息放在ConsumerQueue中,每个ConsumerQueue又分为几个写队列,一个队列一个文件。
假如创建一个名为TopicTest的topic,并创建4个写队列。那么在RocketMQ是通过如下形式存储的
需要注意的是,CommitLog和ComsumerQueue并不是将相同的消息存储了2份。CommitLog存储了消息原始的内容,而ComsumerQueue主要存储了消息在CommitLog中的偏移量,具体的消息格式看下图
borker端存储的消息格式如下
ConsumerQueue中消息的格式如下
根据commitlog offset 和 size 就能从IndexFile中获取到具体的消息内容,而 tag hashcode 用来根据topic+tag消费时过滤消息
从存储图看到还有一个IndexFile和CommitLog也有关系
IndexFile的主要作用就是用来根据Message Key和Unique Key查找对应的消息
IndexFile文件结构如下所示
从图中可以看出,IndexFile主要分为如下3部分,IndexHead,Hash槽,Index条目
IndexHead的格式如下
Hash槽存储的内容为落在该Hash槽内的Index的索引(看后面图示你就会很清楚了)
每个Index条目的格式如下
key的组成有如下两种形式
- Topic#Unique Key
- Topic#Message Key
Unique Key是在producer端发送消息生成的
// DefaultMQProducerImpl#sendKernelImpl if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); }
Message Key是我们在发送消息的时候设置的哈,通常具有业务意义,方便我们快速查找消息
// 指定 topicName,tagName,MessageKey,消息内容,然后发送消息 String messageKey = UUID.randomUUID().toString(); Message message = new Message(TOPIC_NAME, TAG_NAME, messageKey, ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.println(sendResult);
IndexFile构成过程比较麻烦,画图演示一下把,你可以把IndexFile想成基于文件实现的HashMap。
假如说往数组长度为10的HashMap依次放入3条key为11,34,21的数据,HashMap的结构如下
将key为11,34,21的数据放到IndexFile中的过程如下(假如hash槽的数量为10)
具体的过程为
- 将消息顺序放到Index条目中,将11放到index=1的位置(用index[1]表示哈),11%1=1,算出hash槽的位置是1,存的值是0(刚开始都是0,用hash[0]表示),将index[1].preIndexNo=hash[0]=0,hash[0]=1(1为index数组下标哈)
- 将34放到index[2],34%10=4,index[2].preIndexNo=hash[0]=0
- 将21放到index[3],21%10=1,index[3].preIndexNo=hash[1]=1
从图中可以看出来,当发生hash冲突时Index条目的preIndexNo属性充当了链表的作用。查找的过程和HashMap基本类似,先定位到槽的位置,然后顺着链表找就行了。
对具体算法感兴趣的可以看看源码,我就不贴代码了,有点多
// IndexFile的构建过程 org.apache.rocketmq.store.index.IndexFile#putKey // IndexFile的查找过程 org.apache.rocketmq.store.index.IndexFile#selectPhyOffset其他文件
除了上述三种文件外,在rocketmq store文件夹下还有如下几种其他文件
lock:有时候一台机器上会起多个broker,如果数据文件放在一个目录,这时候可以通过锁来提示你使用另一个目录,防止冲突
checkpoint:文件检查点,存储commitLog最后一次刷盘时间戳,consumeQueue最后一次刷盘时间戳,IndexFile最后一次刷盘时间戳
config:运行期间一些配置信息
abort:如果存在abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出时删除
[1]https://itzones.cn/2019/07/07/RocketMQ%E5%AD%98%E5%82%A8%E6%96%87%E4%BB%B6/
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)