RocketMQ所有的心跳机制:
1)Producer端:
- Producer与NameSrv随机建立长连接,定期从NameSrv获取topic路由信息;
- Producer与Broker的Master结点建立长连接,用于发送消息;
- 此外Producer与Master维持了一个心跳。
2)ConSumer端:
二、客户端发送心跳
- Conumser与NamseSrv随机建立长连接,定期从NameSrv获取topic路由信息;
- Consumer与Broker的Master和Slave结点建立长连接,用于订阅消息;
- 此外Consumer与Master和slave维持一个心跳。
(1)Producer和Consumer通过MQClientInstance的sendHeartbeatToAllBrokerWithLock()方法实现发送心跳请求;
public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { // 发送心跳包 this.sendHeartbeatToAllBroker(); // 上传类过滤器源码 this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed. [{}]", this.clientId); } }
我们看到sendHeartbeanToAllBrokerWithLock()方法中在sendHeartbeatToAllBroker()之前加了锁,这是因为点啥嘞?
1、RocketMQ对底层进行通信的MQClientInstance进行了复用,即在同一个jvm里的不同的Consumer下面使用的都是同一个MQClientInstance。
2、既然是复用的,那么就可能存在并发,因此这里进行了上锁 *** 作。
3、所以这里是为了防止心跳错乱。
(2)另外在MQClientInstance启动时会启动会调用startScheduledTask()方法,开始一堆定时任务,其中包括:定期默认每30s发送心跳信息到Broker。
MQClientInstance类源码:
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ....... this.startScheduledTask(); ....... } } }
startScheduledTask()方法启动定时任务:
private void startScheduledTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 清理下线的broker MQClientInstance.this.cleanOfflineBroker(); // 向所有的broker发送心跳 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); }
我们来看看sendHeartbeatToAllBroker()做了什么?
1、MQClientInstance#sendHeartbeatToAllBroker()1.准备心跳信息HeartbeatData,如果心跳信息为空,直接返回;
2. 遍历所有的Broker,尝试向所有的Broker发送心跳包;注意:根据客户端的类型(Producer、Consumer)不同,发送到的Broker对象会又差别。
1、如果启动的是生产者,那么心跳保证消费者的相关信息为空,这时只会向Broker的Mater节点发送心跳;因为RocketMQ中主要Master的Broker才能处理写请求。
2、如果启动的是消费者,则会向所有的Broker发送心跳。
private void sendHeartbeatToAllBroker() { // 心跳包--包装类,主要是Producer和Consumer相关信息 final HeartbeatData heartbeatData = this.prepareHeartbeatData(); final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty(); // 生产者和消费者数据都为空时 if (producerEmpty && consumerEmpty) { log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId); return; } // broker列表不为空时 // todo brokerAddrTable是什么时候初始化的? // 1)当topic的路由信息改变后,会往brokerAddrTable中添加数据 if (!this.brokerAddrTable.isEmpty()) { // 统计发送心跳的次数 long times = this.sendHeartbeatTimesTotal.getAndIncrement(); // 遍历broker列表 Iterator>> it = this.brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry > entry = it.next(); String brokerName = entry.getKey(); // 获取一个broker地址 HashMap oneTable = entry.getValue(); if (oneTable != null) { for (Map.Entry entry1 : oneTable.entrySet()) { Long id = entry1.getKey(); String addr = entry1.getValue(); if (addr != null) { // 消费数据为空 并且 broker不是Mater节点时,不发送心跳。 // 因为Producer只需要与Mater维护心跳即可 if (consumerEmpty) { // broker不是mater节点 if (id != MixAll.MASTER_ID) continue; } try { // 发送心跳 // todo MQClientAPIImpl是什么时候初始化的? // 1)实例化MQClientInstance时初始化mQClientAPIImpl int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); if (!this.brokerVersionTable.containsKey(brokerName)) { this.brokerVersionTable.put(brokerName, new HashMap (4)); } this.brokerVersionTable.get(brokerName).put(addr, version); if (times % 20 == 0) { log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); log.info(heartbeatData.toString()); } } catch (Exception e) { if (this.isBrokerInNameServer(addr)) { log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); } else { log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr, e); } } } } } } } }
RocketMQ中客户端和服务端的通信通过Netty实现,这里我们的客户端是Consumer/Producer、服务端是Broker。
我们先看一下RocketMQ是如何准备心跳包数据的?
2、心跳包HeartBeatData心跳包内容包括:客户端id、生产者信息、消费者信息;
一般情况下生产者信息和消费者信息是互斥的,producerDataSet和consumerDataSet会有一个为空。但如果一个应用既是生产者,也是消费者,那么这种情况下producerDataSet和consumerDataSet都不为空。
public class HeartbeatData extends RemotingSerializable { // consumer 客户端ID private String clientID; private SetproducerDataSet = new HashSet (); private Set consumerDataSet = new HashSet (); ...... }
我们分别看一下生产者信息和消费者信息都包括什么?
1)生产者信息ProducerData不能再简单了,就一个生产者组的名称。
public class ProducerData { private String groupName; }2)消费者信息ConsumerData
ConsumerData消费者信息包括:
- groupName
- 消费类型:push/pull
- 消息传播方式:集群还是广播
- 启动消费者时从哪开始消费
- 订阅信息SubscriptionData:过滤消息相关标签、SQL规则等。
public class ConsumerData { private String groupName; private ConsumeType consumeType; private MessageModel messageModel; private ConsumeFromWhere consumeFromWhere; private SetsubscriptionDataSet = new HashSet (); }
在我们日常写代码时,这些属性很常见、经常会配置到。SubscriptionData是我们的消费者订阅信息,
其内容如下:
public class SubscriptionData implements Comparable{ // 表示订阅该topic下所有类型消息 public final static String SUB_ALL = "*"; // 是否开启类过滤模式,默认不开启 private boolean classFilterMode = false; // 订阅的topic private String topic; // 订阅表达式 private String subString; // 如果是tag过滤模式,这里是tag列表 private Set tagsSet = new HashSet (); // 如果是tag过滤模式,这里是tag对应的hashCode列表 private Set codeSet = new HashSet (); private long subVersion = System.currentTimeMillis(); // 表达式类型,有TAG和SQL两种,默认是Tag private String expressionType = expressionType.TAG; @JSONField(serialize = false) // 如果开启了类过滤模式,这里存放过滤类java代码 private String filterClassSource; }
这里面,我们平时最常用到的是topic、subString。
// topic , 过滤器 * 表示根据SQL不过滤、TAG-A || TAG-B表示根据TAG过滤 consumer.subscribe("saint-study-topic", "TAG-A || TAG-B");
创建订阅信息的时候,subString会被分割成TAG-A 、TAG-B,然后保存至tagsSet集合里,tag的hashcode会保存到codeSet集合里。
那么心跳包数据是如何组装的?我接着来看。
3、MQClientInstance#prepareHeartbeatData()private HeartbeatData prepareHeartbeatData() { HeartbeatData heartbeatData = new HeartbeatData(); // clientID heartbeatData.setClientID(this.clientId); // Consumer for (Map.Entry1)以准备Consumer的心跳信息来看:entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { ConsumerData consumerData = new ConsumerData(); consumerData.setGroupName(impl.groupName()); consumerData.setConsumeType(impl.consumeType()); consumerData.setMessageModel(impl.messageModel()); consumerData.setConsumeFromWhere(impl.consumeFromWhere()); consumerData.getSubscriptionDataSet().addAll(impl.subscriptions()); consumerData.setUnitMode(impl.isUnitMode()); heartbeatData.getConsumerDataSet().add(consumerData); } } // Producer for (Map.Entry entry : this.producerTable.entrySet()) { MQProducerInner impl = entry.getValue(); if (impl != null) { ProducerData producerData = new ProducerData(); producerData.setGroupName(entry.getKey()); heartbeatData.getProducerDataSet().add(producerData); } } return heartbeatData; }
其遍历MQClientInstance的属性consumerTable.entrySet(),获取到MQConsumerInner信息,然后将其填充到consumerData中。
DefaultMQPushConsumerImplement#start()方法–消费者启动时,会调用MQClientInstance#registerConsumer()方法,将消费者信息(含订阅信息)填充到consumerTable。
咦,这里只是把DefaultMQPushConsumerImpl作为MQConsumerInner传入到了MQClientInstance#registerConsumer,订阅信息在哪里可以看到撒?
往心跳包HeartbeatData的consumerData属性中填充的是MQConsumerInner#subscriptions()方法的返回值。
那我们就看一下DefaultMQPushConsumerImplement#subscriptions()方法:
原来是取的负载均衡服务RebalanceImpl中的subscriptionInner属性,那RebalanceImpl的subscriptionInner属性又是怎么填充的?
既然是订阅信息,会不会和我们的subscribe()订阅 *** 作有关呢?
我们看一下DefaultMQPushConsumerImpl#subscribe()方法:
果然是这样填充的消费者订阅者信息。
下面是组装SubscriptionData订阅信息的代码:
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\|\|"); if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { subscriptionData.getTagsSet().add(trimString); subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { throw new Exception("subString split error"); } } return subscriptionData; }2)以准备Producer的心跳信息来看:
这里和Consumer的心跳信息来源类似。
其遍历MQClientInstance的属性producerTable.entrySet(),获取到MQProducerInner信息,然后将其填充到producerData中。
DefaultMQProducerImpl#start()方法 生产者启动时,会调用MQClientInstance#registerProducer()方法,将生产者信息填充到producerTable中。
我们接着看一下MQClientInstance#sendHearbeat()是如何发送心跳的?
4、MQClientInstance#sendHearbeat()1、封装请求,包括:请求编码为HEART_BEAT、编程语言为Java、心跳包HeartBeatData;
2、调用远程服务类NettyRemotingClient通过Netty发送心跳。
@Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { // 发送心跳前,执行的钩子函数 doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // 发送心跳 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); // 发送心跳后,执行的钩子函数 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
对于RocketMQ是如何通过Netty通信的,不是本文的重点,后续专文分析。
三、总结以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)