RocketMQ(二): 消息发送流程

RocketMQ(二): 消息发送流程,第1张

RocketMQ(二): 消息发送流程

目录

一. 生产者Producer启动

二. 生产者发送消息 


一. 生产者Producer启动

MQ发消息之前先启动生产者。设置生产者组并且启动生产者。

Broker既是生产者又是消费者,生产者启动的时候会启动两个生产者,一个是进程ID+时间另一个是内部生产者CLIENT_INNER_PRODUCER。

 紧接着是启动内部的生产者,预先启动Broker Netty的客户端;启动一系列的定时任务;消费者去拉取消息;消费者端的负载均衡。

二. 生产者同步发送消息 

RocketMQ底层是默认同步发送,根据项目的例子跟进代码,进入到DefaultMQProducerImpl类中。默认超时时间3秒,同步发消息是没有回调的。

首先获取Topic-Publish就是Topic发布关系。第一次不使用默认的Topic也就是TBW102,远程从Name-Srv去获取发布的Topic,命令是GET_ROUTEINFO_BY_TOPIC。而Name-Srv的Topic则是Broker每30秒去上报的,刚开始启用的使用默认Topic配置去发送消息并放在缓存中设置读写队列为4这是持久化Topic信息。Broker上报的时候则是去缓存QueueData也是Topic的信息的读写数量为4。因此可以得知在Name-Srv中1个Topic对应多个QueueData。

因此第一次的获取Topic-Publish的实体TopicPublishInfo是为null,使用的是默认TBW102这个队列的配置来创建TopicPublishInfo,创建生产者和消费者分别与Topic的订阅关系。因此可以得知在Broker中1个Topic对应多个MessageQueue。

发送消息有发送重试机制,同步重试3次,异步和单向则只发1次没有重试。随机选一个MessageQueue基于底层Netty来发送消息。

 组装发送消息的请求。

 根据Broker的地址进行Netty底层的连接。

RocketMq是在发送消息的时候才进行对Netty Server的连接,并且维护channel。addr为null则是从Name-Srv获取。

 同步请求需要一个CountDownLatch来上锁解锁返回一个ResponseFuture。

Netty-Server接受到数据获取对应命令的Processor来处理消息发送逻辑。

Broker处理消息的流程在SendMessageProcessor中,异步将消息存储在CommitLog中。 

对于Topic的创建是什么时候呢?Topic是如何持久存储的呢?他们的读写队列又分别是多少?

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701798.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存