《Kafka运维管控平台》 ✏️更强大的管控能力✏️ 更高效的问题定位能力 更便捷的集群运维能力 更专业的资源治理 更友好的运维生态
文末送35书
文章目录- 在这里我们先来梳理一下consumeGroup的相关知识
- findCoordinator流程展示
- 客户端源码分析
- org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator
- org.apache.kafka.clients.NetworkClient#leastLoadedNode
- 拆解FindCoordinatorRequest
- 服务端源码分析
- kafka.server.KafkaApis#handleFindCoordinatorRequest
- kafka.coordinator.group.GroupmetadataManager#partitionFor
- kafka.server.KafkaApis#getOrCreateInternalTopic
- kafka.server.KafkaApis#createTopic
- 拆解FindCoordinatorResponse
- 总结
1、首先,我们会给每个consume设置groupId,对于相同groupId且订阅相同topic的consume,会组成consumeGroup,如图一所示
2、对于Server端的topic来说,会有partition这个概念,如图二所示
3、现在我们有多个consume及多个partition,到底由哪个consume来消费哪个partition呢?就由consume启动时的分区分配策略来决定。
-
如果consume数量小于partition的数量,则一个consume有可能消费多个分区,如图三所示
-
如果consume数量大于partition的数量,则会有consume线程空跑,如图四所示
4、kafka的内置topic:consumer_offsets专门记录消费位点信息,既然是内置topic,那自然也会有partition及partition leader的概念,对于同一个groupId的消费位点都会记录在同一个partition中,在这篇文章中findCoordinator即是找到该groupId对应的partition的leader节点,我们知道这个节点才能将位移信息提交到这里保存,如果该partition还有其他副本,则该节点还会与其他副本同步位移信息。与该节点交互都是由GroupCoordinator完成的。
这里还是放一下findCoordinator的代码,看其他consume的代码就发现客户端跟kafkaServer通信的格式大多是这样的,如果通信一次发现该GroupCoordinator的信息还未获取到则继续重试,直到超时,这里的超时时间即为poll时传入的超时时间,这个时间设置贯穿了整个consume的运行代码。
protected synchronized boolean ensureCoordinatorReady(final Timer timer) { //如果还未加入group则与group通信 if (!coordinatorUnknown()) return true; do { if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) { final RuntimeException fatalException = findCoordinatorException; findCoordinatorException = null; throw fatalException; } final RequestFuturefuture = lookupCoordinator(); client.poll(future, timer); //如果还没回调完成则说明是超时的 if (!future.isDone()) { // ran out of time break; } if (future.failed()) { if (future.isRetriable()) { log.debug("Coordinator discovery failed, refreshing metadata"); client.awaitmetadataUpdate(timer); } else throw future.exception(); //获取group的信息之后client会与group对应的节点建立连接,如果不可用则还会重试 } else if (coordinator != null && client.isUnavailable(coordinator)) { // we found the coordinator, but the connection has failed, so mark // it dead and backoff before retrying discovery markCoordinatorUnknown(); timer.sleep(rebalanceConfig.retryBackoffMs); } //如果与group通信成功则会跳出循环 } while (coordinatorUnknown() && timer.notExpired()); return !coordinatorUnknown(); }
这里还有一点,跟踪代码可以看到以下代码在每次check以及与Server端通信完成之后都会有一样的逻辑,可以仔细思考一下,coordinator即获取到的group节点对象,client.isUnavailable(coordinator)是在与group建立连接,每次判断coordinator不为空且client与group连接失败,则将coordinator置空,为什么会这样呢?很有可能是请求到group的信息之后发现该节点已下线或者不可用,此时服务端很有可能也在进行选举,所以我们需要将coordinator清空,待服务端选举完成后再次通信。
protected synchronized Node checkAndGetCoordinator() { if (coordinator != null && client.isUnavailable(coordinator)) { markCoordinatorUnknown(true); return null; } return this.coordinator; }org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator
这段代码有个亮点就是先寻找的负载最小节点,然后与该节点通信获取group节点的信息。
protected synchronized RequestFutureorg.apache.kafka.clients.NetworkClient#leastLoadedNodelookupCoordinator() { if (findCoordinatorFuture == null) { // find a node to ask about the coordinator //与最小负载的node通信 Node node = this.client.leastLoadedNode(); if (node == null) { log.debug("No broker available to send FindCoordinator request"); return RequestFuture.noBrokersAvailable(); } else { findCoordinatorFuture = sendFindCoordinatorRequest(node); // remember the exception even after the future is cleared so that // it can still be thrown by the ensureCoordinatorReady caller findCoordinatorFuture.addListener(new RequestFutureListener () { @Override public void onSuccess(Void value) {} // do nothing @Override public void onFailure(RuntimeException e) { findCoordinatorException = e; } }); } } return findCoordinatorFuture; }
我们先来看看是如何寻找负载最小节点的,这里代码还是挺讲究的,首先就是取随机数,防止每次都从第一个节点连接,如果判断没有在途的request则直接返回该节点,否则取在途request最小的节点,如果该节点不存在,则依次取连接的节点、需要重试的节点,如果找到不为null的节点则返回该节点,否则返回null。
public Node leastLoadedNode(long now) { List拆解FindCoordinatorRequestnodes = this.metadataUpdater.fetchNodes(); if (nodes.isEmpty()) throw new IllegalStateException("There are no nodes in the Kafka cluster"); int inflight = Integer.MAX_VALUE; Node foundConnecting = null; Node foundCanConnect = null; Node foundReady = null; //随机取一个节点 int offset = this.randOffset.nextInt(nodes.size()); for (int i = 0; i < nodes.size(); i++) { int idx = (offset + i) % nodes.size(); Node node = nodes.get(idx); //如果该节点是可连接的,且selector空闲,且发送队列空闲则可以发送请求 if (canSendRequest(node.idString(), now)) { //inFlightRequests记录了已发送请求但还未收到response的request,这里判定如果该节点没有这种数据则直接作为最小负载节点返回 int currInflight = this.inFlightRequests.count(node.idString()); if (currInflight == 0) { // if we find an established connection with no in-flight requests we can stop right away log.trace("Found least loaded node {} connected with no in-flight requests", node); return node; //否则取inFlightRequests中最小count的节点作为最小负载节点 } else if (currInflight < inflight) { // otherwise if this is the best we have found so far, record that inflight = currInflight; foundReady = node; } } else if (connectionStates.isPreparingConnection(node.idString())) { foundConnecting = node; } else if (canConnect(node, now)) { //如果该节点未被记录或者断连之后超过重试时间,则允许设置该节点 foundCanConnect = node; } else { log.trace("Removing node {} from least loaded node selection since it is neither ready " + "for sending or connecting", node); } } // We prefer established connections if possible. Otherwise, we will wait for connections // which are being established before connecting to new nodes. //优先取状态良好的节点 if (foundReady != null) { log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight); return foundReady; } else if (foundConnecting != null) { log.trace("Found least loaded connecting node {}", foundConnecting); return foundConnecting; } else if (foundCanConnect != null) { log.trace("Found least loaded node {} with no active connection", foundCanConnect); return foundCanConnect; } else { log.trace("Least loaded node selection failed to find an available node"); return null; } }
通过下图我们来看看发送了哪些数据,key_type有两种枚举,一种是GROUP,另一种是TRANSACTION,如果type为GROUP的话那key就是groupId
服务端源码分析 kafka.server.KafkaApis#handleFindCoordinatorRequest服务端还是通过KafkaApi来处理请求,代码也比较简单。
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = { val findCoordinatorRequest = request.body[FindCoordinatorRequest] //校验数据 //……省略部分代码 // get metadata (and create the topic if necessary) val (partition, topicmetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => //4.1 找到对应发分区 val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) //4.2 获取对应的元数据 val metadata = getOrCreateInternalTopic(GROUP_metaDATA_TOPIC_NAME, request.context.listenerName) (partition, metadata) case CoordinatorType.TRANSACTION => val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) (partition, metadata) case _ => throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") } //组装返回参数 //……省略部分代码 } }kafka.coordinator.group.GroupmetadataManager#partitionFor
我们知道consume消费后对应的位点是保存在kafka的内部名为"__consumer_offsets"的内置topic中,内置topic初始化时由offsets.topic.num.partitions 参数来决定分区数,默认值是50,相同consumeGroup的offset最终会保存在其中一个分区中,而保存在哪个分区就由下面这段代码来决定,可以看到逻辑很简单,就是取groupId的hashCode,然后对总的分区数取模。比如groupId为"consume_group",最终就会在34号分区保存位点。
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupmetadataTopicPartitionCountkafka.server.KafkaApis#getOrCreateInternalTopic
这里是先从当前node的元数据缓存中拿到对应topic的数据,如果没有,则创建。从这段代码也可以猜想kafka内置topic的创建原理,是一种懒加载的思想,当第一个consume接入之后才会创建对应topicPartition文件。
private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): metadataResponse.Topicmetadata = { val topicmetadata = metadataCache.getTopicmetadata(Set(topic), listenerName) topicmetadata.headOption.getOrElse(createInternalTopic(topic)) }
这里的topicmetadata就是对应入参topic返回类似列表的对象,因为入参只有一个topic,所以直接取第一个数据,数据结构见下图,可以更直观的理解返回参数。
kafka.server.KafkaApis#createTopicTopic创建的流程如下图所示,详情请看 Topic创建流程源码分析
通过下图我们来看看返回了哪些数据,可以看到前面取了很多数据,最终拼到返回参数里面的只有leader所在的节点信息
总结这块代码本身不是很复杂,主要是有一些细节需要考虑,通过仔细思量这些细节对我们今后分析consume异常会大有好处。流程总结如下
1、寻找最小负载节点信息
2、向最小负载节点发送FindCoordinatorRequest
3、最小负载节点处理该请求。
- 首先找到该groupId对应的分区
- 通过内存中缓存的metaData获取该分区的信息,如果不存在则创建topic
- 返回查找到的分区leader信息
从开始决定送书到现在,我已经送麻了, 又快到周五啦,接着搞, 这次联合北大出版社送书, 周五一口气送
「 35 」本,。
提供 其中N款书
1自然语言处理NLP从入门到项目实战(Python语言实现)
2机器学习数学基础:概率论与数理统计
3数据结构和算法基础(Java语言实现)
4 Vue.js框架与Web前端开发从入门到精通
5 人工智能数学基础与Python机器学习实战
6 Vue.js全家桶零基础入门到进阶项目实战
7 Node.js入门指南
8 Web渗透攻防实战
9 架构基础:从需求到架构
1亿级流量Java高并发与网络编程实战
2Java高并发编程指南
3Java深度调试技术
4Java多线程与大数据处理实战
5Java核心技术及面试指南
参与方式:
3. 给本文「一键三连」 支持博主
4. 加抽奖群,参与抽奖 周五开奖!
【编辑推荐】
合理的架构设计使系统具有更好的稳定性、可扩展性和安全性,因此架构设计在系统开发中扮演着极其重要的角色,是系统开发人员的bibei知识。本书深入浅出地对架构设计领域的知识进行了系统全面的讲解,详细剖析了不同功能模块的先进设计理念,并结合了大厂实战案例进行分析,是一本非常有参考价值的书籍。
参与抽奖
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)