一、MultiTopicsConsumerImpl创建入口上一篇介绍了单个消费者的创建实现,本章介绍多个消费者
public class PulsarClientImpl { publicCompletableFuture > subscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { // 配置的正则 if (conf.getTopicsPattern() != null) { return patternTopicSubscribeAsync(conf, schema, interceptors); } // 单个消费者 else if (conf.getTopicNames().size() == 1) { return singleTopicSubscribeAsync(conf, schema, interceptors); } // 多个 else { return multiTopicSubscribeAsync(conf, schema, interceptors); } } }
继续multiTopicSubscribeAsync(...)
public class PulsarClientImpl { privateCompletableFuture > multiTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { CompletableFuture > consumerSubscribedFuture = new CompletableFuture<>(); // 创建多消费者实现 Consumerbase consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf, externalExecutorProvider, consumerSubscribedFuture, schema, interceptors, true); // 添加多消费者 consumers.add(consumer); return consumerSubscribedFuture; } }
PulsarClient维护的是最外层消费者实现,MultiTopicsConsumerImpl内部又有多个则由它自己维护
可以看到,核心实现都在MultiTopicsConsumerImpl构造中。
继续new MultiTopicsConsumerImpl<>(...)
public class MultiTopicsConsumerImplextends Consumerbase { MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData conf, ExecutorProvider executorProvider, CompletableFuture > subscribeFuture, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId, long startMessageRollbackDurationInSec) { // 封装消费者常用的几乎所有api,跟consumerImpl一样 super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture, schema, interceptors); // key:topic value:分区数 this.partitionedTopics = new ConcurrentHashMap<>(); // 具体的消费者 this.consumers = new ConcurrentHashMap<>(); // 挂起的消费者 this.pausedConsumers = new ConcurrentlinkedQueue<>(); // 跟consumerImpl那个不一样,那个是决定什么时候发起一次拉取查询 // 当超过阈值时挂起消费者,低于时恢复 this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; // 统计总分区数 this.allTopicPartitionsNumber = new AtomicInteger(0); // 起始消息id和起始时间 一般都是reader api指定从哪里开始消费 this.startMessageId = startMessageId != null ? new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId)) : null; this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec; // 跟踪未ack的消息 if (conf.getAckTimeoutMillis() != 0) { if (conf.getTickDurationMillis() > 0) { this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis()); } else { this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis()); } } else { this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED; } // 封装用户配置的订阅名和消费者名称 this.internalConfig = getInternalConsumerConfig(); // 数据打印频率 this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl(this) : null; // 分区的自动发现 if (conf.isAutoUpdatePartitions()) { topicsPartitionChangedListener = new TopicsPartitionChangedListener(); partitionsAutoUpdateTimeout = client.timer() .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS); } // 外层校验过了,不为空 if (conf.getTopicNames().isEmpty()) { setState(State.Ready); subscribeFuture().complete(MultiTopicsConsumerImpl.this); return; } // 核心代码 // 创建具体消费者 List > futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t, createTopicIfDoesNotExist)) .collect(Collectors.toList()); // 等待全部创建完成 FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> { // 总分区数比接收队列还大用分区数 if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); } // 更新状态已就绪 setState(State.Ready); // 所有分区消费者创建完成启动拉取 startReceivingMessages(new ArrayList<>(consumers.values())); subscribeFuture().complete(MultiTopicsConsumerImpl.this); }) .exceptionally(ex -> { return null; }); } }
上面代码有2块需要进入分析
1、创建消费者subscribeAsync(t, createTopicIfDoesNotExist)
2、启动startReceivingMessages(new ArrayList<>(consumers.values()));
先分析1,可以看到是循环调用,每个topic都会创建,topic可能是分区的,那么subscribeAsync(t, createTopicIfDoesNotExist)内部肯定会再次循环分区数创建具体消费者。
public class MultiTopicsConsumerImplextends Consumerbase { public CompletableFuture subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) { TopicName topicNameInstance = getTopicName(topicName); String fullTopicName = topicNameInstance.toString(); CompletableFuture subscribeResult = new CompletableFuture<>(); // 查询topic的分区数 client.getPartitionedTopicmetadata(topicName) .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, createTopicIfDoesNotExist)) .exceptionally(ex1 -> { return null; }); return subscribeResult; } }
拿到分区数后调用subscribeTopicPartitions(...),其内部先处理schema然后调用doSubscribeTopicPartitions(...),直接看doSubscribeTopicPartitions(...)
三、MultiTopicsConsumerImpl中具体消费者的创建public class MultiTopicsConsumerImplextends Consumerbase { private void doSubscribeTopicPartitions(Schema schema, CompletableFuture subscribeResult, String topicName, int numPartitions, boolean createIfDoesNotExist) { List >> futureList; // 上面查询的分区数如果不等于0,这里判断大于0更严谨一点,有些地方是返回的-1。例如TopicName的获取分区数 if (numPartitions != PartitionedTopicmetadata.NON_PARTITIONED) { // 异常场景已存在 boolean isTopicBeingSubscribedForInOtherThread = partitionedTopics.putIfAbsent(topicName, numPartitions) != null; if (isTopicBeingSubscribedForInOtherThread) { return; } // 统计分区数 allTopicPartitionsNumber.addAndGet(numPartitions); // 用户配置的接收队列和分区topic总接收大小比较 int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); // 用于创建具体consumer的配置 ConsumerConfigurationData configurationData = getInternalConsumerConfig(); configurationData.setReceiverQueueSize(receiverQueueSize); // 遍历分区数 futureList = IntStream .range(0, numPartitions) .mapToObj( partitionIndex -> { // 构建分区topic名字,名字尾部是第几个分区 String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); CompletableFuture > subFuture = new CompletableFuture<>(); // 眼熟吗 就是上一篇讲的单个consumer的创建 // 可以看到下面那个true就是hasParentConsumer // 当时讲解说过,用在发起拉取请求时判断 ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName, configurationData, client.externalExecutorProvider(), partitionIndex, true, subFuture, startMessageId, schema, interceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); synchronized (pauseMutex) { // 默认不挂起 if (paused) { newConsumer.pause(); } // 具体的消费者保存到MultiTopicsConsumerImpl // 最上面入口说过pulsarClient保存的是MultiTopicsConsumerImpl consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); } return subFuture; }) .collect(Collectors.toList()); } else { // 非分区消费者创建,只有一个所以+1 allTopicPartitionsNumber.incrementAndGet(); CompletableFuture > subFuture = new CompletableFuture<>(); consumers.compute(topicName, (key, existingValue) -> { // 已存在 异常场景 if (existingValue != null) { return existingValue; } else { // 跟上面只有一个区别,构造中传的分区数不同 ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig, client.externalExecutorProvider(), -1, true, subFuture, startMessageId, schema, interceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); synchronized (pauseMutex) { if (paused) { newConsumer.pause(); } } return newConsumer; } }); futureList = Collections.singletonList(subFuture); } // 等待所有topic创建完成 FutureUtil.waitForAll(futureList) .thenAccept(finalFuture -> { // 可以看到下面的内容和构造里一样 // 区别:没有更新Ready状态,所以下面的startReceivingMessages不会执行 // 会执行构造的startReceivingMessages,下面会介绍 if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); } startReceivingMessages(consumers.values().stream() .filter(consumer1 -> { String consumerTopicName = consumer1.getTopic(); return TopicName.get(consumerTopicName).getPartitionedTopicName().equals( TopicName.get(topicName).getPartitionedTopicName()); }) .collect(Collectors.toList())); subscribeResult.complete(null); return; }) .exceptionally(ex -> { return null; }); } }
第一点的创建消费者分析完了,开始第二点启动startReceivingMessages(new ArrayList<>(consumers.values()));
private void startReceivingMessages(List> newConsumers) { // 状态是否就绪,可以看上上面构造中等待完成会更新MultiTopicsConsumerImpl状态为Ready if (getState() == State.Ready) { // 遍历具体的消费者 newConsumers.forEach(consumer -> { // 上一篇分析过该方法的实现 // 发送一个拉取请求 consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize()); // 接收消息 internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer)); }); } }
下面的接收消息receiveMessageFromConsumer(consumer));是重重重重重点
本章只介绍多消费者的创建。内容过多读者会晕,因为发送一个拉取请求后,服务端如何处理还没有介绍。
上面接收消息是重点,解析时间:1、先讲完服务端创建消费者 2、然后讲完服务端处理拉取请求 3、数据推送到客户端消费者 4、消费者客户端接收到消息的处理时,在分析这块实现。
下一篇先介绍第1步创建消费者的服务端实现原理。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)