一、消息拉取请求入口
上一篇介绍了服务端消费者的创建,本章介绍消息拉取请求的实现。
public class ServerCnx { protected void handleFlow(CommandFlow flow) { checkArgument(state == State.Connected); CompletableFutureconsumerFuture = consumers.get(flow.getConsumerId()); if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); if (consumer != null) { // 传入客户端配置的默认1000 consumer.flowPermits(flow.getMessagePermits()); } else { } } } }
继续consumer.flowPermits(...)
二、Consumer层请求拉取public class Consumer { public void flowPermits(int additionalNumberOfMessages) { checkArgument(additionalNumberOfMessages > 0); // 超过限制不拉取 if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) { blockedConsumerOnUnackedMsgs = true; } int oldPermits; if (!blockedConsumerOnUnackedMsgs) { oldPermits = MESSAGE_PERMITS_UPDATeR.getAndAdd(this, additionalNumberOfMessages); // 通过订阅 subscription.consumerFlow(this, additionalNumberOfMessages); } else { oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages); } } }
上一章介绍过subscription是最外层,subscription关联的每个consumer持有subscription的引用,具体看上一章的consumer创建的构造传参。多个消费者相同的订阅名称配置都是调用subscription.consumerFlow(...);
继续subscription.consumerFlow(...);
public class PersistentSubscription {
    /**
     * Forwards a consumer's flow (pull) request to this subscription's
     * dispatcher. The dispatcher implementation depends on the
     * subscription type (Exclusive/Failover vs Shared/Key_Shared).
     */
    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        // Record when this subscription last received a flow request.
        this.lastConsumedFlowTimestamp = System.currentTimeMillis();
        // Delegate to the dispatcher created for this subscription's type.
        dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
    }
}
继续dispatcher.consumerFlow(...)
dispatcher实现有两个:
1、PersistentDispatcherSingleActiveConsumer
订阅模式:Exclusive、Failover
作用:组装从bk读出entry,组装数据,发送到请求的consumer
2、PersistentDispatcherMultipleConsumers
订阅模式:Shared、Key_Shared是PersistentStickyKeyDispatcherMultipleConsumers继承PersistentDispatcherMultipleConsumers
Shared作用:组装从bk读出entry,组装数据,轮询发送到已存在的consumer,如果消费者客户端配置了优先级相当于加权轮询
Key_Shared作用:组装从bk读出entry,组装数据,根据客户端配置的key的hash % consumer数量得出consumer(大概这意思,实际还是有些细节优化的)
核心流程都差不多,区别就是上面描述的"发送到哪个 consumer"的选择策略。所以我们只看PersistentDispatcherMultipleConsumers的实现
四、Subscription对应的分发策略-PersistentDispatcherMultipleConsumers第1层回调
public class PersistentDispatcherMultipleConsumers {
    /**
     * First-layer callback target: accumulates the permits granted by one
     * consumer into the dispatcher-wide total, then triggers a read.
     * Synchronized: dispatcher state is mutated from multiple connections.
     */
    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        // Ignore flow requests from consumers no longer attached here.
        if (!consumerSet.contains(consumer)) {
            return;
        }
        totalAvailablePermits += additionalNumberOfMessages;
        readMoreEntries();
    }
}
继续readMoreEntries();
public class PersistentDispatcherMultipleConsumers { public synchronized void readMoreEntries() { // 获取一个consumer的availablePermits int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits(); // 与总availablePermits比较 int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits); if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) { // 通过限流配置计算出 1:读的数量 2:读的字节大小 PaircalculateResult = calculateToRead(currentTotalAvailablePermits); // 读的数量 int messagesToRead = calculateResult.getLeft(); // 读的大小 long bytesToRead = calculateResult.getRight(); if (messagesToRead == -1 || bytesToRead == -1) { return; } // 获取重新投递的列表或者可发送的延迟列表数据 Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead); if (!messagesToReplayNow.isEmpty()) { havePendingReplayRead = true; // 如果topic开启延迟投递 // 1、过滤已经标记删除的数据 2、剩余从bk读出发给客户端 // 返回的是已经标记删除的 Set extends Position> deletedMessages = topic.isDelayedDeliveryEnabled() ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // 从重新投递队列移除删除的 deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId())); // if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; // 重新读 topic.getBrokerService().executor().execute(() -> readMoreEntries()); } } // 超过最大未ack的数量阻塞消息分发 else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { } // 没有正在读的请求 else if (!havePendingRead) { havePendingRead = true; // 读取 this是第一层回调 cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, topic.getMaxReadPosition()); } else { } } else { } } }
开始读cursor.asyncReadEntriesOrWait(...)
四、ManagedCursorImpl读取-第2层回调OpReadEntry
public class ManagedCursorImpl {
    /**
     * Reads entries if any are available; otherwise parks an OpReadEntry
     * (the second-layer callback) to be woken when new entries arrive.
     */
    public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
        if (isClosed()) {
            return;
        }
        // Cap the entry count using the ledger's observed average entry
        // size versus the byte budget.
        int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);
        // Read position is behind the last written position: data exists.
        if (hasMoreEntries()) {
            asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition);
        } else {
            OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx, maxPosition);
            // Only one waiting read op may be parked at a time.
            if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                return;
            }
            // Optionally delay the "is there new data?" check.
            if (config.getNewEntriesCheckDelayInMillis() > 0) {
                ledger.getScheduledExecutor()
                    .schedule(() -> checkForNewEntries(op, callback, ctx),
                        config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
            } else {
                // Check immediately. If still nothing to read, the op joins
                // ledger.waitingCursors() and is woken from the write path
                // (covered in the earlier article on message publishing).
                checkForNewEntries(op, callback, ctx);
            }
        }
    }
}
可以读,继续asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition)
public class ManagedCursorImpl {
    /**
     * Wraps the read into an OpReadEntry (second-layer callback) and hands
     * it to the managed ledger.
     */
    public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
        checkArgument(numberOfEntriesToRead > 0);
        if (isClosed()) {
            callback.readEntriesFailed(new ManagedLedgerException
                .CursorAlreadyClosedException("Cursor was already closed"), ctx);
            return;
        }
        // The caller already applied the cap, so this returns
        // numberOfEntriesToRead unchanged on this code path.
        int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);
        PENDING_READ_OPS_UPDATER.incrementAndGet(this);
        // Second-layer callback: OpReadEntry carries position + completion.
        OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition);
        ledger.asyncReadEntries(op);
    }
}
继续看ledger中的读取ledger.asyncReadEntries(op)
public class ManagedLedgerImpl {
    /**
     * Resolves which BookKeeper ledger the read position points at and
     * dispatches the read to it.
     */
    void asyncReadEntries(OpReadEntry opReadEntry) {
        final State state = STATE_UPDATER.get(this);
        if (state == State.Fenced || state == State.Closed) {
            opReadEntry.readEntriesFailed(new ManagedLedgerFencedException(), opReadEntry.ctx);
            return;
        }
        // Fast path: the read position is in the currently-open ledger.
        long ledgerId = opReadEntry.readPosition.getLedgerId();
        LedgerHandle currentLedger = this.currentLedger;
        if (currentLedger != null && ledgerId == currentLedger.getId()) {
            // Read from the current ledger.
            internalReadFromLedger(currentLedger, opReadEntry);
        } else {
            // Writers outpaced readers: the ledger rolled over while the
            // read position still points at an older ledger.
            // `ledgers` is the in-memory cache of per-topic ledger metadata
            // loaded from ZooKeeper.
            LedgerInfo ledgerInfo = ledgers.get(ledgerId);
            // Missing or empty ledger: skip ahead to the next ledger.
            if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
                opReadEntry.updateReadPosition(new PositionImpl(opReadEntry.readPosition.getLedgerId() + 1, 0));
                opReadEntry.checkReadCompletion();
                return;
            }
            // Open (or fetch a cached) LedgerHandle for the old ledger,
            // then read from it asynchronously.
            getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex -> {
                opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
                return null;
            });
        }
    }
}
继续internalReadFromLedger(currentLedger, opReadEntry)
五、ManagedLedgerImpl读取-第3层回调ReadEntryCallbackWrapperpublic class ManagedLedgerImpl { private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) { // 上面分析这条链路maxPosition默认long最大值,其他链路调用可以自定义设置 if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) { opReadEntry.checkReadCompletion(); return; } // 最后一次读的entryId long firstEntry = opReadEntry.readPosition.getEntryId(); long lastEntryInLedger; PositionImpl lastPosition = last/confirm/iedEntry; // 如果没有ledger切换,上一次新增的消息就是最后entry if (ledger.getId() == lastPosition.getLedgerId()) { lastEntryInLedger = lastPosition.getEntryId(); } else { // 获取以前ledger的最后一个entry lastEntryInLedger = ledger.getLastAdd/confirm/ied(); } if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) { lastEntryInLedger = min(opReadEntry.maxPosition.getEntryId(), lastEntryInLedger); } // 如果起始id比最后一个还大,说明没办法读。 if (firstEntry > lastEntryInLedger) { if (currentLedger == null || ledger.getId() != currentLedger.getId()) { Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1); if (nextLedgerId != null) { opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0)); } else { opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0)); } } else { opReadEntry.updateReadPosition(opReadEntry.readPosition); } opReadEntry.checkReadCompletion(); return; } // 开始id+读的数量=结束id 因为entryId是自增的 long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger); // 读取 asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx); } }
继续asyncReadEntry
public class ManagedLedgerImpl {
    /**
     * Issues the actual read through the entry cache, optionally wrapping
     * the callback (third layer) to support read-timeout tracking.
     */
    protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader, OpReadEntry opReadEntry, Object ctx) {
        if (config.getReadEntryTimeoutSeconds() > 0) {
            long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
            long createdTime = System.nanoTime();
            // Third-layer callback: wrapper records op id + creation time
            // so timed-out reads can be detected.
            ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry, opReadEntry, readOpCount, createdTime, ctx);
            lastReadCallback = readCallback;
            // Read via the broker-side entry cache.
            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
        } else {
            // No timeout configured: skip the wrapper entirely.
            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
        }
    }
}
继续看缓存读实现entryCache.asyncReadEntry
六、Broker端缓存读取public class EntryCacheImpl { private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback callback, Object ctx) { final long ledgerId = lh.getId(); final int entriesToRead = (int) (lastEntry - firstEntry) + 1; final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry); final PositionImpl lastPosition = PositionImpl.get(lh.getId(), lastEntry); // 缓存实现是ConcurrentSkipListMap // value是堆外内存 // 获取entry CollectioncachedEntries = entries.getRange(firstPosition, lastPosition); // 数量一致 if (cachedEntries.size() == entriesToRead) { long totalCachedSize = 0; final List entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); for (EntryImpl entry : cachedEntries) { entriesToReturn.add(EntryImpl.create(entry)); totalCachedSize += entry.getLength(); entry.release(); } // 监控记录命中缓存 manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), totalCachedSize); // 放入回调,一层一层向上。等会我们挑一个重要的回调解析 callback.readEntriesComplete((List) entriesToReturn, ctx); } else { // 说明没全部命中 // 清空,这也是为了降低实现的复杂度,大部分还是能命中 if (!cachedEntries.isEmpty()) { cachedEntries.forEach(entry -> entry.release()); } // 从bk读 lh.readAsync(firstEntry, lastEntry).whenCompleteAsync( (ledgerEntries, exception) -> { if (exception != null) { return; } try { long totalSize = 0; final List entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead); for (LedgerEntry e : ledgerEntries) { EntryImpl entry = EntryImpl.create(e); entriesToReturn.add(entry); totalSize += entry.getLength(); } // 监控记录未命中缓存 manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize); // 放入回调,一层一层向上。等会我们挑一个重要的回调解析 callback.readEntriesComplete((List) entriesToReturn, ctx); } finally { ledgerEntries.close(); } }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{ return null; }); } } }
可以看到,如果缓存没命中会直接调用bk,收到bk响应组装list放入回调结束。并没有放入缓存,这个需要知道,也没必要放入,因为消息大多数场景都是消费完结束了。放入命中率低,还占用空间。缓存主要发送时往里放,调大可以提升性能。对应配置项:managedLedgerCacheSizeMB
接下来回调就不一一解析了,挑一个最重要的解析一下就是第一层回调。
七、第一层回调PersistentDispatcherMultipleConsumers客户端解析public class PersistentDispatcherMultipleConsumers { public synchronized void readEntriesComplete(Listentries, Object ctx) { ... sendMessagesToConsumers(readType, entries); } }
继续sendMessagesToConsumers
public class PersistentDispatcherMultipleConsumers { protected void sendMessagesToConsumers(ReadType readType, Listentries) { // 如果最后一次删除比read位置大,说明当前数据可能存在已删除 if (needTrimAckedMessages()) { // 过滤已删除的数据 cursor.trimDeletedEntries(entries); } int entriesToDispatch = entries.size(); // Trigger read more messages if (entriesToDispatch == 0) { readMoreEntries(); return; } EntryWrapper[] entryWrappers = new EntryWrapper[entries.size()]; // 消息存在批量,一批也是一个entry // 这里统计具体消息条数 int remainingMessages = updateEntryWrapperWithmetadata(entryWrappers, entries); int start = 0; // 发送总数 long totalMessagesSent = 0; // 发送总字节数 long totalBytesSent = 0; // 平均 int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1; int firstAvailableConsumerPermits, currentTotalAvailablePermits; boolean dispatchMessage; while (entriesToDispatch > 0) { // 第一个消费者拉取数 firstAvailableConsumerPermits = getFirstAvailableConsumerPermits(); currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits); dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0; if (!dispatchMessage) { break; } // 获取下一个 Consumer c = getNextConsumer(); if (c == null) { return; } // 当前消费者的拉取数量 int availablePermits = c.isWritable() ? 
c.getAvailablePermits() : 1; // serviceConfig.getDispatcherMaxRoundRobinBatchSize()默认20 // availablePermits客户端默认配置1000 // 如果是批量remainingMessages可能大于1000,实际可能是20 int messagesForC = Math.min(Math.min(remainingMessages, availablePermits), serviceConfig.getDispatcherMaxRoundRobinBatchSize()); // 计算一个折中的大小 messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1); if (messagesForC > 0) { // 对着一批entry进行分批发送 // 计算结束位置 int end = Math.min(start + messagesForC, entries.size()); if (readType == ReadType.Replay) { entries.subList(start, end).forEach(entry -> { redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); }); } SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); List entriesForThisConsumer = entries.subList(start, end); EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size()); EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size()); // 过滤 例如:延迟消息未到时间过滤 filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay); // 调用consumer发送 c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker); int msgSent = sendMessageInfo.getTotalMessages(); remainingMessages -= msgSent; start += messagesForC; entriesToDispatch -= messagesForC; // 递归停止的关键 // TOTAL_AVAILABLE_PERMITS_UPDATER是总的拉取数 // 一直减到<0,说明客户端请求的拉取数量服务端都推送完毕 TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - batchIndexesAcks.getTotalAckedIndexCount())); totalMessagesSent += sendMessageInfo.getTotalMessages(); totalBytesSent += sendMessageInfo.getTotalBytes(); } } for (EntryWrapper entry : entryWrappers) { if (entry != null) { entry.recycle(); } } // 分发速率控制 if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { if 
(topic.getDispatchRateLimiter().isPresent()) { topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent); } if (dispatchRateLimiter.isPresent()) { dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent); } } if (entriesToDispatch > 0) { entries.subList(start, entries.size()).forEach(entry -> { long stickyKeyHash = getStickyKeyHash(entry); addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); } // 上面讲过该方法,可以看到这是一个递归调用,链路很长 readMoreEntries(); } }
再往下直接看最后
public class PulsarCommandSenderImpl { public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription, int partitionIdx, Listentries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, RedeliveryTracker redeliveryTracker) { final ChannelHandlerContext ctx = cnx.ctx(); final ChannelPromise writePromise = ctx.newPromise(); ctx.channel().eventLoop().execute(() -> { for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); if (entry == null) { // Entry was filtered out continue; } int batchSize = batchSizes.getBatchSize(i); if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) { continue; } ByteBuf metadataAndPayload = entry.getDataBuffer(); metadataAndPayload.retain(); Commands.skipBrokerEntrymetadataIfExist(metadataAndPayload); if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) { Commands.skipChecksumIfPresent(metadataAndPayload); } int redeliveryCount = 0; PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); if (redeliveryTracker.contains(position)) { redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position); } // 写入缓冲区 ctx.write( cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx, redeliveryCount, metadataAndPayload, batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise()); entry.release(); } // 刷新 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise); batchSizes.recyle(); if (batchIndexesAcks != null) { batchIndexesAcks.recycle(); } }); return writePromise; } }
到协议层了。
整个核心流程介绍完了,它的细节还有很多很多,过程中很多方法没仔细点进去分析,否则会偏离文章主题,本文还是围绕着消费请求拉取的流程来介绍。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)