授人以渔式源码分析RocketMQ Broker如何感知Consumer的异常宕机、正常下线?

授人以渔式源码分析RocketMQ Broker如何感知Consumer的异常宕机、正常下线?,第1张

授人以渔式源码分析RocketMQ Broker如何感知Consumer的异常宕机、正常下线? 一、前言

在研究RocketMQ Consumer消费超时,进而导致重复消费时,突然想到了一个事情:Broker如何感知Consumer/Producer的异常宕机、下线?以及剩余的Consumer如何快速重新做负载?,原本在理完Broker和Consumer的源码后也得出了一些结论,但总感觉差点意思,于是在本地运行了RocketMQ源码(RocketMQ源码如何在本地运行),并增加了一些控制台输出和日志;本文将结合控制台和日志输出进行保姆级授人以渔式的源码分析。

1)前置知识

如果想要深入研究RocketMQ(完全看懂这篇文章),对于一些类的作用可以参考:
1、RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)
2、RocketMQ:深度剖析Broker启动流程原理、源码
3、深度剖析RocketMQ Consumer start启动流程源码

当然,不乏很多大佬,对一些类很了解。下面我们直接开始正文。

二、思路+源码分析

Consumer的下线分两种情况:正常下线和异常宕机。

1、正常下线(代码中shutDown)

正常下线,最初我个人感觉它有点像kill -15 [pid] 命令或者说是线程池的正常shutDown(),于是推测是否RocketMQ也像它们一样,在程序关闭的时候会有一个钩子函数进入到Consumer的shutdown或者close方法。于是我进入到Consumer的实现类DefaultMQPushConsumerImpl中找到了shutdown()方法。

进入到shutdown()方法内部,有一些停止消息消费服务、负载均衡服务、持久化消息offset等 *** 作,我看到一个mQClientFactory.unregisterConsumer()方法好像很符合我的预期。

再进去看一下,MQClientInstance#unregisterConsumer()它做了什么?

1、因为MQClientInstance是JVM层面唯一的,所以我们要在其consumerTable字段中移除当前消费者。
2、以加锁的方式下线客户端,unregisterClientWithLock()。

public void unregisterConsumer(final String group) {
    this.consumerTable.remove(group);
    // client下线
    this.unregisterClientWithLock(null, group);
}

继续往下走:

// MQClientInstance#unregisterClientWithLock()
private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {
    try {
        if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // 下线客户端,点进去
                this.unregisterClient(producerGroup, consumerGroup);
            } catch (Exception e) {
                log.error("unregisterClient exception", e);
            } finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed. [{}]", this.clientId);
        }
    } catch (InterruptedException e) {
        log.warn("unregisterClientWithLock exception", e);
    }
}

// MQClientInstance#unregisterClient()
private void unregisterClient(final String producerGroup, final String consumerGroup) {
    Iterator>> it = this.brokerAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry> entry = it.next();
        String brokerName = entry.getKey();
        HashMap oneTable = entry.getValue();

        if (oneTable != null) {
            for (Map.Entry entry1 : oneTable.entrySet()) {
                String addr = entry1.getValue();
                if (addr != null) {
                    try {
                        // 点进入
                        this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
                        log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
                    } catch (RemotingException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    } catch (InterruptedException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    } catch (MQBrokerException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    }
                }
            }
        }
    }
}

最后会进入到MQClientApiImpl#unregisterClient()中通过netty调用Broker,告知Broker当前Consumer下线了。

假如我直接就kill -9 或者 服务所在机器突然宕机,RocketMQ怎么处理呢?本来已经打算"鸵鸟算法"pass掉它了,但我太想知道它是怎么搞的了,第二天开始加日志、控制台输出,根据日志输出去源码全局搜索找入口。

2、异常宕机(直接kill掉进程)

在RebalanceImpl#rebalanceByTopic()方法中consumer根据topic做queue的负载均衡时,我们把订阅当前topic下的所有consumer的ID都在控制台输出:

启动两个消费者,Consumer和Consumer2,他们都是study-consumer这个Consumer Group下的消费者。

然后把Consumer停掉之后,看Consumer2控制台的输出:


从控制台的输出来看,在14:37:07这个时间点Consumer2重新进行了负载均衡;接着我们去看一下Broker的日志在这个时间点发生了什么?

