我们接着上一篇RocketMQ Broker如何感知Consumer的异常宕机、正常下线?继续聊Broker感知到到Consumer下线后,如何通知剩余的Consumer?
二、正文 1、Consumer接收通知背景:这里在14:37:07这个时间点,同一消费组中的两个消费者挂了一个。
/{user.home}/logs/rocketmqlogs/rocketmq_client.log文件是RocketMQ客户端的日志所在,在其中我们可以看到receive broker's notification这一行日志,字面意思是获取到Broker的通知; 这里我可以看到从一个Consumer下线到其他Consumer感知到大约需要7s左右,这也是为什么说Broker是准实时通知剩余Consumer。
同样的套路,全局搜索receive broker's notification,找到ClientRemotingProcessor#notifyConsumerIdsChanged()方法;
往上走,进入到ClientRemotingProcessor#processRequest()方法中,其用于接收Broker发送的netty调用;当RequestCode为NOTIFY_CONSUMER_IDS_CHANGED时调用notifyConsumerIdsChanged()方法通知Consumer Group的Consumer信息发生变更。
全局搜索NOTIFY_CONSUMER_IDS_CHANGED,找一下哪里构建了RequestCode为NOTIFY_CONSUMER_IDS_CHANGED的请求(RemotingCommand)?如图只有一处:Broker2Client#notifyConsumerIdsChanged()中;
继续向上找,找到DefaultConsumerIdsChangeListener#handle()方法中,当event为CHANGE时,循环调用每个Consumer,通知Consumer Group的Consumers发生变更;
再看一下DefaultConsumerIdsChangeListener是哪里初始化的?
在初始化BrokerController时会初始化DefaultConsumerIdsChangeListener,进而将其聚合到ConsumerManager类中;
我们再看ConsumerManager类如何使用到它的?在ConsumerManager#doChannelCloseEvent()中使用到了它。
RocketMQ Broker 和Consumer/Producer之间采用Netty通信,无论Consumer下线后,Broker通过Netty的通信机制(ChannelInboundHandlerAdapter#channelInactive())可以实时感知到;
当Consumer下线后,会进入到ConsumerManager#doChannelCloseEvent()中,可以参考:RocketMQ Broker如何感知Consumer的异常宕机、正常下线?
1)ConsumerGroupInfo来源在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable;同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。参考:《RocketMQ源码分析》Broker是如何处理心跳
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)