Pulsar源码解析-客户端-多消费者MultiTopicsConsumerImpl创建底层实现

Pulsar源码解析-客户端-多消费者MultiTopicsConsumerImpl创建底层实现,第1张

Pulsar源码解析-客户端-多消费者MultiTopicsConsumerImpl创建底层实现

上一篇介绍了单个消费者的创建实现,本章介绍多个消费者

一、MultiTopicsConsumerImpl创建入口
public class PulsarClientImpl {

public  CompletableFuture> subscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) {
        // 配置的正则
        if (conf.getTopicsPattern() != null) {
            return patternTopicSubscribeAsync(conf, schema, interceptors);
        } 
      	// 单个消费者
		else if (conf.getTopicNames().size() == 1) {
            return singleTopicSubscribeAsync(conf, schema, interceptors);
        } 
        // 多个
        else {
            return multiTopicSubscribeAsync(conf, schema, interceptors);
        }
    }
}

继续multiTopicSubscribeAsync(...)

public class PulsarClientImpl {

    private  CompletableFuture> multiTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) {
        CompletableFuture> consumerSubscribedFuture = new CompletableFuture<>();

		// 创建多消费者实现
        Consumerbase consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
                externalExecutorProvider, consumerSubscribedFuture, schema, interceptors,
                true);
		// 添加多消费者
        consumers.add(consumer);

        return consumerSubscribedFuture;
    }
}

PulsarClient维护的是最外层消费者实现,MultiTopicsConsumerImpl内部又有多个则由它自己维护
可以看到,核心实现都在MultiTopicsConsumerImpl构造中。
继续new MultiTopicsConsumerImpl<>(...)

二、MultiTopicsConsumerImpl构造
public class MultiTopicsConsumerImpl extends Consumerbase {

MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData conf,
            ExecutorProvider executorProvider, CompletableFuture> subscribeFuture, Schema schema,
            ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId,
            long startMessageRollbackDurationInSec) {
            // 封装消费者常用的几乎所有api,跟consumerImpl一样
        super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
                schema, interceptors);
                // key:topic value:分区数
        this.partitionedTopics = new ConcurrentHashMap<>();
        // 具体的消费者
        this.consumers = new ConcurrentHashMap<>();
        // 挂起的消费者
        this.pausedConsumers = new ConcurrentlinkedQueue<>();
        // 跟consumerImpl那个不一样,那个是决定什么时候发起一次拉取查询
        // 当超过阈值时挂起消费者,低于时恢复
        this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
        // 统计总分区数
        this.allTopicPartitionsNumber = new AtomicInteger(0);
        // 起始消息id和起始时间 一般都是reader api指定从哪里开始消费
        this.startMessageId = startMessageId != null ? new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId)) : null;
        this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
		// 跟踪未ack的消息
        if (conf.getAckTimeoutMillis() != 0) {
            if (conf.getTickDurationMillis() > 0) {
                this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
            } else {
                this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
            }
        } else {
            this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
        }
		// 封装用户配置的订阅名和消费者名称
        this.internalConfig = getInternalConsumerConfig();
        // 数据打印频率
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl(this) : null;
		// 分区的自动发现
        if (conf.isAutoUpdatePartitions()) {
            topicsPartitionChangedListener = new TopicsPartitionChangedListener();
            partitionsAutoUpdateTimeout = client.timer()
                .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
        }
		// 外层校验过了,不为空
        if (conf.getTopicNames().isEmpty()) {
            setState(State.Ready);
            subscribeFuture().complete(MultiTopicsConsumerImpl.this);
            return;
        }
        // 核心代码
        // 创建具体消费者
        List> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
                .collect(Collectors.toList());
                // 等待全部创建完成
        FutureUtil.waitForAll(futures)
            .thenAccept(finalFuture -> {
            	// 总分区数比接收队列还大用分区数
                if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
                    setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
                }
                // 更新状态已就绪
                setState(State.Ready);

                // 所有分区消费者创建完成启动拉取
                startReceivingMessages(new ArrayList<>(consumers.values()));
                subscribeFuture().complete(MultiTopicsConsumerImpl.this);
            })
            .exceptionally(ex -> {
                return null;
            });
    }
}

上面代码有2块需要进入分析
1、创建消费者subscribeAsync(t, createTopicIfDoesNotExist)
2、启动startReceivingMessages(new ArrayList<>(consumers.values()));

先分析1,可以看到是循环调用,每个topic都会创建,topic可能是分区的,那么subscribeAsync(t, createTopicIfDoesNotExist)内部肯定会再次循环分区数创建具体消费者。

public class MultiTopicsConsumerImpl extends Consumerbase {

public CompletableFuture subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
        TopicName topicNameInstance = getTopicName(topicName);
        String fullTopicName = topicNameInstance.toString();

        CompletableFuture subscribeResult = new CompletableFuture<>();
		// 查询topic的分区数
        client.getPartitionedTopicmetadata(topicName)
                .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions,
                    createTopicIfDoesNotExist))
                .exceptionally(ex1 -> {
                    return null;
                });

        return subscribeResult;
    }
}

