《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源

《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源,第1张

《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源 一、前言

RocketMQ所有的心跳机制:
1)Producer端:

  1. Producer与NameSrv随机建立长连接,定期从NameSrv获取topic路由信息;
  2. Producer与Broker的Master结点建立长连接,用于发送消息;
  3. 此外Producer与Master维持了一个心跳。

2)ConSumer端:

  1. Conumser与NamseSrv随机建立长连接,定期从NameSrv获取topic路由信息;
  2. Consumer与Broker的Master和Slave结点建立长连接,用于订阅消息;
  3. 此外Consumer与Master和slave维持一个心跳。
二、客户端发送心跳

(1)Producer和Consumer通过MQClientInstance的sendHeartbeatToAllBrokerWithLock()方法实现发送心跳请求;

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            // 发送心跳包
            this.sendHeartbeatToAllBroker();
            // 上传类过滤器源码
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed. [{}]", this.clientId);
    }
}

我们看到sendHeartbeanToAllBrokerWithLock()方法中在sendHeartbeatToAllBroker()之前加了锁,这是因为点啥嘞?

1、RocketMQ对底层进行通信的MQClientInstance进行了复用,即在同一个jvm里的不同的Consumer下面使用的都是同一个MQClientInstance。
2、既然是复用的,那么就可能存在并发,因此这里进行了上锁 *** 作。
3、所以这里是为了防止心跳错乱。

(2)另外在MQClientInstance启动时会启动会调用startScheduledTask()方法,开始一堆定时任务,其中包括:定期默认每30s发送心跳信息到Broker。

MQClientInstance类源码:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                .......
                
                this.startScheduledTask();
                .......
        }
    }
}

startScheduledTask()方法启动定时任务:

private void startScheduledTask() {
   this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

       @Override
       public void run() {
           try {
               // 清理下线的broker
               MQClientInstance.this.cleanOfflineBroker();
               // 向所有的broker发送心跳
               MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
           } catch (Exception e) {
               log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
           }
       }
   }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
}

我们来看看sendHeartbeatToAllBroker()做了什么?

1、MQClientInstance#sendHeartbeatToAllBroker()

1.准备心跳信息HeartbeatData,如果心跳信息为空,直接返回;
2. 遍历所有的Broker,尝试向所有的Broker发送心跳包;

注意:根据客户端的类型(Producer、Consumer)不同,发送到的Broker对象会又差别。
1、如果启动的是生产者,那么心跳保证消费者的相关信息为空,这时只会向Broker的Mater节点发送心跳;因为RocketMQ中主要Master的Broker才能处理写请求。
2、如果启动的是消费者,则会向所有的Broker发送心跳。

private void sendHeartbeatToAllBroker() {
    // 心跳包--包装类,主要是Producer和Consumer相关信息
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    // 生产者和消费者数据都为空时
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
        return;
    }

    // broker列表不为空时
    // todo brokerAddrTable是什么时候初始化的?
    // 1)当topic的路由信息改变后,会往brokerAddrTable中添加数据
    if (!this.brokerAddrTable.isEmpty()) {
        // 统计发送心跳的次数
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        // 遍历broker列表
        Iterator>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry> entry = it.next();
            String brokerName = entry.getKey();
            // 获取一个broker地址
            HashMap oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        // 消费数据为空 并且 broker不是Mater节点时,不发送心跳。
                        // 因为Producer只需要与Mater维护心跳即可
                        if (consumerEmpty) {
                            // broker不是mater节点
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }

                        try {
                            // 发送心跳
                            // todo MQClientAPIImpl是什么时候初始化的?
                            // 1)实例化MQClientInstance时初始化mQClientAPIImpl
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            if (this.isBrokerInNameServer(addr)) {
                                log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                            } else {
                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                        id, addr, e);
                            }
                        }
                    }
                }
            }
        }
    }
}

RocketMQ中客户端和服务端的通信通过Netty实现,这里我们的客户端是Consumer/Producer、服务端是Broker。

我们先看一下RocketMQ是如何准备心跳包数据的?

2、心跳包HeartBeatData

心跳包内容包括:客户端id、生产者信息、消费者信息;

一般情况下生产者信息和消费者信息是互斥的,producerDataSet和consumerDataSet会有一个为空。但如果一个应用既是生产者,也是消费者,那么这种情况下producerDataSet和consumerDataSet都不为空。

public class HeartbeatData extends RemotingSerializable {
    // consumer 客户端ID
    private String clientID;
    
    private Set producerDataSet = new HashSet();
    
    private Set consumerDataSet = new HashSet();
    ......
}

我们分别看一下生产者信息和消费者信息都包括什么?

1)生产者信息ProducerData

不能再简单了,就一个生产者组的名称。

public class ProducerData {
    
    private String groupName;
}
2)消费者信息ConsumerData

