RocketMQ第五讲

RocketMQ第五讲,第1张

broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,把结果返回给消费者。

消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。

broker启动了一个专门的线程来构建索引,把CommitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。

也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。

引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。

其中,ConsumeQueue(逻辑消费队列)作陪前为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: {fileName},文件名fileName是以创芦慧清建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底碧搭层实现为hash索引。

按照Message Key查询消息的时候,会用到这个索引文件。

IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W 4+2000W 20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入 *** 作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4 500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20 2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Produce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件 *** 作的API,对消息实体的 *** 作是通过DefaultMessageStore进行 *** 作。

属性和方法很多,就不往这里放了。

文件存储实现类,包括多个内部类

· 对于文件夹下的一个文件

上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档: Reactor线程模型

上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务 *** 作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。

AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程

FileWatchService:

NettyEventExecutor:

NettyNIOBoss_:一个

NettyServerNIOSelector_:默认为三个

NSScheduledThread:定时任务线程

ServerHouseKeepingService:守护线程

ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃

RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程

AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个:

RocketmqBrokerAppender_inner

RocketmqFilterAppender_inner

RocketmqProtectionAppender_inner

RocketmqRemotingAppender_inner

RocketmqRebalanceLockAppender_inner

RocketmqStoreAppender_inner

RocketmqStoreErrorAppender_inner

RocketmqWaterMarkAppender_inner

RocketmqTransactionAppender_inner

SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

AdminBrokerThread_:remotingServer.registerDefaultProcessor

ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true)

==================================================================

BrokerControllerScheduledThread:=>

BrokerController.this.getBrokerStats().record()

BrokerController.this.consumerOffsetManager.persist()

BrokerController.this.consumerFilterManager.persist()

BrokerController.this.protectBroker()

BrokerController.this.printWaterMark()

log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes())

BrokerController.this.brokerOuterAPI.fetchNameServerAddr()

BrokerController.this.printMasterAndSlaveDiff()

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister())

BrokerFastFailureScheduledThread:=>

FilterServerManagerScheduledThread:=>

FilterServerManager.this.createFilterServer()

ClientHousekeepingScheduledThread:=>

ClientHousekeepingService.this.scanExceptionChannel()

PullRequestHoldService

FileWatchService

AllocateMappedFileService

AcceptSocketService

BrokerStatsThread1

RocketMQ的部署,这里不做太多的说明,因为衡历颤官方文档上面写的已经非常清晰了,可以照着官方文档一顿 *** 作,下面为官方文档的地址:

https://github.com/apache/rocketmq/tree/master/docs/cn

RocketMQ的调优其实在官方文档的最烂核佳实践中也写的挺清晰的,可以直接参考官方文档,笔者记录的这主要是自己消化后,自己理解的一些东西。

备注:以下几个参数对所有的中间件都起作用,比如redis、kafka等

该参数有三个值可以选择:0、1、2

"0":在中间件系统申请内存对时候,os内核会检查可用内存是否足够,如果足够的话就分配给你,如果感觉剩余内存不是太够,干脆就直接拒绝申请,从而导致中间件申请内存失败,出现异常。

"1":所有可用的物理内存都允许分配给你,只要有内存就给你用,这样可以避免内存申请失败的问题,一般将这个参数的值调整为1。

"2":表示内核允许分配超过所有物理内存和交换空间总和的内存

该参数影响中间件系统可以开启线程的数量,如果参数的值太少,可能会造成有些中间无法开启足够的线程,从而导致出错,然后使中间件系统挂掉。该咐败参数的默认值为:65536,这个默认值有时候是不够的,建议这个参数值调大10倍,为655360。

该参数是用来控制swap行为的,这个简单的来说,就是os会把一部分磁盘空间作为swap区域,然后如果有的进程现在可能是不太活跃,就会被 *** 作系统把进程调整为睡眠状态,把进程中的数据放入磁盘上的swap区域,然后让该进程原来占有的内存空间腾出来,交给其他活跃的进程来使用。

将该参数的值设置为0 :意思就是尽量别把任何一个进程放到磁盘swap区域,尽量大家都用物理内存。

将该参数的值设置为100 :意思是尽量把一些进程给放到swap区域去,内存腾出来给活跃的进程使用。

默认该参数的值为60 :有点偏高,可能会导致我们的中间件运行不活跃的时候被迫腾出内存空间然后放磁盘swap区域去。因此一般在生产环境建议将该值调小一些,比如10,让进程尽量使用物理内存,别放磁盘swap区域去。

该参数是用来控制linux上的最大文件链接数的,默认值为1024,一般肯定是不够的,因为在大量频繁的读写磁盘文件的时候或进行网络通信的时候,都会和这个参数有关系。如果采用默认值,可能会出现如下错误:error: too many openfiles。

总结:

因为RocketMQ是用java语言编写的所以在启动的时候需要使用虚拟机,所以对JVM进行调优。

在runbroker.sh启动脚本中可以看到如下内容:

对上面参数对解释:

-server :以服务器的模式启动。

-Xms8g -Xmx8g -Xmn4g : 默认的堆大小是8g,新生代是4g,这里根据实际生产服务器的内存大小,然后进行调整,比如:物理机是48g内存,堆内存可以给到20g,新生代给到8g,剩下的一些留给 *** 作系统。

-XX:+UseG1GC -XX:G1HeapRegionSize=16m :选择G1垃圾回收器来做分代回收,对新生代和老年代都用G1回收。这里把G1的region设置为16m,是因为物理内存比较大,如果物理内存不多时,可以设置成2m,设置大,是可以防止region数量过多。region的含义是:G1的各代存储地址是不连续的,每一代都使用了n个不连续的大小相同的Region,每个Region占有一块连续的虚拟内存地址,如下图所示:

在rocketmq/distribution/target/apache-rocketmq/conf/dledger目录下面的配置文件中,可以找到sendMessageThreadPoolNums=16参数,该参数的意思是:RocketMQ内部用来发送消息的线程池的线程数量,默认是16,如果机器的CPU是24核的话,该参数的值可以设置成24或者30

文件删除:1.在store启动时候会源返启动一个后台线程每隔10秒扫描一下文件,把文件删除,删除文件会删commitlog和consumerlog文件,默认文件存储72小时,有是否超过时间,是否内存不足等,如果删除文件正在被引用,会先不删除,记录下时雹御饥间,引用减小1000,

逻辑:1.RocketMQ 顺序写 Commitlog 文件、 ConsumeQueue 文件,所有写 *** 作全部落在最后一个 CommitLog 或 ConsumeQueue 文件上,之前的文件在下一个文件创建后将不会再被更新

2.非当前写文件在一定时间间隔内没有再次被更新, 则认为是过期文件,可以被删除, RocketMQ 不会关注这个文件上的消息是否全部被消费

触发条价:启用拆者文件过期机制并在磁盘 空间不足或默认在凌晨 4 点删除过期文件,


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/12131044.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-21
下一篇 2023-05-21

发表评论

登录后才能评论

评论列表(0条)

保存