// 主要初始化的几个组件:new几个对象,NetworkClient、ConsumerCoordinator、Fetcher new KafkaConsumer拉取数据(properties);
ConsumerRecords未完待续records = kafkaConsumer.poll(5000); // 核心方法: org.apache.kafka.clients.consumer.KafkaConsumer#pollonce // 确保 coordinator 准备好,确定哪台服务器是 coordinator coordinator.ensureCoordinatorReady(); org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady // 寻找该组对应的 coordinator,返回一个 future 对象,之后对该对象进行轮询,直到拿到返回值。拿到返回值后判断 coordinator 是否正常 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendGroupCoordinatorRequest // 寻找一台负载较低,未完成请求比较少的节点,连接该节点获取coordinator Node node = this.client.leastLoadedNode(); return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) .compose(new RequestFutureAdapter () { @Override public void onSuccess(ClientResponse response, RequestFuture future) { // 根据服务端返回的相应信息,封装coordinator handleGroupmetadataResponse(response, future); // start sending heartbeats only if we have a valid generation if (generation > 0) // 开启心跳任务 heartbeatTask.reset(); // 执行任务调度 client.schedule(this, now); // 往一个延时队列里面塞入一个任务 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#schedule delayedTasks.add(task, at); // 这个延时队列有个死循环的代码,只要往里面加任务,就会拿出来执行 public void poll(long now) { while (!tasks.isEmpty() && tasks.peek().timeout <= now) { Entry entry = tasks.poll(); entry.task.run(now); } } } }); // 服务端执行 ApiKeys.GROUP_COORDINATOR 的请求逻辑。 // kafka.server.KafkaApis是server端执行各种kafka请求的地方 case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request) kafka.server.KafkaApis#handleGroupCoordinatorRequest // 计算 coordinator 在 __consumer_offsets 主题的哪个分区下 kafka.coordinator.GroupCoordinator#partitionFor // group id 的 hash 值对 __consumer_offsets 主题的分区数取模 def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupmetadataTopicPartitionCount // coordinator准备好后,就开始发送 join group 请求,同时选举出 leader consumer org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#ensurePartitionAssignment if (subscriptions.partitionsAutoAssigned()) // 调用 subscribe 方法时会调用 setSubscriptionType(SubscriptionType.AUTO_TOPICS); 从而这个判断会返为true coordinator.ensurePartitionAssignment(); // join group 的方法 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureActiveGroup org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendJoinGroupRequest return client.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(new JoinGroupResponseHandler()); // 服务端执行 ApiKeys.JOIN_GROUP 的请求逻辑 case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request) kafka.server.KafkaApis#handleJoinGroupRequest kafka.coordinator.GroupCoordinator#handleJoinGroup // 判断当前组的状态处于哪个阶段,执行不同的方法 kafka.coordinator.GroupCoordinator#doJoinGroup // 这里决定了哪个 consumer 是 leader consumer kafka.coordinator.Groupmetadata#add def add(memberId: String, member: Membermetadata) { assert(supportsProtocols(member.protocols)) // 从这个方法可以看出哪个 consumer 先来,哪个就是 leader consumer if (leaderId == null) leaderId = memberId members.put(memberId, member) } // 回调函数 compose(new JoinGroupResponseHandler()) 这里当发生 join group 请求返回成功后会回调 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.JoinGroupResponseHandler#handle // 组里的所有成员都会发送 sync group 的请求,但是只有 leader 需要制定消费数据的方案 if (joinResponse.isLeader()) { onJoinLeader(joinResponse).chain(future); org.apache.kafka.clients.consumer.internals.AbstractCoordinator#onJoinLeader // 制定消费数据方案 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#performAssignment // 发送 sync group 请求,顺便带上消费数据方案 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendSyncGroupRequest return client.send(coordinator, ApiKeys.SYNC_GROUP, request) .compose(new SyncGroupResponseHandler()); } else { onJoinFollower().chain(future); } // 服务端执行 ApiKeys.SYNC_GROUP 的请求逻辑 case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request) kafka.server.KafkaApis#handleSyncGroupRequest // 根据组的不同状态执行不同逻辑 kafka.coordinator.GroupCoordinator#doSyncGroup // 由于 join group 已经完成,所以状态应该是AwaitingSync,这里不管是 leader 还是 follower 都会进入这里 case AwaitingSync => // 准备好回调,用来等下下发数据消费方案 group.get(memberId).awaitingSyncCallback = responseCallback // if (memberId == group.leaderId) // 下发数据消费方案到各个消费者 setAndPropagateAssignment(group, assignment) // 下发数据消费方案 kafka.coordinator.GroupCoordinator#propagateAssignment for (member <- group.allMembermetadata) { if (member.awaitingSyncCallback != null) { // 调用刚才准备好的的回调下发数据消费方案 member.awaitingSyncCallback(member.assignment, errorCode) member.awaitingSyncCallback = null // reset the session timeout for members after propagating the member's assignment. // This is because if any member's session expired while we were still awaiting either // the leader sync group or the storage callback, its expiration will be ignored and no // future heartbeat expectations will not be scheduled. completeAndScheduleNextHeartbeatExpiration(group, member) } // 更改消费者组状态为 stable group.transitionTo(Stable)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)