Pulsar Source Code Analysis - Server Side - The Underlying Implementation of Consumer Creation


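The previous article covered how the consumer is created on the client side. This chapter walks through how it is created on the server (broker) side.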

1. Entry point for consumer creation on the server side

The code is long, so the parts that are not essential are omitted.

public class ServerCnx {

    protected void handleSubscribe(final CommandSubscribe subscribe) {
        // The meaning of these parameters was covered in the article on the client-side consumer constructor.
        final long requestId = subscribe.getRequestId();
        final long consumerId = subscribe.getConsumerId();
        TopicName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
        final String subscriptionName = subscribe.getSubscription();
        final SubType subType = subscribe.getSubType();
        final String consumerName = subscribe.hasConsumerName() ? subscribe.getConsumerName() : "";
        final boolean isDurable = subscribe.isDurable();
        final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
                subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
                subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
                : null;
        final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
        final boolean readCompacted = subscribe.hasReadCompacted() && subscribe.isReadCompacted();
        final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
        final InitialPosition initialPosition = subscribe.getInitialPosition();
        final long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec()
                ? subscribe.getStartMessageRollbackDurationSec()
                : -1;
        final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
        final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
                && subscribe.isReplicateSubscriptionState();
        final boolean forceTopicCreation = subscribe.isForceTopicCreation();
        final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
                ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
                : emptyKeySharedMeta;
        // Look up whether this client is allowed to consume from the topic.
        CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
                topicName,
                subscriptionName,
                TopicOperation.CONSUME
        );
        // Act on the authorization result.
        isAuthorizedFuture.thenApply(isAuthorized -> {
            if (isAuthorized) {
                CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
                // The server keeps the consumer instance (as a future), keyed by consumerId.
                CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
                        consumerFuture);
                // If it already exists and creation has completed, reply with success right away.
                if (existingConsumerFuture != null) {
                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                        Consumer consumer = existingConsumerFuture.getNow(null);
                        commandSender.sendSuccessResponse(requestId);
                        return null;
                    } else {
                        // Creation is still in progress or has failed (error handling omitted in this excerpt).
                        return null;
                    }
                }
                // forceTopicCreation: sent by the client, whether to auto-create the topic.
                // isAllowAutoTopicCreation: configured in broker.conf, whether auto-creation is allowed.
                boolean createTopicIfDoesNotExist = forceTopicCreation
                        && service.isAllowAutoTopicCreation(topicName.toString());
                // The article on server-side producer creation had a similar line:
                //   service.getOrCreateTopic(topicName.toString())
                // It is essentially the same thing: it creates the topic.
                // So creating either a consumer or a producer can create the topic
                // (when auto-creation is allowed). See the producer article for the creation process.
                service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
                        .thenCompose(optTopic -> {
                            Topic topic = optTopic.get();
                            // Reject when the subscription does not exist and auto-creating
                            // subscriptions is not allowed (set in broker.conf, allowed by default).
                            boolean rejectSubscriptionIfDoesNotExist = isDurable
                                    && !service.isAllowAutoSubscriptionCreation(topicName.toString())
                                    && !topic.getSubscriptions().containsKey(subscriptionName);

                            if (rejectSubscriptionIfDoesNotExist) {
                                return ...; // elided in the original: returns a failed future carrying the error
                            }
                            // Schema check.
                            if (schema != null) {
                                return topic.addSchemaIfIdleOrCheckCompatible(schema)
                                        .thenCompose(v ->
                                                // Create the subscription.
                                                topic.subscribe(
                                                        ServerCnx.this, subscriptionName, consumerId,
                                                        subType, priorityLevel, consumerName, isDurable,
                                                        startMessageId, metadata,
                                                        readCompacted, initialPosition,
                                                        startMessageRollbackDurationSec,
                                                        isReplicated, keySharedMeta));
                            } else {
                                return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                        subType, priorityLevel, consumerName, isDurable,
                                        startMessageId, metadata, readCompacted, initialPosition,
                                        startMessageRollbackDurationSec, isReplicated, keySharedMeta);
                            }
                        })
                        .thenAccept(consumer -> {
                            // Respond to the client. This is the "open connection and create consumer"
                            // step described in the client-side article: the client marks the consumer
                            // as Ready and, for a single (non-partitioned) consumer, sends a fetch request.
                            if (consumerFuture.complete(consumer)) {
                                commandSender.sendSuccessResponse(requestId);
                            } else {
                                // The future was already completed elsewhere, so discard this consumer.
                                try {
                                    consumer.close();
                                } catch (BrokerServiceException e) {
                                    // Handling omitted in this excerpt.
                                }
                                consumers.remove(consumerId, consumerFuture);
                            }
                        })
                        .exceptionally(exception -> {
                            // Error handling omitted in this excerpt.
                            return null;
                        });
            } else {
                // msg is built in code omitted from this excerpt.
                ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
            }
            return null;
        }).exceptionally(ex -> {
            // Error handling omitted in this excerpt.
            return null;
        });
    }
}
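
The `consumers.putIfAbsent(consumerId, consumerFuture)` step above is what makes a repeated subscribe request for the same consumer id harmless. Below is a minimal, self-contained sketch of that idea. It is not Pulsar code: `ConsumerRegistrySketch`, `register`, and the returned status strings are hypothetical names made up for illustration, and a plain `String` stands in for the broker's `Consumer`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the per-connection consumer registry (not a Pulsar class).
class ConsumerRegistrySketch {
    private final ConcurrentHashMap<Long, CompletableFuture<String>> consumers = new ConcurrentHashMap<>();

    // Registers a pending future for consumerId unless one is already present.
    // 'creation' plays the role of the asynchronous topic.subscribe(...) result.
    String register(long consumerId, CompletableFuture<String> creation) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> existing = consumers.putIfAbsent(consumerId, pending);
        if (existing != null) {
            // Someone registered this id first: only a successfully completed future counts as ready.
            return existing.isDone() && !existing.isCompletedExceptionally()
                    ? "ALREADY_READY"
                    : "IN_PROGRESS_OR_FAILED";
        }
        creation.whenComplete((consumer, error) -> {
            if (error != null) {
                // Creation failed: fail the pending future and free the slot for a retry.
                pending.completeExceptionally(error);
                consumers.remove(consumerId, pending);
            } else {
                pending.complete(consumer);
            }
        });
        return "CREATED";
    }

    boolean isRegistered(long consumerId) {
        return consumers.containsKey(consumerId);
    }

    public static void main(String[] args) {
        ConsumerRegistrySketch registry = new ConsumerRegistrySketch();
        CompletableFuture<String> creation = new CompletableFuture<>();
        System.out.println(registry.register(1L, creation));
        System.out.println(registry.register(1L, new CompletableFuture<>()));
        creation.complete("consumer-1");
        System.out.println(registry.register(1L, new CompletableFuture<>()));
    }
}
```

Storing the future rather than the finished consumer lets a second request see that a creation is in flight without blocking on it.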
2. Creating the Subscription

The key call is topic.subscribe(...)

public class PersistentTopic {

    public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
                                                 SubType subType, int priorityLevel, String consumerName,
                                                 boolean isDurable, MessageId startMessageId,
                                                 Map<String, String> metadata, boolean readCompacted,
                                                 InitialPosition initialPosition,
                                                 long startMessageRollbackDurationSec,
                                                 boolean replicatedSubscriptionState,
                                                 KeySharedMeta keySharedMeta) {

        final CompletableFuture<Consumer> future = new CompletableFuture<>();
        // Subscribe rate limiting, keyed by client host, consumer name and consumer id.
        if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
            SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
                    cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
            if (subscribeRateLimiter.isPresent() && (!subscribeRateLimiter.get().subscribeAvailable(consumer)
                    || !subscribeRateLimiter.get().tryAcquire(consumer))) {
                // Rate limit exceeded (error completion omitted in this excerpt).
                return future;
            }
        }

        lock.readLock().lock();
        try {
            if (isFenced) {
                // The topic is fenced (error completion omitted in this excerpt).
                return future;
            }
            // Count the consumer.
            handleConsumerAdded(subscriptionName, consumerName);
        } finally {
            lock.readLock().unlock();
        }

        // Create the subscription.
        // initialPosition: start consuming from the latest or the earliest message; defaults to latest.
        // startMessageRollbackDurationSec: the configured rollback duration; defaults to 0.
        // replicatedSubscriptionState: whether to replicate the subscription state;
        //   defaults to false and is set on the consumer client.
        CompletableFuture<? extends Subscription> subscriptionFuture = isDurable
                ? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
                        replicatedSubscriptionState)
                : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
                        startMessageRollbackDurationSec);

        int maxUnackedMessages = isDurable
                ? getMaxUnackedMessagesOnConsumer()
                : 0;

        subscriptionFuture.thenAccept(subscription -> {
            // Create the consumer.
            Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
                    maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,
                    readCompacted, initialPosition, keySharedMeta, startMessageId);
            // Add the consumer to the subscription.
            // Note that the subscription is the outer layer: it manages the concrete consumer instances.
            addConsumerToSubscription(subscription, consumer).thenAccept(v -> {
                checkBackloggedCursors();
                if (!cnx.isActive()) {
                    // Error path (omitted in this excerpt).
                } else {
                    // Replication across peer clusters.
                    checkReplicatedSubscriptionControllerState();
                    future.complete(consumer);
                }
            }).exceptionally(e -> {
                // Error handling omitted in this excerpt.
                return null;
            });
        }).exceptionally(ex -> {
            // Error handling omitted in this excerpt.
            return null;
        });

        return future;
    }
}
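
The `lock.readLock()` / `isFenced` check in `subscribe` can be read as a small protocol: many subscribers may enter concurrently under the read lock, while whoever fences the topic takes the write lock. The sketch below illustrates that protocol only. `FencedTopicSketch`, `tryAddConsumer`, `removeConsumer`, and `fenceIfIdle` are hypothetical names, and the "fence only when idle" rule is a simplifying assumption rather than a statement about what the broker does in every close or delete path.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration of a fence check guarded by a read/write lock (not a Pulsar class).
class FencedTopicSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Atomic because several threads may hold the read lock at the same time.
    private final AtomicInteger usageCount = new AtomicInteger();
    private boolean fenced = false; // written only under the write lock

    // Subscriber path: shared lock, so concurrent subscribes do not block each other.
    boolean tryAddConsumer() {
        lock.readLock().lock();
        try {
            if (fenced) {
                return false;
            }
            usageCount.incrementAndGet();
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    void removeConsumer() {
        usageCount.decrementAndGet();
    }

    // Fencing path: exclusive lock, so no subscriber can slip in between the check and the flag.
    boolean fenceIfIdle() {
        lock.writeLock().lock();
        try {
            if (usageCount.get() > 0) {
                return false;
            }
            fenced = true;
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        FencedTopicSketch topic = new FencedTopicSketch();
        System.out.println("added: " + topic.tryAddConsumer());
        System.out.println("fenced while in use: " + topic.fenceIfIdle());
        topic.removeConsumer();
        System.out.println("fenced when idle: " + topic.fenceIfIdle());
        System.out.println("added after fence: " + topic.tryAddConsumer());
    }
}
```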

We only analyze the most important and most commonly used case, the durable subscription.
Next, getDurableSubscription:

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
        InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated) {
    CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
    // Check whether the topic already has the maximum number of subscriptions.
    if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
        // Limit exceeded (error completion omitted in this excerpt).
        return subscriptionFuture;
    }

    Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
    // Open a cursor on the managed ledger.
    ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
        @Override
        public void openCursorComplete(ManagedCursor cursor, Object ctx) {
            // Create the persistent subscription instance (one per subscription name).
            PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
                    name -> createPersistentSubscription(subscriptionName, cursor, replicated));

            if (replicated && !subscription.isReplicated()) {
                subscription.setReplicated(replicated);
            }
            // Reset the starting read position and the delete position.
            if (startMessageRollbackDurationSec > 0) {
                resetSubscriptionCursor(subscription, subscriptionFuture, startMessageRollbackDurationSec);
            } else {
                subscriptionFuture.complete(subscription);
            }
        }

        @Override
        public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
            decrementUsageCount();
            subscriptionFuture.completeExceptionally(new PersistenceException(exception));
            if (exception instanceof ManagedLedgerFencedException) {
                close();
            }
        }
    }, null);
    return subscriptionFuture;
}
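
Structurally, `getDurableSubscription` bridges a callback-style API (`OpenCursorCallback`) into a `CompletableFuture` and uses `computeIfAbsent` so that each subscription name maps to exactly one subscription object. Here is a minimal sketch of that shape, under stated assumptions: `OpenCallback`, `asyncOpen`, and `Sub` are hypothetical stand-ins invented for this example, and the "storage" completes synchronously to keep it short.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of bridging a callback API to a future (not Pulsar code).
class DurableSubscriptionSketch {
    // Stand-in for a callback interface with a success and a failure method.
    interface OpenCallback {
        void opened(String cursorName);
        void failed(Exception error);
    }

    // Stand-in for a subscription bound to a cursor.
    static final class Sub {
        final String cursorName;
        Sub(String cursorName) {
            this.cursorName = cursorName;
        }
    }

    private final Map<String, Sub> subscriptions = new ConcurrentHashMap<>();

    // Pretend asynchronous open; 'fail' forces the error path for demonstration.
    static void asyncOpen(String name, boolean fail, OpenCallback callback) {
        if (fail) {
            callback.failed(new IllegalStateException("cannot open cursor " + name));
        } else {
            callback.opened(name);
        }
    }

    CompletableFuture<Sub> getSubscription(String name, boolean fail) {
        CompletableFuture<Sub> future = new CompletableFuture<>();
        asyncOpen(name, fail, new OpenCallback() {
            @Override
            public void opened(String cursorName) {
                // One subscription object per name, even if opened more than once.
                future.complete(subscriptions.computeIfAbsent(name, n -> new Sub(cursorName)));
            }

            @Override
            public void failed(Exception error) {
                future.completeExceptionally(error);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        DurableSubscriptionSketch topic = new DurableSubscriptionSketch();
        Sub first = topic.getSubscription("my-sub", false).join();
        Sub second = topic.getSubscription("my-sub", false).join();
        System.out.println("same instance: " + (first == second));
        System.out.println("failed: " + topic.getSubscription("bad", true).isCompletedExceptionally());
    }
}
```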

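ledger.asyncOpenCursor creates the ManagedCursor and initializes its data.
A ManagedCursor reads data from a ManagedLedger and provides various read APIs.
The internals of ManagedCursor and ManagedLedger will be covered in a dedicated series. For now it is enough to know what they do and how they relate.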
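
To make the relationship concrete, here is a deliberately tiny model of "a cursor reads from a ledger": an append-only list of entries plus named cursors that each remember their own read position. All names (`LedgerCursorSketch`, `Ledger`, `Cursor`, `openCursor`, `readEntries`) are hypothetical and only loosely mirror the roles of ManagedLedger and ManagedCursor. Persistence, acknowledgements, and concurrency are left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a ledger with named cursors (not the real API).
class LedgerCursorSketch {
    static class Ledger {
        private final List<String> entries = new ArrayList<>();
        private final Map<String, Cursor> cursors = new HashMap<>();

        void append(String entry) {
            entries.add(entry);
        }

        // Opening an existing name returns the same cursor, keeping its position.
        // A new cursor starts at the first entry (earliest) or after the last one (latest).
        Cursor openCursor(String name, boolean fromEarliest) {
            return cursors.computeIfAbsent(name,
                    n -> new Cursor(this, fromEarliest ? 0 : entries.size()));
        }
    }

    static class Cursor {
        private final Ledger ledger;
        private int readPosition;

        Cursor(Ledger ledger, int startPosition) {
            this.ledger = ledger;
            this.readPosition = startPosition;
        }

        // Reads up to 'max' entries and advances this cursor only.
        List<String> readEntries(int max) {
            int end = Math.min(ledger.entries.size(), readPosition + max);
            List<String> batch = new ArrayList<>(ledger.entries.subList(readPosition, end));
            readPosition = end;
            return batch;
        }
    }

    public static void main(String[] args) {
        Ledger ledger = new Ledger();
        ledger.append("a");
        ledger.append("b");
        Cursor earliest = ledger.openCursor("sub-earliest", true);
        Cursor latest = ledger.openCursor("sub-latest", false);
        ledger.append("c");
        System.out.println(earliest.readEntries(10));
        System.out.println(latest.readEntries(10));
    }
}
```

Because each cursor keeps its own position, two subscriptions on the same topic can progress independently over the same stored entries.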
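What matters at this stage is understanding how messages are sent and received, and how each step is implemented on the client and on the server.

For now, all we need to know is that the Subscription is created here.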

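A Subscription is created one-to-one per subscription name. Different subscription names each receive every message (broadcast consumption), while each consumer that subscribes becomes a Consumer instance under its Subscription. The Subscription builds a Dispatcher from its consumers to distribute messages, and the subscription type determines which Dispatcher is used.

The next chapter covers how the Subscription implements the fetch request.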
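
The following sketch models that structure in a few lines: one subscription object per name, every subscription sees every message, and the subscription type decides how messages are spread over the consumers inside it. It is an illustration under simplifying assumptions, not the broker's dispatcher code: `SubscriptionFanoutSketch` and its members are hypothetical, only two subscription types are modeled, consumers are represented by plain inbox lists, and messages published to a subscription without consumers are simply dropped here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of subscriptions fanning out messages to consumers (not Pulsar code).
class SubscriptionFanoutSketch {
    enum SubType { Exclusive, Shared }

    static class Subscription {
        private final SubType type;
        private final List<List<String>> consumerInboxes = new ArrayList<>();
        private int next = 0;

        Subscription(SubType type) {
            this.type = type;
        }

        // Adds a consumer and returns its inbox; an exclusive subscription allows only one.
        List<String> addConsumer() {
            if (type == SubType.Exclusive && !consumerInboxes.isEmpty()) {
                throw new IllegalStateException("exclusive subscription already has a consumer");
            }
            List<String> inbox = new ArrayList<>();
            consumerInboxes.add(inbox);
            return inbox;
        }

        // The "dispatcher": Shared round-robins, Exclusive always uses its single consumer.
        void dispatch(String message) {
            if (consumerInboxes.isEmpty()) {
                return;
            }
            int target = type == SubType.Shared ? next++ % consumerInboxes.size() : 0;
            consumerInboxes.get(target).add(message);
        }
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    // One Subscription per name; each call adds one more consumer to it.
    List<String> subscribe(String subscriptionName, SubType type) {
        return subscriptions.computeIfAbsent(subscriptionName, n -> new Subscription(type)).addConsumer();
    }

    // Every subscription receives every message.
    void publish(String message) {
        subscriptions.values().forEach(s -> s.dispatch(message));
    }

    public static void main(String[] args) {
        SubscriptionFanoutSketch topic = new SubscriptionFanoutSketch();
        List<String> a1 = topic.subscribe("sub-a", SubType.Shared);
        List<String> a2 = topic.subscribe("sub-a", SubType.Shared);
        List<String> b = topic.subscribe("sub-b", SubType.Exclusive);
        topic.publish("m1");
        topic.publish("m2");
        System.out.println("sub-a consumers: " + a1 + " " + a2);
        System.out.println("sub-b consumer: " + b);
    }
}
```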

Feel free to share. When reposting, please credit the source: 内存溢出

Original article: http://outofmemory.cn/zaji/5681092.html
