《RocketMQ源码分析》部分Consumer下线后,倒推Broker如何做到准实时通知剩余Consumer?

《RocketMQ源码分析》部分Consumer下线后,倒推Broker如何做到准实时通知剩余Consumer?,第1张

《RocketMQ源码分析》部分Consumer下线后,倒推Broker如何做到准实时通知剩余Consumer? 一、承上启下

我们接着上一篇RocketMQ Broker如何感知Consumer的异常宕机、正常下线?继续聊Broker感知到到Consumer下线后,如何通知剩余的Consumer?

二、正文 1、Consumer接收通知

背景:这里在14:37:07这个时间点,同一消费组中的两个消费者挂了一个。

/{user.home}/logs/rocketmqlogs/rocketmq_client.log文件是RocketMQ客户端的日志所在,在其中我们可以看到receive broker's notification这一行日志,字面意思是获取到Broker的通知; 这里我可以看到从一个Consumer下线到其他Consumer感知到大约需要7s左右,这也是为什么说Broker是准实时通知剩余Consumer。

同样的套路,全局搜索receive broker's notification,找到ClientRemotingProcessor#notifyConsumerIdsChanged()方法;

往上走,进入到ClientRemotingProcessor#processRequest()方法中,其用于接收Broker发送的netty调用;当RequestCode为NOTIFY_CONSUMER_IDS_CHANGED时调用notifyConsumerIdsChanged()方法通知Consumer Group的Consumer信息发生变更。

2、Broker发送通知

全局搜索NOTIFY_CONSUMER_IDS_CHANGED,找一下哪里构建了RequestCode为NOTIFY_CONSUMER_IDS_CHANGED的请求(RemotingCommand)?如图只有一处:Broker2Client#notifyConsumerIdsChanged()中;


继续向上找,找到DefaultConsumerIdsChangeListener#handle()方法中,当event为CHANGE时,循环调用每个Consumer,通知Consumer Group的Consumers发生变更;

再看一下DefaultConsumerIdsChangeListener是哪里初始化的?

在初始化BrokerController时会初始化DefaultConsumerIdsChangeListener,进而将其聚合到ConsumerManager类中;

我们再看ConsumerManager类如何使用到它的?在ConsumerManager#doChannelCloseEvent()中使用到了它。

RocketMQ Broker 和Consumer/Producer之间采用Netty通信,无论Consumer下线后,Broker通过Netty的通信机制(ChannelInboundHandlerAdapter#channelInactive())可以实时感知到;

当Consumer下线后,会进入到ConsumerManager#doChannelCloseEvent()中,可以参考:RocketMQ Broker如何感知Consumer的异常宕机、正常下线?

1)ConsumerGroupInfo来源

在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable;同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。参考:《RocketMQ源码分析》Broker是如何处理心跳

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5682119.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存