拿到分区数后调用subscribeTopicPartitions(...),其内部先处理schema然后调用doSubscribeTopicPartitions(...),直接看doSubscribeTopicPartitions(...)

三、MultiTopicsConsumerImpl中具体消费者的创建
public class MultiTopicsConsumerImpl extends Consumerbase {

private void doSubscribeTopicPartitions(Schema schema,
                                            CompletableFuture subscribeResult, String topicName, int numPartitions,
            boolean createIfDoesNotExist) {

        List>> futureList;
        // 上面查询的分区数如果不等于0,这里判断大于0更严谨一点,有些地方是返回的-1。例如TopicName的获取分区数
        if (numPartitions != PartitionedTopicmetadata.NON_PARTITIONED) {
			// 异常场景已存在
            boolean isTopicBeingSubscribedForInOtherThread =
                    partitionedTopics.putIfAbsent(topicName, numPartitions) != null;
            if (isTopicBeingSubscribedForInOtherThread) {
                return;
            }
            // 统计分区数
            allTopicPartitionsNumber.addAndGet(numPartitions);
			// 用户配置的接收队列和分区topic总接收大小比较
            int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
                conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
             // 用于创建具体consumer的配置   
            ConsumerConfigurationData configurationData = getInternalConsumerConfig();
            configurationData.setReceiverQueueSize(receiverQueueSize);
			// 遍历分区数
            futureList = IntStream
                .range(0, numPartitions)
                .mapToObj(
                    partitionIndex -> {
                    	// 构建分区topic名字,名字尾部是第几个分区
                        String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
                        CompletableFuture> subFuture = new CompletableFuture<>();
                        // 眼熟吗 就是上一篇讲的单个consumer的创建
                        // 可以看到下面那个true就是hasParentConsumer
                        // 当时讲解说过,用在发起拉取请求时判断
                        ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
                                    configurationData, client.externalExecutorProvider(),
                                    partitionIndex, true, subFuture,
                                    startMessageId, schema, interceptors,
                                    createIfDoesNotExist, startMessageRollbackDurationInSec);
                        synchronized (pauseMutex) {
                        	// 默认不挂起
                            if (paused) {
                                newConsumer.pause();
                            }
                            // 具体的消费者保存到MultiTopicsConsumerImpl
                            // 最上面入口说过pulsarClient保存的是MultiTopicsConsumerImpl
                            consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                        }
                        return subFuture;
                    })
                .collect(Collectors.toList());
        } else {
        	// 非分区消费者创建,只有一个所以+1
            allTopicPartitionsNumber.incrementAndGet();

            CompletableFuture> subFuture = new CompletableFuture<>();

            consumers.compute(topicName, (key, existingValue) -> {
            	// 已存在 异常场景
                if (existingValue != null) {
                    return existingValue;
                } else {
                	// 跟上面只有一个区别,构造中传的分区数不同
                    ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
                            client.externalExecutorProvider(), -1,
                            true, subFuture, startMessageId, schema, interceptors,
                            createIfDoesNotExist, startMessageRollbackDurationInSec);

                    synchronized (pauseMutex) {
                        if (paused) {
                            newConsumer.pause();
                        }
                    }
                    return newConsumer;
                }
            });

            futureList = Collections.singletonList(subFuture);
        }
		// 等待所有topic创建完成
        FutureUtil.waitForAll(futureList)
            .thenAccept(finalFuture -> {
            	// 可以看到下面的内容和构造里一样
            	// 区别:没有更新Ready状态,所以下面的startReceivingMessages不会执行
            	// 会执行构造的startReceivingMessages,下面会介绍
                if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
                    setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
                }

                startReceivingMessages(consumers.values().stream()
                                .filter(consumer1 -> {
                                    String consumerTopicName = consumer1.getTopic();
                                    return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
                                            TopicName.get(topicName).getPartitionedTopicName());
                                })
                                .collect(Collectors.toList()));

                subscribeResult.complete(null);
                return;
            })
            .exceptionally(ex -> {
                return null;
            });
    }
}

第一点的创建消费者分析完了,开始第二点启动startReceivingMessages(new ArrayList<>(consumers.values()));

    private void startReceivingMessages(List> newConsumers) {
    	// 状态是否就绪,可以看上上面构造中等待完成会更新MultiTopicsConsumerImpl状态为Ready
        if (getState() == State.Ready) {
        	// 遍历具体的消费者
            newConsumers.forEach(consumer -> {
            	// 上一篇分析过该方法的实现
            	// 发送一个拉取请求
                consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
                // 接收消息
                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
            });
        }
    }

下面的接收消息receiveMessageFromConsumer(consumer));是重重重重重点
本章只介绍多消费者的创建。内容过多读者会晕,因为发送一个拉取请求后,服务端如何处理还没有介绍。
上面接收消息是重点,解析时间:1、先讲完服务端创建消费者 2、然后讲完服务端处理拉取请求 3、数据推送到客户端消费者 4、消费者客户端接收到消息的处理时,在分析这块实现。

下一篇先介绍第1步创建消费者的服务端实现原理。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5681809.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存