ConsumerData消费者信息包括:

  1. groupName
  2. 消费类型:push/pull
  3. 消息传播方式:集群还是广播
  4. 启动消费者时从哪开始消费
  5. 订阅信息SubscriptionData:过滤消息相关标签、SQL规则等。
public class ConsumerData {
    
    private String groupName;
    
    private ConsumeType consumeType;
    
    private MessageModel messageModel;
    
    private ConsumeFromWhere consumeFromWhere;
    
    private Set subscriptionDataSet = new HashSet();
}

在我们日常写代码时,这些属性很常见、经常会配置到。SubscriptionData是我们的消费者订阅信息,
其内容如下:

public class SubscriptionData implements Comparable {
    // 表示订阅该topic下所有类型消息
    public final static String SUB_ALL = "*";
    // 是否开启类过滤模式,默认不开启
    private boolean classFilterMode = false;
    // 订阅的topic
    private String topic;
    // 订阅表达式
    private String subString;
    // 如果是tag过滤模式,这里是tag列表
    private Set tagsSet = new HashSet();
    // 如果是tag过滤模式,这里是tag对应的hashCode列表
    private Set codeSet = new HashSet();
    private long subVersion = System.currentTimeMillis();
    // 表达式类型,有TAG和SQL两种,默认是Tag
    private String expressionType = expressionType.TAG;

    @JSONField(serialize = false)
    // 如果开启了类过滤模式,这里存放过滤类java代码
    private String filterClassSource;
}

这里面,我们平时最常用到的是topic、subString。

// topic , 过滤器 * 表示根据SQL不过滤、TAG-A || TAG-B表示根据TAG过滤
consumer.subscribe("saint-study-topic", "TAG-A || TAG-B");

创建订阅信息的时候,subString会被分割成TAG-A 、TAG-B,然后保存至tagsSet集合里,tag的hashcode会保存到codeSet集合里。

那么心跳包数据是如何组装的?我接着来看。

3、MQClientInstance#prepareHeartbeatData()
private HeartbeatData prepareHeartbeatData() {
    HeartbeatData heartbeatData = new HeartbeatData();

    // clientID
    heartbeatData.setClientID(this.clientId);

    // Consumer
    for (Map.Entry entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(impl.groupName());
            consumerData.setConsumeType(impl.consumeType());
            consumerData.setMessageModel(impl.messageModel());
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());

            heartbeatData.getConsumerDataSet().add(consumerData);
        }
    }

    // Producer
    for (Map.Entry entry : this.producerTable.entrySet()) {
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            ProducerData producerData = new ProducerData();
            producerData.setGroupName(entry.getKey());

            heartbeatData.getProducerDataSet().add(producerData);
        }
    }

    return heartbeatData;
}
1)以准备Consumer的心跳信息来看:

其遍历MQClientInstance的属性consumerTable.entrySet(),获取到MQConsumerInner信息,然后将其填充到consumerData中。

DefaultMQPushConsumerImplement#start()方法–消费者启动时,会调用MQClientInstance#registerConsumer()方法,将消费者信息(含订阅信息)填充到consumerTable。



咦,这里只是把DefaultMQPushConsumerImpl作为MQConsumerInner传入到了MQClientInstance#registerConsumer,订阅信息在哪里可以看到撒?

往心跳包HeartbeatData的consumerData属性中填充的是MQConsumerInner#subscriptions()方法的返回值。

那我们就看一下DefaultMQPushConsumerImplement#subscriptions()方法:

原来是取的负载均衡服务RebalanceImpl中的subscriptionInner属性,那RebalanceImpl的subscriptionInner属性又是怎么填充的?

既然是订阅信息,会不会和我们的subscribe()订阅 *** 作有关呢?

我们看一下DefaultMQPushConsumerImpl#subscribe()方法:

果然是这样填充的消费者订阅者信息。
下面是组装SubscriptionData订阅信息的代码:

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
    String subString) throws Exception {
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        String[] tags = subString.split("\|\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }

    return subscriptionData;
}
2)以准备Producer的心跳信息来看:

这里和Consumer的心跳信息来源类似。

其遍历MQClientInstance的属性producerTable.entrySet(),获取到MQProducerInner信息,然后将其填充到producerData中。

DefaultMQProducerImpl#start()方法 生产者启动时,会调用MQClientInstance#registerProducer()方法,将生产者信息填充到producerTable中。


我们接着看一下MQClientInstance#sendHearbeat()是如何发送心跳的?

4、MQClientInstance#sendHearbeat()

1、封装请求,包括:请求编码为HEART_BEAT、编程语言为Java、心跳包HeartBeatData;
2、调用远程服务类NettyRemotingClient通过Netty发送心跳。

@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            // 发送心跳前,执行的钩子函数
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // 发送心跳
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            // 发送心跳后,执行的钩子函数
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

对于RocketMQ是如何通过Netty通信的,不是本文的重点,后续专文分析。

三、总结

以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5596733.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存