目录
二. 生产者发送消息
一. 生产者Producer启动
MQ发消息之前先启动生产者。设置生产者组并且启动生产者。
Broker既是生产者又是消费者,生产者启动的时候会启动两个生产者,一个是进程ID+时间另一个是内部生产者CLIENT_INNER_PRODUCER。
紧接着是启动内部的生产者,预先启动Broker Netty的客户端;启动一系列的定时任务;消费者去拉取消息;消费者端的负载均衡。
二. 生产者同步发送消息RocketMQ底层是默认同步发送,根据项目的例子跟进代码,进入到DefaultMQProducerImpl类中。默认超时时间3秒,同步发消息是没有回调的。
首先获取Topic-Publish就是Topic发布关系。第一次不使用默认的Topic也就是TBW102,远程从Name-Srv去获取发布的Topic,命令是GET_ROUTEINFO_BY_TOPIC。而Name-Srv的Topic则是Broker每30秒去上报的,刚开始启用的使用默认Topic配置去发送消息并放在缓存中设置读写队列为4这是持久化Topic信息。Broker上报的时候则是去缓存QueueData也是Topic的信息的读写数量为4。因此可以得知在Name-Srv中1个Topic对应多个QueueData。
因此第一次的获取Topic-Publish的实体TopicPublishInfo是为null,使用的是默认TBW102这个队列的配置来创建TopicPublishInfo,创建生产者和消费者分别与Topic的订阅关系。因此可以得知在Broker中1个Topic对应多个MessageQueue。
发送消息有发送重试机制,同步重试3次,异步和单向则只发1次没有重试。随机选一个MessageQueue基于底层Netty来发送消息。
组装发送消息的请求。
根据Broker的地址进行Netty底层的连接。
RocketMq是在发送消息的时候才进行对Netty Server的连接,并且维护channel。addr为null则是从Name-Srv获取。
同步请求需要一个CountDownLatch来上锁解锁返回一个ResponseFuture。
Netty-Server接受到数据获取对应命令的Processor来处理消息发送逻辑。
Broker处理消息的流程在SendMessageProcessor中,异步将消息存储在CommitLog中。
对于Topic的创建是什么时候呢?Topic是如何持久存储的呢?他们的读写队列又分别是多少?
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)