在Kafka内部有一个名为__consumer_offsets的topic,这个topic主要保存了每个消费组对topic下的partition消费的偏移量,即消费到了哪个offset。为了实现消费组offset管理的功能,当一个消费者获取到一条消息时,需要让消费者使用offset commit请求API来提交offsets。我们让brokers记录offset消息到磁盘上,并且使用消息的复制特征,达到持久性和可用性。所以实际上在broker端的offset提交处理和生产者请求的处理是一样的逻辑。在broker端重构代码可以使我们重用已有的代码。
OffsetManager模块提供了对这种偏移量的管理功能,主要包括
(1)缓存最新的偏移量 (2)提供对偏移量的查询 (3)compact 保留最新的偏移量,以此来控制该Topic的日志大小
其中offsetsCache提供针对Consumer偏移量的保存和查询,compact作为定时任务,定时执行。
1、Consumer偏移量的保存Consumer在接收到消息后,将当前消息的偏移量发生至__consumer_offsets,当Broker Server接收到消息时,会将这些消息保存到日志。一个offset提交消息包括下面字段:
其中key包括Consumer Group(消费组),topic,partition。Payload包括Offset。metadata,timestamp。
当Broker Server接收到消息时,除了将消息保存至日志以外,还会调用OffsetManager提供的putOffset方法将消息保存至offsetsCache中,代码如下:
def putOffsets(group: String, offsets: Map[TopicAndPartition, OffsetAndmetadata]) { offsets.foreach { case (topicAndPartition, offsetAndmetadata) => putOffset(GroupTopicPartition(group, topicAndPartition), offsetAndmetadata) } } private def putOffset(key: GroupTopicPartition, offsetAndmetadata: OffsetAndmetadata) { offsetsCache.put(key, offsetAndmetadata) }2、Consumer偏移量的读取
当Broker Server接收到查询偏移量的请求时,如果发现偏移量保存在Kafka中,则调用OffsetManager中的getOffsets方法将偏移量取出,代码如下:
def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetmetadataAndError] = { //计算该group位于__consumer_offsets的哪个分区 val offsetsPartition = partitionFor(group) followerTransitionLock synchronized { //只有partition为leader所在的Broker Server提供查询服务 if (leaderIsLocal(offsetsPartition)) { //如果目标partition的数据正在加载,则无法获取其偏移量,其只会发生在Broker Server启动阶段,因为需要从指定的主分区加载数据 if (loadingPartitions synchronized loadingPartitions.contains(offsetsPartition)){ topicPartitions.map { topicAndPartition => val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) (groupTopicPartition.topicPartition, OffsetmetadataAndError.OffsetsLoading) }.toMap } else { if (topicPartitions.size == 0) { // 如果topicPartitions的大小为0,则获取该group的所有偏移量消息 offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndmetadata) => (groupTopicPartition.topicPartition, OffsetmetadataAndError(offsetAndmetadata.offset, offsetAndmetadata.metadata, ErrorMapping.NoError)) }.toMap } else { //如果topicPartitions的大小不为0,则返回指定的偏移量 topicPartitions.map { topicAndPartition => val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) (groupTopicPartition.topicPartition, getOffset(groupTopicPartition)) }.toMap } } } else { //partition不为leader,不对外提供服务 topicPartitions.map { topicAndPartition => val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) (groupTopicPartition.topicPartition, OffsetmetadataAndError.NotOffsetManagerForGroup) }.toMap } } }
需要注意的是,Kafka是如何将Consumer Group产生的偏移量保存在__consumer_offsets的不同分区上的?其本质是通过计算不同的Consumer Group的hash值和__consumer_offsets的分区数的模数,其结果作为指定分区的索引。因此在getOffsets的第一步就开始进行取模运算。
def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions
不同的group对应的偏移量信息在__consumer_offsets的Topic里面存放格式如下:
其中key包括group+topic+partition,Value就是消费组的消费偏移量
3、Compact策略当Consumer Group经过长时间运行之后,不再产生偏移量时,很可能其已经不需要保存在__consumer_offsets里面的偏移量信息时,此时Broker Server 需要有一种机制去清理之前保存的偏移量,这就是所谓的Compact策略。这个策略会将长时间没有更新的Consumer Group对应的偏移量清理掉,保留持续不断在更新的Consumer Group的偏移量,其具体执行流程如下:
private def compact() { val startMs = SystemTime.milliseconds //通过将当前时间减去上次更新时间,判断每个值的活跃度,将那些长时间没有更新的筛选出来 val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs) //将长时间没有消费的offset记录下来,然后从缓存中删除 // delete the stale offsets from the table and generate tombstone messages to remove them from the log val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndmetadata) => //筛选出分区索引 val offsetsPartition = partitionFor(groupTopicAndPartition.group) //从内存中删除 offsetsCache.remove(groupTopicAndPartition) //然后产生墓碑记录,即bytes为null,只是将key传进去产生空记录,最后按partition分组 val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group, groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition) (offsetsPartition, new Message(bytes = null, key = commitKey)) }.groupBy{ case (partition, tombstone) => partition } //将墓碑记录写进日志文件,如果在开启日志合并线程的情况下,则会保留最新的记录,即Value为null的记录 val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) => val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition) partitionOpt.map { partition => val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition) val messages = tombstones.map(_._2).toSeq try { partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) tombstones.size } catch { ...... } } }.sum }
在开启日志合并线程的情况下,最后只会保留为null的记录
4、总结总之,Kafka集群通过OffsetManager模块提供了记录、查询offset偏移量的功能,同时通过compact函数定时清理长时间没有更新的偏移量,控制了Topic的大小,为Consumer Group不断消费消息提供了支撑。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)