kafka 0.10.0.1 consumer源码解析

kafka 0.10.0.1 consumer源码解析,第1张

kafka 0.10.0.1 consumer源码解析 消费者源码解析,基于0.10.0.1版本 初始化 consumer 对象
// 主要初始化的几个组件:new几个对象,NetworkClient、ConsumerCoordinator、Fetcher
new KafkaConsumer(properties);
拉取数据
ConsumerRecords records = kafkaConsumer.poll(5000);
    // 核心方法:
    org.apache.kafka.clients.consumer.KafkaConsumer#pollonce
        // 确保 coordinator 准备好,确定哪台服务器是 coordinator
        coordinator.ensureCoordinatorReady();
        org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady
            // 寻找该组对应的 coordinator,返回一个 future 对象,之后对该对象进行轮询,直到拿到返回值。拿到返回值后判断 coordinator 是否正常
            org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendGroupCoordinatorRequest
                // 寻找一台负载较低,未完成请求比较少的节点,连接该节点获取coordinator
                Node node = this.client.leastLoadedNode();
                return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                .compose(new RequestFutureAdapter() {
                    @Override
                    public void onSuccess(ClientResponse response, RequestFuture future) {
                        // 根据服务端返回的相应信息,封装coordinator
                        handleGroupmetadataResponse(response, future);
                            // start sending heartbeats only if we have a valid generation
                            if (generation > 0)
                                // 开启心跳任务
                                heartbeatTask.reset();
                                    // 执行任务调度
                                    client.schedule(this, now);
                                        // 往一个延时队列里面塞入一个任务
                                        org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#schedule
                                            delayedTasks.add(task, at);
                                        // 这个延时队列有个死循环的代码,只要往里面加任务,就会拿出来执行
                                        
                                        public void poll(long now) {
                                            while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
                                                Entry entry = tasks.poll();
                                                entry.task.run(now);
                                            }
                                        }
                    }
                });

                // 服务端执行 ApiKeys.GROUP_COORDINATOR 的请求逻辑。
                // kafka.server.KafkaApis是server端执行各种kafka请求的地方
                case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
                kafka.server.KafkaApis#handleGroupCoordinatorRequest
                    // 计算 coordinator 在 __consumer_offsets 主题的哪个分区下
                    kafka.coordinator.GroupCoordinator#partitionFor
                        // group id 的 hash 值对 __consumer_offsets 主题的分区数取模
                        def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupmetadataTopicPartitionCount

        // coordinator准备好后,就开始发送 join group 请求,同时选举出 leader consumer
        org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#ensurePartitionAssignment
        if (subscriptions.partitionsAutoAssigned()) // 调用 subscribe 方法时会调用 setSubscriptionType(SubscriptionType.AUTO_TOPICS); 从而这个判断会返为true
             coordinator.ensurePartitionAssignment();
            // join group 的方法
            org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureActiveGroup
                org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendJoinGroupRequest
                    return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                                    .compose(new JoinGroupResponseHandler());

                    // 服务端执行 ApiKeys.JOIN_GROUP 的请求逻辑
                                case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
                                kafka.server.KafkaApis#handleJoinGroupRequest
                                    kafka.coordinator.GroupCoordinator#handleJoinGroup
                                            // 判断当前组的状态处于哪个阶段,执行不同的方法
                                            kafka.coordinator.GroupCoordinator#doJoinGroup
                                                // 这里决定了哪个 consumer 是 leader consumer
                                                kafka.coordinator.Groupmetadata#add
                                                    def add(memberId: String, member: Membermetadata) {
                                                        assert(supportsProtocols(member.protocols))
                                                        // 从这个方法可以看出哪个 consumer 先来,哪个就是 leader consumer
                                                        if (leaderId == null)
                                                          leaderId = memberId
                                                        members.put(memberId, member)
                                                    }

                // 回调函数 compose(new JoinGroupResponseHandler()) 这里当发生 join group 请求返回成功后会回调
                    org.apache.kafka.clients.consumer.internals.AbstractCoordinator.JoinGroupResponseHandler#handle
                        // 组里的所有成员都会发送 sync group 的请求,但是只有 leader 需要制定消费数据的方案
                        if (joinResponse.isLeader()) {
                            onJoinLeader(joinResponse).chain(future);
                                org.apache.kafka.clients.consumer.internals.AbstractCoordinator#onJoinLeader
                                    // 制定消费数据方案
                                    org.apache.kafka.clients.consumer.internals.AbstractCoordinator#performAssignment
                                    // 发送 sync group 请求,顺便带上消费数据方案
                                    org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendSyncGroupRequest
                                    return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
                                                    .compose(new SyncGroupResponseHandler());
                        } else {
                            onJoinFollower().chain(future);
                        }

                    // 服务端执行 ApiKeys.SYNC_GROUP 的请求逻辑
                    case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
                    kafka.server.KafkaApis#handleSyncGroupRequest
                        // 根据组的不同状态执行不同逻辑
                        kafka.coordinator.GroupCoordinator#doSyncGroup
                            // 由于 join group 已经完成,所以状态应该是AwaitingSync,这里不管是 leader 还是 follower 都会进入这里
                            case AwaitingSync =>
                                // 准备好回调,用来等下下发数据消费方案
                                group.get(memberId).awaitingSyncCallback = responseCallback
                                // if (memberId == group.leaderId) 
                                    // 下发数据消费方案到各个消费者
                                    setAndPropagateAssignment(group, assignment)
                                        // 下发数据消费方案
                                        kafka.coordinator.GroupCoordinator#propagateAssignment
                                        for (member <- group.allMembermetadata) {
                                          if (member.awaitingSyncCallback != null) {
                                            // 调用刚才准备好的的回调下发数据消费方案
                                            member.awaitingSyncCallback(member.assignment, errorCode)
                                            member.awaitingSyncCallback = null
                                    
                                            // reset the session timeout for members after propagating the member's assignment.
                                            // This is because if any member's session expired while we were still awaiting either
                                            // the leader sync group or the storage callback, its expiration will be ignored and no
                                            // future heartbeat expectations will not be scheduled.
                                            completeAndScheduleNextHeartbeatExpiration(group, member)
                                          }
                                    // 更改消费者组状态为 stable
                                    group.transitionTo(Stable)
未完待续

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5694450.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存