在研究RocketMQ Consumer消费超时,进而导致重复消费时,突然想到了一个事情:Broker如何感知Consumer/Producer的异常宕机、下线?以及剩余的Consumer如何快速重新做负载?,原本在理完Broker和Consumer的源码后也得出了一些结论,但总感觉差点意思,于是在本地运行了RocketMQ源码(RocketMQ源码如何在本地运行),并增加了一些控制台输出和日志;本文将结合控制台和日志输出进行保姆级授人以渔式的源码分析。
1)前置知识如果想要深入研究RocketMQ(完全看懂这篇文章),对于一些类的作用可以参考:
1、RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)
2、RocketMQ:深度剖析Broker启动流程原理、源码
3、深度剖析RocketMQ Consumer start启动流程源码
当然,不乏很多大佬,对一些类很了解。下面我们直接开始正文。
二、思路+源码分析Consumer的下线分两种情况:正常下线和异常宕机。
1、正常下线(代码中shutDown)正常下线,最初我个人感觉它有点像kill -15 [pid] 命令或者说是线程池的正常shutDown(),于是推测是否RocketMQ也像它们一样,在程序关闭的时候会有一个钩子函数进入到Consumer的shutdown或者close方法。于是我进入到Consumer的实现类DefaultMQPushConsumerImpl中找到了shutdown()方法。
进入到shutdown()方法内部,有一些停止消息消费服务、负载均衡服务、持久化消息offset等 *** 作,我看到一个mQClientFactory.unregisterConsumer()方法好像很符合我的预期。
再进去看一下,MQClientInstance#unregisterConsumer()它做了什么?
1、因为MQClientInstance是JVM层面唯一的,所以我们要在其consumerTable字段中移除当前消费者。
2、以加锁的方式下线客户端,unregisterClientWithLock()。
public void unregisterConsumer(final String group) { this.consumerTable.remove(group); // client下线 this.unregisterClientWithLock(null, group); }
继续往下走:
// MQClientInstance#unregisterClientWithLock() private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) { try { if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { // 下线客户端,点进去 this.unregisterClient(producerGroup, consumerGroup); } catch (Exception e) { log.error("unregisterClient exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed. [{}]", this.clientId); } } catch (InterruptedException e) { log.warn("unregisterClientWithLock exception", e); } } // MQClientInstance#unregisterClient() private void unregisterClient(final String producerGroup, final String consumerGroup) { Iterator>> it = this.brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry > entry = it.next(); String brokerName = entry.getKey(); HashMap oneTable = entry.getValue(); if (oneTable != null) { for (Map.Entry entry1 : oneTable.entrySet()) { String addr = entry1.getValue(); if (addr != null) { try { // 点进入 this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); } catch (RemotingException e) { log.error("unregister client exception from broker: " + addr, e); } catch (InterruptedException e) { log.error("unregister client exception from broker: " + addr, e); } catch (MQBrokerException e) { log.error("unregister client exception from broker: " + addr, e); } } } } } }
最后会进入到MQClientApiImpl#unregisterClient()中通过netty调用Broker,告知Broker当前Consumer下线了。
假如我直接就kill -9 或者 服务所在机器突然宕机,RocketMQ怎么处理呢?本来已经打算"鸵鸟算法"pass掉它了,但我太想知道它是怎么搞的了,第二天开始加日志、控制台输出,根据日志输出去源码全局搜索找入口。
在RebalanceImpl#rebalanceByTopic()方法中consumer根据topic做queue的负载均衡时,我们把订阅当前topic下的所有consumer的ID都在控制台输出:
启动两个消费者,Consumer和Consumer2,他们都是study-consumer这个Consumer Group下的消费者。
然后把Consumer停掉之后,看Consumer2控制台的输出:
从控制台的输出来看,在14:37:07这个时间点Consumer2重新进行了负载均衡;接着我们去看一下Broker的日志在这个时间点发生了什么?
2021-12-26 14:37:07 INFO NettyEventExecutor - NETTY EVENT: remove channel[ClientChannelInfo [channel=[id: 0x78a85d90, L:/10.90.66.61:10911 ! R:/10.90.66.61:53550], clientId=10.90.66.61@39280, language=JAVA, version=373, lastUpdateTimestamp=1640500621935]][10.90.66.61:53550] from ProducerManager groupChannelTable, producer group: CLIENT_INNER_PRODUCER 2021-12-26 14:37:07 WARN NettyEventExecutor - NETTY EVENT: remove not active channel[ClientChannelInfo [channel=[id: 0x78a85d90, L:/10.90.66.61:10911 ! R:/10.90.66.61:53550], clientId=10.90.66.61@39280, language=JAVA, version=373, lastUpdateTimestamp=1640500621935]] from ConsumerGroupInfo groupChannelTable, consumer group: study-consumer 2021-12-26 14:37:07 WARN PullMessageThread_13 - The broker's subscription is not latest, group: study-consumer *
从日志上来看,在Consumer宕机的那一刻,Broker就立即感知到了。
假如我们对RocketMQ的源码不太熟悉,怎么搞嘞。我们根据日志输出的关键信息去源码项目中全局搜索,比如这里我们拿remove channel[去源码中搜索:
点进入会进入到ProducerManager#doChannelCloseEvent()方法中:
但是好像不太对,我们是Consumer下线,怎么这里进入到了ProducerManager?先简单一提,看到后大家就通透了;本质上管理Consumer和Producer的下线的ProducerManager和ConsumerManager的doChannelCloseEvent()的入口在同一个地方。假如这个我们也不知道,怎么搞嘞?
Broker的日志输出中有两行日志带remove,我们再全局搜索下一行的NETTY EVENT: remove not active channel[,注意这里也有下线的Consumer客户端信息(clientId=10.90.66.61@39280),这个和我们上面做负载均衡时的控制台输出是可以对上的
我们回来,继续看搜索结果:
点进入会进入到ConsumerGroupInfo#doChannelCloseEvent()方法中:
往上找,看哪里调用了它;果然是在ConsumerManager中,和上面找到的ProducerManager一样。
步入正轨了,我们继续往上找,看看哪里调用了ConsumerManager#doChannelCloseEvent()方法;
有三处都调用了ConsumerManager#doChannelCloseEvent(),感觉有点不太对;遇事莫慌,都先点进去看看。他们都是在ClientHousekeppingService类中被调用的,分别是74、81、88行。对应的方法分别为onChannelClose()、onChannelException()、onChannelIdle(),从命名来看分别是在Channel关闭、异常、闲置的时候做Consumer的doChannelCloseEvent() *** 作。
我们一onChannelClose()为例,来看一下哪里调用了它;
在NettyRemotingAbstract抽象类中有一个内部类NettyEventExecutor,它是一个Runnable实现类(即可执行任务);
NettyEventExecutor的run()方法会不停的从其内部的阻塞队列eventQueue中取出NettyEvent,然后根据NettyEvent的类型进入相应的处理逻辑;对应我们的示例,NettyEvent的type为Close。
下面我们需要找到哪里往eventQueue中放入了type为Close的NettyEvent.
(1)首先我们看一下NettyEventExecutor是在哪里启动的?
全局搜索NettyEventExecutor,发现调用其start()方法的位置有两处,分别是NettyRemotingClient和NettyRemotingServer;对于Consumer而言,Broker是作为Server的存在,所以NettyRemotingServer才是我们需要的。
在NettyRemotingServer#start()方法中调用了NettyEventExecutor#start(),而NettyRemotingServer#start()是在Broker启动的时候调用的,具体流程可以参考:RocketMQ:深度剖析Broker启动流程原理、源码。
(2)在NettyEventExecutor#putNettyEvent()方法中会将event添加到eventQueue中;
在NettyEventExecutor的外部类NettyRemotingAbstract#putNettyEvent()方法中会调用它的putNettyEvent()方法。
继续全局搜索putNettyEvent,找到NettyEventType为NettyEventType.CLOSE的地方,共有三处,一样的我们进入NettyRemotingServer中;
嚯,进入了NettyConnectManageHandler#channelInactive();
这个channelInactive()看着就很眼熟,感觉像是Netty的,它还是一个重写的方法;看一下它重写了谁;淦,是Netty的Channel失活执行方法。
最后,找到NettyConnectManageHandler在哪里放入到Netty的BootStrap中的?嗯,嗯,还是在NettyRemotingServer#start()方法中:
private void prepareSharableHandlers() { handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode); encoder = new NettyEncoder(); connectionManageHandler = new NettyConnectManageHandler(); serverHandler = new NettyServerHandler(); }三、总结
RocketMQ Broker 和Consumer/Producer的通信采用Netty,无论Consumer是正常调用Broker下线,还是机器异常宕机下线,都是通过Netty的通信机制使Broker可以实时感知到;异常下线主要依靠Netty的ChannelInboundHandlerAdapter#channelInactive()。
长时间不使用Netty,都忘了channelInactive();希望这篇文章可以帮助更多的人学到看源码、分析问题的思路。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)