整体的流程如下图,其中为了简便没画 Name Server(也就是消息的中转,都是通过Name Server 再确认消息要放到哪个broker 的哪个topic分片上)。
producer完全无状态,可以集群部署。
首先分析一下RocketMQ的客户端发送消息的源码:
//构造Producer DefaultMQProducer producer = new DefaultMQProducer("producerGroupName"); //初始化Producer,整个应用生命周期内,只需要初始化一次 producer.start(); // 构造Message Message msg = new Message("Topic_A",//topic "Taga",//tag:给消息打标签,用于区分一类消息,可为null "OrderId123",//key:自定义key,可用于去重,可以null ("Hello RocketMQ").getBytes(StandardCharsets.UTF_8));//body:消息内容 //发送消息并返回结果 SendResult sendResult = producer.send(msg); //清理资源,关闭网络连接,注销自己 producer.shutdown();
在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:
- 如果没有指定namesrv地址,将会自动寻址启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳…启动负载均衡的服务
初始化完成后,开始发送消息,发送消息的主要代码如下:
代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。
如果Producer发送消息失败,会自动重试,重试的策略:
- 重试次数 < retryTimesWhenSendFailed(可配置)总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)同时满足上面两个条件后,Producer会选择另外一个队列发送消息
有如下两种:
一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。
但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。
消费端的Push模式是通过长轮询的模式来实现的,就如同下图:
Push模式原理:
消费模式Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。
当然,Consumer端是通过一个线程将阻塞队列linkedBlockingQueue中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,如果发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。
broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。
消费集群:使用相同 Group ID 的订阅者属于同一个集群。同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。
集群消费:当使用集群消费模式时,消息队列 RocketMQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。
如下图:
广播消费:当使用广播消费模式时,消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。在广播消费中的Consumer Group概念可以认为在消息划分方面无意义。
如下图:
注意:每条消息都需要被相同逻辑的多台机器处理。
使用集群模式模拟广播
如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。
消费端的负载均衡注意:和上面的广播消费相比,每条消息都需要被多台机器处理,但每台机器的逻辑可以相同也可以不一样
消费端会通过RebalanceService线程,20秒钟做一次基于topic下的所有队列负载:
遍历Consumer下的所有topic,然后根据topic订阅所有的消息获取同一topic和Consumer Group下的所有Consumer然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)