Pulsar Source Code Deep Dive: Server-Side Implementation of Consumer Message Pulling

The previous article covered how the server creates a consumer; this one walks through how a message pull request is served.

1. Entry point of the pull request
public class ServerCnx {

    protected void handleFlow(CommandFlow flow) {
        checkArgument(state == State.Connected);

        CompletableFuture consumerFuture = consumers.get(flow.getConsumerId());

        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            Consumer consumer = consumerFuture.getNow(null);
            if (consumer != null) {
            	// pass in the client-configured permit count (default 1000)
                consumer.flowPermits(flow.getMessagePermits());
            } else {
            }
        }
    }
}

Next, consumer.flowPermits(...):

2. Pull request at the Consumer layer
public class Consumer {

    public void flowPermits(int additionalNumberOfMessages) {
        checkArgument(additionalNumberOfMessages > 0);
		// stop pulling once the unacked-message limit is exceeded
        if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= maxUnackedMessages) {
            blockedConsumerOnUnackedMsgs = true;
        }
        int oldPermits;
        if (!blockedConsumerOnUnackedMsgs) {
            oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
            // delegate to the subscription
            subscription.consumerFlow(this, additionalNumberOfMessages);
        } else {
            oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
        }
    }
}
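The permit accounting above can be modeled compactly. The sketch below is illustrative only (plain fields instead of Pulsar's atomic field updaters, simplified method signature): when the consumer is blocked on too many unacked messages, new permits are parked instead of being forwarded to the subscription.

```java
// Simplified model of Consumer.flowPermits: not Pulsar's actual fields.
public class PermitSketch {
    int messagePermits = 0;          // stands in for MESSAGE_PERMITS_UPDATER
    int permitsWhileBlocked = 0;     // stands in for the "received while blocked" counter
    boolean blockedOnUnackedMsgs = false;

    // returns how many permits are actually forwarded to subscription.consumerFlow
    int flowPermits(int additional, int unackedMessages, int maxUnackedMessages) {
        if (unackedMessages >= maxUnackedMessages) {
            blockedOnUnackedMsgs = true; // over the limit: stop forwarding
        }
        if (!blockedOnUnackedMsgs) {
            messagePermits += additional;
            return additional;           // forwarded downstream
        }
        permitsWhileBlocked += additional;
        return 0;                        // parked until the consumer unblocks
    }
}
```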

As covered in the previous article, the subscription is the outermost object: every consumer attached to a subscription holds a reference to it (see the constructor arguments in the consumer-creation walkthrough). All consumers sharing the same subscription name end up calling subscription.consumerFlow(...).
Next, subscription.consumerFlow(...):

3. Pull request at the Subscription layer that owns the Consumer
public class PersistentSubscription {

    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
    	// record the pull timestamp
        this.lastConsumedFlowTimestamp = System.currentTimeMillis();
        // delegate to the dispatcher created for this subscription type
        dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
    }
}

Next, dispatcher.consumerFlow(...).
There are two dispatcher implementations:

1. PersistentDispatcherSingleActiveConsumer
Subscription types: Exclusive, Failover
Role: read entries from BookKeeper, assemble the payload, and send it to the requesting consumer.

2. PersistentDispatcherMultipleConsumers
Subscription types: Shared; for Key_Shared, PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers.

Shared: read entries from BookKeeper, assemble the payload, and round-robin messages across the attached consumers; if the client configures consumer priorities, this effectively becomes weighted round-robin.

Key_Shared: read entries from BookKeeper, assemble the payload, and route each message to the consumer chosen roughly by hash(key) % consumerCount (that is the gist; the actual implementation adds a few optimizations on top).

The core flow is the same in all cases; only the consumer-selection step above differs, so we will follow the PersistentDispatcherMultipleConsumers implementation.
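As a rough illustration of the Key_Shared routing idea described above (the helper name pickConsumer and the plain array hashCode are assumptions for brevity; the real PersistentStickyKeyDispatcherMultipleConsumers hashes the sticky key with Murmur3 onto configurable hash ranges):

```java
import java.util.Arrays;

// Illustrative sketch only: maps a sticky key onto a consumer slot.
public class KeySharedSketch {
    // pickConsumer is a made-up helper, not a Pulsar API
    static int pickConsumer(byte[] stickyKey, int consumerCount) {
        int hash = Arrays.hashCode(stickyKey); // Pulsar uses Murmur3 here
        return Math.abs(hash % consumerCount); // stable slot in [0, consumerCount)
    }

    public static void main(String[] args) {
        // the same key always lands on the same consumer
        System.out.println(pickConsumer("order-42".getBytes(), 4));
    }
}
```

The important property is determinism: as long as the consumer set is stable, all messages with the same key reach the same consumer, preserving per-key ordering.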

4. The Subscription's dispatch strategy: PersistentDispatcherMultipleConsumers, callback layer 1
public class PersistentDispatcherMultipleConsumers {

    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
        if (!consumerSet.contains(consumer)) {
            return;
        }
        totalAvailablePermits += additionalNumberOfMessages;
        readMoreEntries();
    }
}

Next, readMoreEntries():

public class PersistentDispatcherMultipleConsumers {

public synchronized void readMoreEntries() {
		// availablePermits of the first available consumer
        int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
        // compare against the total availablePermits
        int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
        if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
            // apply the rate-limit config to compute 1) how many entries and 2) how many bytes to read
            Pair calculateResult = calculateToRead(currentTotalAvailablePermits);
            // number of entries to read
            int messagesToRead = calculateResult.getLeft();
            // number of bytes to read
            long bytesToRead = calculateResult.getRight();

            if (messagesToRead == -1 || bytesToRead == -1) {
                return;
            }
			// fetch the redelivery list, plus any delayed messages that are now due
            Set messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
            if (!messagesToReplayNow.isEmpty()) {

                havePendingReplayRead = true;
                // if delayed delivery is enabled on the topic:
                // 1) filter out entries already marked deleted 2) read the rest from BookKeeper and send to clients
                // the returned set contains the positions already marked deleted
                Set deletedMessages = topic.isDelayedDeliveryEnabled()
                        ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
				// drop the deleted positions from the redelivery queue
                deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
                        ((PositionImpl) position).getEntryId()));
                if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
                    havePendingReplayRead = false;
                    // schedule another read
                    topic.getBrokerService().executor().execute(() -> readMoreEntries());
                }
            }
            // dispatch is blocked because too many messages are unacked
            else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {

            }
            // no read request currently in flight
            else if (!havePendingRead) {
                havePendingRead = true;
                // start the read; `this` is callback layer 1
                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
                        ReadType.Normal, topic.getMaxReadPosition());
            } else {
            }
        } else {
        }
    }
}

The read starts in cursor.asyncReadEntriesOrWait(...).

5. ManagedCursorImpl read: callback layer 2, OpReadEntry
public class ManagedCursorImpl {

public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
                                       PositionImpl maxPosition) {
        if (isClosed()) {
            return;
        }
		// derive how many entries to read from the ledger's average read throughput
        int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes);
		// the read position is behind the last write position, so there is something to read
        if (hasMoreEntries()) {
        	// read
            asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition);
        } else {
            OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
                    ctx, maxPosition);

            if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                return;
            }
            // re-check after a delay whether new entries have become readable
            if (config.getNewEntriesCheckDelayInMillis() > 0) {
                ledger.getScheduledExecutor()
                        .schedule(() -> checkForNewEntries(op, callback, ctx),
                                config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
            } else {
            	// check whether anything is readable
            	// if not, the cursor is added to ledger.waitingCursors() and woken up from the
            	// publish callback, as covered in the earlier article on message publishing
                checkForNewEntries(op, callback, ctx);
        }
    }
}

When entries are readable, we continue into asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition):

public class ManagedCursorImpl {

    public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
                                 Object ctx, PositionImpl maxPosition) {
        checkArgument(numberOfEntriesToRead > 0);
        if (isClosed()) {
            callback.readEntriesFailed(new ManagedLedgerException
                    .CursorAlreadyClosedException("Cursor was already closed"), ctx);
            return;
        }
		// applyMaxSizeCap was already applied above, so this call simply returns numberOfEntriesToRead
        int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);

        PENDING_READ_OPS_UPDATER.incrementAndGet(this);
        // read; OpReadEntry is callback layer 2
        OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition);
        ledger.asyncReadEntries(op);
    }
}

Next, the ledger-level read: ledger.asyncReadEntries(op).

public class ManagedLedgerImpl {

void asyncReadEntries(OpReadEntry opReadEntry) {
        final State state = STATE_UPDATER.get(this);
        if (state == State.Fenced || state == State.Closed) {
            opReadEntry.readEntriesFailed(new ManagedLedgerFencedException(), opReadEntry.ctx);
            return;
        }
        // the ledger of the read position matches the current ledger
        long ledgerId = opReadEntry.readPosition.getLedgerId();
        LedgerHandle currentLedger = this.currentLedger;
        if (currentLedger != null && ledgerId == currentLedger.getId()) {
        	// read from the current ledger
            internalReadFromLedger(currentLedger, opReadEntry);
        } else {
        	// writes are ahead of reads: the ledger has rolled over while the read
        	// position still points at an older ledger
        	// `ledgers` is the in-memory cache of this topic's ledger metadata loaded from ZooKeeper
        	// look up the metadata of that older ledger
            LedgerInfo ledgerInfo = ledgers.get(ledgerId);
            // abnormal case
            if (ledgerInfo == null || ledgerInfo.getEntries() == 0) {
                opReadEntry.updateReadPosition(new PositionImpl(opReadEntry.readPosition.getLedgerId() + 1, 0));
                opReadEntry.checkReadCompletion();
                return;
            }

            // create a LedgerHandle, the BookKeeper client handle for operating on this ledger
            getLedgerHandle(ledgerId).thenAccept(ledger -> internalReadFromLedger(ledger, opReadEntry)).exceptionally(ex -> {
                opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
                        opReadEntry.ctx);
                return null;
            });
        }
    }
}

Next, internalReadFromLedger(currentLedger, opReadEntry):

6. ManagedLedgerImpl read: callback layer 3, ReadEntryCallbackWrapper
public class ManagedLedgerImpl {

   private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
		// on the path analyzed above, maxPosition defaults to Long.MAX_VALUE; other call paths may set it explicitly
        if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
            opReadEntry.checkReadCompletion();
            return;
        }
        // entryId of the next entry to read
        long firstEntry = opReadEntry.readPosition.getEntryId();
        long lastEntryInLedger;

        PositionImpl lastPosition = lastConfirmedEntry;
		// if the ledger has not rolled over, the most recently appended message is the last entry
        if (ledger.getId() == lastPosition.getLedgerId()) {
            lastEntryInLedger = lastPosition.getEntryId();
        } else {
        	// otherwise take the last entry of the older ledger
            lastEntryInLedger = ledger.getLastAddConfirmed();
        }
        if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
            lastEntryInLedger = min(opReadEntry.maxPosition.getEntryId(), lastEntryInLedger);
        }
		// if the start id is already past the last entry, there is nothing to read
        if (firstEntry > lastEntryInLedger) {

            if (currentLedger == null || ledger.getId() != currentLedger.getId()) {
                Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1);
                if (nextLedgerId != null) {
                    opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0));
                } else {
                    opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
                }
            } else {
                opReadEntry.updateReadPosition(opReadEntry.readPosition);
            }

            opReadEntry.checkReadCompletion();
            return;
        }
		// end id = start id + count - 1, because entry ids are sequential within a ledger
        long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
        // read
        asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
    }
}
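The read-window arithmetic at the end of internalReadFromLedger boils down to a single clamp; lastEntry below is an illustrative standalone helper, not a Pulsar method:

```java
// Entry ids are contiguous within a ledger, so the end of the read
// window is simply first + count - 1, clamped to the last confirmed entry.
public class ReadRangeSketch {
    static long lastEntry(long firstEntry, int entriesToRead, long lastEntryInLedger) {
        return Math.min(firstEntry + entriesToRead - 1, lastEntryInLedger);
    }

    public static void main(String[] args) {
        // asked to read 50 entries from 100, but only 120 are confirmed
        System.out.println(lastEntry(100, 50, 120)); // clamped to 120
    }
}
```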

Next, asyncReadEntry:

public class ManagedLedgerImpl {

    protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
            OpReadEntry opReadEntry, Object ctx) {
        if (config.getReadEntryTimeoutSeconds() > 0) {
            long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
            long createdTime = System.nanoTime();
            // callback layer 3
            ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
                    opReadEntry, readOpCount, createdTime, ctx);
            lastReadCallback = readCallback;
            // cache read
            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
        } else {
            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
        }
    }
}

Next, the cache-read implementation, entryCache.asyncReadEntry:

7. Broker-side cache read
public class EntryCacheImpl {

private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
            final ReadEntriesCallback callback, Object ctx) {
        final long ledgerId = lh.getId();
        final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
        final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry);
        final PositionImpl lastPosition = PositionImpl.get(lh.getId(), lastEntry);

		// the cache is backed by a ConcurrentSkipListMap
		// values live in off-heap memory
		// fetch the entries in range
        Collection cachedEntries = entries.getRange(firstPosition, lastPosition);
		// every requested entry was found in the cache
        if (cachedEntries.size() == entriesToRead) {
            long totalCachedSize = 0;
            final List entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);

            for (EntryImpl entry : cachedEntries) {
                entriesToReturn.add(EntryImpl.create(entry));
                totalCachedSize += entry.getLength();
                entry.release();
            }
			// record the cache hit in the metrics
            manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), totalCachedSize);
			// hand off to the callback chain, which unwinds layer by layer;
			// we will dissect the most important callback shortly
            callback.readEntriesComplete((List) entriesToReturn, ctx);

        } else {
        	// not every entry was found in the cache
        	// release what was found; this keeps the implementation simple, and most reads still hit
            if (!cachedEntries.isEmpty()) {
                cachedEntries.forEach(entry -> entry.release());
            }

            // read from BookKeeper
            lh.readAsync(firstEntry, lastEntry).whenCompleteAsync(
                    (ledgerEntries, exception) -> {
                        if (exception != null) {
                            return;
                        }
                        try {
                            long totalSize = 0;
                            final List entriesToReturn
                                = Lists.newArrayListWithExpectedSize(entriesToRead);
                            for (LedgerEntry e : ledgerEntries) {
                                EntryImpl entry = EntryImpl.create(e);

                                entriesToReturn.add(entry);
                                totalSize += entry.getLength();
                            }
							// record the cache miss in the metrics
                            manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
                            ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize);
							// hand off to the callback chain, which unwinds layer by layer;
							// we will dissect the most important callback shortly
                            callback.readEntriesComplete((List) entriesToReturn, ctx);
                        } finally {
                            ledgerEntries.close();
                        }
                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
                    	return null;
                    });
        }
    }
}

Note that on a cache miss the broker reads straight from BookKeeper, assembles the list, and completes the callback without inserting the result into the cache. That is deliberate: in most scenarios a message is consumed once and never read again, so caching read results would have a low hit rate while occupying memory. The cache is populated mainly on the publish path, and enlarging it can improve performance; the corresponding config option is managedLedgerCacheSizeMB.
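The cache-hit check can be sketched with a plain ConcurrentSkipListMap, the structure named in the comments above (class and method names below are illustrative; the real EntryCacheImpl wraps off-heap buffers in a RangeCache keyed by position):

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative model of the broker read cache: entries keyed by entry id
// in a sorted map, with range reads served via an inclusive subMap view.
public class RangeCacheSketch {
    private final ConcurrentSkipListMap<Long, String> entries = new ConcurrentSkipListMap<>();

    void put(long entryId, String payload) {
        entries.put(entryId, payload);
    }

    // inclusive range, mirroring entries.getRange(firstPosition, lastPosition)
    Collection<String> getRange(long first, long last) {
        return entries.subMap(first, true, last, true).values();
    }

    public static void main(String[] args) {
        RangeCacheSketch cache = new RangeCacheSketch();
        cache.put(0L, "e0"); cache.put(1L, "e1"); cache.put(2L, "e2");
        int found = cache.getRange(0, 2).size();
        // full hit only when every entry in the range is present
        System.out.println(found == 3 ? "cache hit" : "fall back to BookKeeper");
    }
}
```

This also shows why the miss branch releases partial results: the hit test is all-or-nothing over the requested range, which keeps the logic simple at the cost of discarding partial hits.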

We will not walk through every callback layer; let's dissect the most important one, callback layer 1.

8. Callback layer 1: PersistentDispatcherMultipleConsumers
public class PersistentDispatcherMultipleConsumers {

public synchronized void readEntriesComplete(List entries, Object ctx) {
        ...
        sendMessagesToConsumers(readType, entries);
    }
}

Next, sendMessagesToConsumers:

public class PersistentDispatcherMultipleConsumers {

protected void sendMessagesToConsumers(ReadType readType, List entries) {
		// if the latest mark-delete position is ahead of the read position,
		// some of the entries just read may already be deleted
        if (needTrimAckedMessages()) {
        	// drop the already-deleted entries
            cursor.trimDeletedEntries(entries);
        }

        int entriesToDispatch = entries.size();
        // Trigger read more messages
        if (entriesToDispatch == 0) {
            readMoreEntries();
            return;
        }
        EntryWrapper[] entryWrappers = new EntryWrapper[entries.size()];
        // messages can be batched, and a batch is still a single entry,
        // so count the actual number of messages here
        int remainingMessages = updateEntryWrapperWithMetadata(entryWrappers, entries);
        int start = 0;
        // total messages sent
        long totalMessagesSent = 0;
        // total bytes sent
        long totalBytesSent = 0;
        // average messages per entry
        int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;
		
        int firstAvailableConsumerPermits, currentTotalAvailablePermits;
        boolean dispatchMessage;
        while (entriesToDispatch > 0) {
        	// permits of the first available consumer
            firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
            currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
            dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0;
            if (!dispatchMessage) {
                break;
            }
            // pick the next consumer
            Consumer c = getNextConsumer();
            if (c == null) {
                return;
            }

            // permits of the chosen consumer
            int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;

			// serviceConfig.getDispatcherMaxRoundRobinBatchSize() defaults to 20
			// availablePermits defaults to 1000 on the client
			// with batching, remainingMessages may exceed 1000, so the result is usually 20
            int messagesForC = Math.min(Math.min(remainingMessages, availablePermits),
                    serviceConfig.getDispatcherMaxRoundRobinBatchSize());
            // scale down to a compromise entry count
            messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);

            if (messagesForC > 0) {
            	// dispatch this batch of entries in slices
            	// compute the end index of the slice
                int end = Math.min(start + messagesForC, entries.size());
                if (readType == ReadType.Replay) {
                    entries.subList(start, end).forEach(entry -> {
                        redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
                    });
                }

                SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                List entriesForThisConsumer = entries.subList(start, end);

                EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
                // filter, e.g. delayed messages whose delivery time has not yet arrived
                filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
                        batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
				// hand the slice to the consumer for sending
                c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                        sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);

                int msgSent = sendMessageInfo.getTotalMessages();
                remainingMessages -= msgSent;
                start += messagesForC;
                entriesToDispatch -= messagesForC;
                // the key to terminating the recursion:
                // TOTAL_AVAILABLE_PERMITS_UPDATER holds the total requested permits;
                // once it drops below zero, everything the client asked for has been pushed
                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
                        -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
                totalMessagesSent += sendMessageInfo.getTotalMessages();
                totalBytesSent += sendMessageInfo.getTotalBytes();
            }
        }
        for (EntryWrapper entry : entryWrappers) {
            if (entry != null) {
                entry.recycle();
            }
        }
		// dispatch rate limiting
        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
            if (topic.getDispatchRateLimiter().isPresent()) {
                topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }

            if (dispatchRateLimiter.isPresent()) {
                dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
        }

        if (entriesToDispatch > 0) {
            entries.subList(start, entries.size()).forEach(entry -> {
                long stickyKeyHash = getStickyKeyHash(entry);
                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
                entry.release();
            });
        }
        // readMoreEntries was covered above; this is effectively a recursive call with a long chain
        readMoreEntries();
    }
}
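The per-consumer slice sizing in the loop above follows one formula: messagesForC = max(min(remaining, permits, maxRoundRobinBatch) / avgBatchSizePerMsg, 1). A standalone sketch (the defaults of 1000 permits and a round-robin cap of 20 are the ones mentioned in the comments; the method name is illustrative):

```java
// Mirrors the slice-sizing arithmetic from sendMessagesToConsumers.
public class DispatchQuotaSketch {
    static int messagesForConsumer(int remainingMessages, int availablePermits,
                                   int maxRoundRobinBatchSize, int avgBatchSizePerMsg) {
        // cap by what remains, what the consumer asked for, and the broker's fairness cap
        int m = Math.min(Math.min(remainingMessages, availablePermits), maxRoundRobinBatchSize);
        // convert a message budget into an entry count, sending at least one entry
        return Math.max(m / avgBatchSizePerMsg, 1);
    }

    public static void main(String[] args) {
        // 5000 messages packed into batch entries of ~10 messages each,
        // client permits 1000, broker round-robin cap 20 -> 20 / 10 = 2 entries
        System.out.println(messagesForConsumer(5000, 1000, 20, 10));
    }
}
```

The division by avgBatchSizePerMsg is what keeps batched topics fair: the cap of 20 is a message budget, so a consumer receiving large batch entries gets proportionally fewer entries per round.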

Skipping further down the chain, here is the final step:

public class PulsarCommandSenderImpl {

public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription,
            int partitionIdx, List entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
            RedeliveryTracker redeliveryTracker) {
        final ChannelHandlerContext ctx = cnx.ctx();
        final ChannelPromise writePromise = ctx.newPromise();
        ctx.channel().eventLoop().execute(() -> {
            for (int i = 0; i < entries.size(); i++) {
                Entry entry = entries.get(i);
                if (entry == null) {
                    // Entry was filtered out
                    continue;
                }

                int batchSize = batchSizes.getBatchSize(i);

                if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
                    continue;
                }

                ByteBuf metadataAndPayload = entry.getDataBuffer();
                metadataAndPayload.retain();
                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
                if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) {
                    Commands.skipChecksumIfPresent(metadataAndPayload);
                }

                int redeliveryCount = 0;
                PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                if (redeliveryTracker.contains(position)) {
                    redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
                }
				// write into the channel buffer
                ctx.write(
                        cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx,
                                redeliveryCount, metadataAndPayload,
                                batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName),
                        ctx.voidPromise());
                entry.release();
            }
            // flush
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
            batchSizes.recyle();
            if (batchIndexesAcks != null) {
                batchIndexesAcks.recycle();
            }
        });

        return writePromise;
    }
}

We have reached the protocol layer.
That covers the core flow. There are many more details; quite a few methods were deliberately not expanded here, since doing so would stray from the topic. The focus stays on the consumer pull-request path.

Originally published at 内存溢出: http://outofmemory.cn/zaji/5688440.html