一、消费者服务端创建入口上一篇介绍的是消费者客户端的创建,本章解析服务端创建
由于代码量较多,省略非重点
public class ServerCnx { protected void handleSubscribe(final CommandSubscribe subscribe) { // 参数含义客户端消费者构造中都介绍过 final long requestId = subscribe.getRequestId(); final long consumerId = subscribe.getConsumerId(); TopicName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe); final String subscriptionName = subscribe.getSubscription(); final SubType subType = subscribe.getSubType(); final String consumerName = subscribe.hasConsumerName() ? subscribe.getConsumerName() : ""; final boolean isDurable = subscribe.isDurable(); final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl( subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) : null; final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0; final boolean readCompacted = subscribe.hasReadCompacted() && subscribe.isReadCompacted(); final Map二、Subscription创建metadata = CommandUtils.metadataFromCommand(subscribe); final InitialPosition initialPosition = subscribe.getInitialPosition(); final long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec() ? subscribe.getStartMessageRollbackDurationSec() : -1; final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.isReplicateSubscriptionState(); final boolean forceTopicCreation = subscribe.isForceTopicCreation(); final KeySharedmeta keySharedmeta = subscribe.hasKeySharedmeta() ? new KeySharedmeta().copyFrom(subscribe.getKeySharedmeta()) : emptyKeySharedmeta; // 查询权限 CompletableFuture isAuthorizedFuture = isTopicOperationAllowed( topicName, subscriptionName, TopicOperation.ConSUME ); // 权限校验 isAuthorizedFuture.thenApply(isAuthorized -> { if (isAuthorized) { CompletableFuture consumerFuture = new CompletableFuture<>(); // 服务端保存消费者实例 CompletableFuture existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture); // 已存在且已创建完成直接返回 if (existingConsumerFuture != null) { if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { Consumer consumer = existingConsumerFuture.getNow(null); commandSender.sendSuccessResponse(requestId); return null; } else { return null; } } // forceTopicCreation是客户端传的是否自动创建 // isAllowAutoTopicCreation是broker.conf配置的是否自动创建 boolean createTopicIfDoesNotExist = forceTopicCreation && service.isAllowAutoTopicCreation(topicName.toString()); // 之前讲producer服务端创建时也有这么一行 // service.getOrCreateTopic(topicName.toString()) // 本质一样,创建topic // 所以可以得出:消费者或生产者创建时都会创建topic,具体创建过程看生产者中的介绍 service.getTopic(topicName.toString(), createTopicIfDoesNotExist) .thenCompose(optTopic -> { Topic topic = optTopic.get(); // 没有创建成功 例如不允许自动创建订阅 // broker.conf中配置,默认允许 boolean rejectSubscriptionIfDoesNotExist = isDurable && !service.isAllowAutoSubscriptionCreation(topicName.toString()) && !topic.getSubscriptions().containsKey(subscriptionName); if (rejectSubscriptionIfDoesNotExist) { return 异常... } // schema校验 if (schema != null) { return topic.addSchemaIfIdleOrCheckCompatible(schema) .thenCompose(v -> // 创建订阅 topic.subscribe( ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedmeta)); } else { return topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedmeta); } }) .thenAccept(consumer -> { // 响应客户端 // 就是之前讲的消费者客户端的打开连接创建消费者那里 // 更新消费者客户端状态Ready,如果是单消费者会发起拉取请求 if (consumerFuture.complete(consumer)) { commandSender.sendSuccessResponse(requestId); } else { try { consumer.close(); } catch (BrokerServiceException e) { } consumers.remove(consumerId, consumerFuture); } }) .exceptionally(exception -> { }); } else { ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; }).exceptionally(ex -> { return null; }); } }
重点在topic.subscribe(...)
public class PersistentTopic { public CompletableFuturesubscribe(final TransportCnx cnx, String subscriptionName, long consumerId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, KeySharedmeta keySharedmeta) { final CompletableFuture future = new CompletableFuture<>(); if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) { SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier( cnx.clientAddress().toString().split(":")[0], consumerName, consumerId); if (subscribeRateLimiter.isPresent() && (!subscribeRateLimiter.get().subscribeAvailable(consumer) || !subscribeRateLimiter.get().tryAcquire(consumer))) { return future; } } lock.readLock().lock(); try { if (isFenced) { return future; } // 统计消费者 handleConsumerAdded(subscriptionName, consumerName); } finally { lock.readLock().unlock(); } // 创建订阅 // initialPosition是从最新的还是最老的开始消费,默认最新 // startMessageRollbackDurationSec 配置的时间,默认0 // replicatedSubscriptionState是否复制 默认false,消费者客户端配置 CompletableFuture extends Subscription> subscriptionFuture = isDurable ? // getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec); int maxUnackedMessages = isDurable ? getMaxUnackedMessagesOnConsumer() : 0; subscriptionFuture.thenAccept(subscription -> { // 创建消费者 Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedmeta, startMessageId); // 把消费者添加到订阅中 // 可以发现,订阅才是外层,管理具体消费者实例 addConsumerToSubscription(subscription, consumer).thenAccept(v -> { checkBackloggedCursors(); if (!cnx.isActive()) { // 异常 } else { // 对等集群复制 checkReplicatedSubscriptionControllerState(); future.complete(consumer); } }).exceptionally(e -> { return null; }); }).exceptionally(ex -> { return null; }); return future; } }
我们只分析最重要最常用的持久化订阅
继续getDurableSubscription
private CompletableFuturegetDurableSubscription(String subscriptionName, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); // 是否超过topic最大创建的订阅数量 if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) { return subscriptionFuture; } Map properties = PersistentSubscription.getbaseCursorProperties(replicated); // 由打开一个Cursor ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { // 创建持久化订阅实例 PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> createPersistentSubscription(subscriptionName, cursor, replicated)); if (replicated && !subscription.isReplicated()) { subscription.setReplicated(replicated); } // 重置起始读、删除位置 if (startMessageRollbackDurationSec > 0) { resetSubscriptionCursor(subscription, subscriptionFuture, startMessageRollbackDurationSec); } else { subscriptionFuture.complete(subscription); } } @Override public void openCursorFailed(ManagedLedgerException exception, Object ctx) { decrementUsageCount(); subscriptionFuture.completeExceptionally(new PersistenceException(exception)); if (exception instanceof ManagedLedgerFencedException) { close(); } } }, null); return subscriptionFuture; }
ledger.asyncOpenCursor是创建ManagedCursor和数据初始化等
ManagedCursor从ManagedLedger中读数据,提供了各种读的api。
ManagedCursor和ManagedLedger底层细节专题单独讲,目前只需知道它们的作用和关系。
目前我们需要了解消息如何发送,如何接收,这个过程中分别在客户端和服务端如何实现的
我只需要知道在这里完成了Subscription创建
Subscription是根据订阅名称1对1创建,不同订阅名是广播消费,消费者名称生成的是consumer实例。Subscription根据consumer构建成Dispatcher进行消息分发,Dispatcher由订阅类型决定。
下一章介绍Subscription拉取请求的实现。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)