每个consumer group都会选择一个 broker 作为自己的coordinator 它是负责监控这个消费者组里的各个消费者的心跳,以及判断是否消费者宕机了 如果其中一个消费者判断为宕机,会进行rebalance消费者组如何选择coordinator机器
1. 确定由 __consumer_offsets 位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。 2. 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。 假设有消费者组A,首先对 groupId 进行hash得到一个数字假设为8,接着对 __consumer_offsets 的分区数量(默认为50,可以通过offsets.topic.num.partitions设置)这个值进行取模,取模后得到数字为: 8 % 50 = 8。 查看 __consumer_offsets 这个主题的8号分区 leader 副本在哪台 broker 上面,那一台 broker 就是 coordinator消费者如何与coordinator交互
1. 每个consumer都发送JoinGroup请求到coordinator 2. coordinator从consumer group中选择一个consumer作为leader 3. 将要消费的topic情况发送给这个leader 4. leader负责制定消费方案 5. 通过SyncGroup把消费方案发给coordinator 6. coordinator把消费方案下发给各个consumer 7. consumer从指定的分区的leader broker消费数据
kafka 消费者 groupcoordinator 流程图:
https://blog.csdn.net/qq_32854205/article/details/122288968
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)