2021-12-26 14:37:07 INFO NettyEventExecutor - NETTY EVENT: remove channel[ClientChannelInfo [channel=[id: 0x78a85d90, L:/10.90.66.61:10911 ! R:/10.90.66.61:53550], clientId=10.90.66.61@39280, language=JAVA, version=373, lastUpdateTimestamp=1640500621935]][10.90.66.61:53550] from ProducerManager groupChannelTable, producer group: CLIENT_INNER_PRODUCER
2021-12-26 14:37:07 WARN NettyEventExecutor - NETTY EVENT: remove not active channel[ClientChannelInfo [channel=[id: 0x78a85d90, L:/10.90.66.61:10911 ! R:/10.90.66.61:53550], clientId=10.90.66.61@39280, language=JAVA, version=373, lastUpdateTimestamp=1640500621935]] from ConsumerGroupInfo groupChannelTable, consumer group: study-consumer
2021-12-26 14:37:07 WARN PullMessageThread_13 - The broker's subscription is not latest, group: study-consumer *


从日志上来看,在Consumer宕机的那一刻,Broker就立即感知到了。

假如我们对RocketMQ的源码不太熟悉,怎么搞嘞。我们根据日志输出的关键信息去源码项目中全局搜索,比如这里我们拿remove channel[去源码中搜索:


点进入会进入到ProducerManager#doChannelCloseEvent()方法中:

但是好像不太对,我们是Consumer下线,怎么这里进入到了ProducerManager?先简单一提,看到后大家就通透了;本质上管理Consumer和Producer的下线的ProducerManager和ConsumerManager的doChannelCloseEvent()的入口在同一个地方。假如这个我们也不知道,怎么搞嘞?

Broker的日志输出中有两行日志带remove,我们再全局搜索下一行的NETTY EVENT: remove not active channel[,注意这里也有下线的Consumer客户端信息(clientId=10.90.66.61@39280),这个和我们上面做负载均衡时的控制台输出是可以对上的


我们回来,继续看搜索结果:

点进入会进入到ConsumerGroupInfo#doChannelCloseEvent()方法中:

往上找,看哪里调用了它;果然是在ConsumerManager中,和上面找到的ProducerManager一样。

步入正轨了,我们继续往上找,看看哪里调用了ConsumerManager#doChannelCloseEvent()方法;

有三处都调用了ConsumerManager#doChannelCloseEvent(),感觉有点不太对;遇事莫慌,都先点进去看看。他们都是在ClientHousekeppingService类中被调用的,分别是74、81、88行。对应的方法分别为onChannelClose()、onChannelException()、onChannelIdle(),从命名来看分别是在Channel关闭、异常、闲置的时候做Consumer的doChannelCloseEvent() *** 作。

我们一onChannelClose()为例,来看一下哪里调用了它;

在NettyRemotingAbstract抽象类中有一个内部类NettyEventExecutor,它是一个Runnable实现类(即可执行任务);

1)NettyRemotingAbstract的内部类NettyEventExecutor

NettyEventExecutor的run()方法会不停的从其内部的阻塞队列eventQueue中取出NettyEvent,然后根据NettyEvent的类型进入相应的处理逻辑;对应我们的示例,NettyEvent的type为Close。

下面我们需要找到哪里往eventQueue中放入了type为Close的NettyEvent.

(1)首先我们看一下NettyEventExecutor是在哪里启动的?
全局搜索NettyEventExecutor,发现调用其start()方法的位置有两处,分别是NettyRemotingClient和NettyRemotingServer;对于Consumer而言,Broker是作为Server的存在,所以NettyRemotingServer才是我们需要的。

在NettyRemotingServer#start()方法中调用了NettyEventExecutor#start(),而NettyRemotingServer#start()是在Broker启动的时候调用的,具体流程可以参考:RocketMQ:深度剖析Broker启动流程原理、源码。

(2)在NettyEventExecutor#putNettyEvent()方法中会将event添加到eventQueue中;

在NettyEventExecutor的外部类NettyRemotingAbstract#putNettyEvent()方法中会调用它的putNettyEvent()方法。

继续全局搜索putNettyEvent,找到NettyEventType为NettyEventType.CLOSE的地方,共有三处,一样的我们进入NettyRemotingServer中;

嚯,进入了NettyConnectManageHandler#channelInactive();

这个channelInactive()看着就很眼熟,感觉像是Netty的,它还是一个重写的方法;看一下它重写了谁;淦,是Netty的Channel失活执行方法。

最后,找到NettyConnectManageHandler在哪里放入到Netty的BootStrap中的?嗯,嗯,还是在NettyRemotingServer#start()方法中:

private void prepareSharableHandlers() {
    handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
    encoder = new NettyEncoder();
    connectionManageHandler = new NettyConnectManageHandler();
    serverHandler = new NettyServerHandler();
}
三、总结

RocketMQ Broker 和Consumer/Producer的通信采用Netty,无论Consumer是正常调用Broker下线,还是机器异常宕机下线,都是通过Netty的通信机制使Broker可以实时感知到;异常下线主要依靠Netty的ChannelInboundHandlerAdapter#channelInactive()。

长时间不使用Netty,都忘了channelInactive();希望这篇文章可以帮助更多的人学到看源码、分析问题的思路。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5681398.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存