数据格式可以想象成一个 KV 格式的消息,key 就是一个三元组:groupid+topic+分区号,而 value 就是 offset 的值。
查看方式:使用kafka自带的读取类
/bin/kafka-console-consumersh --topic __consumer_offsets --partition 01 --bootstrap-server xxx:9092 --formatter "kafkacoordinatorgroupGroupMetadataManager$OffsetsMessageFormatter" --from-beginning --max-messages 30
一般情况下, 使用 OffsetsMessageFormatter 打印的格式可以概括为:
"[%s,%s,%d]::[OffsetMetadata[%d,%s],CommitTime %d,ExpirationTime %d]"format(group, topic, partition, offset, metadata, commitTimestamp, expireTimestamp)
数据内容:
[flink-payment-alert_query_time_1576066085229,payment-result-count,4]::NULL
[flink-payment-alert_query_time_1576066085229,payment-result-count,3]::NULL
[flink-payment-alert_query_time_1576066085229,payment-result-count,9]::NULL
另外一种是
[work_default_ywintspringcxywblackgoldkafkaorderdomaincoresub,work_default_ywintspringcxywblackgoldkafkaorderdomaintopic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optionalempty, metadata=, commitTimestamp=1636939024066, expireTimestamp=None)
[work_default_ywintspringcxywblackgoldkafkaorderdomaincoresub,work_default_ywintspringcxywblackgoldkafkaorderdomaintopic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optionalempty, metadata=, commitTimestamp=1636939028621, expireTimestamp=None)
[work_default_ywintspringcxywblackgoldkafkaorderdomaincoresub,work_default_ywintspringcxywblackgoldkafkaorderdomaintopic,0]::OffsetAndMetadata(offset=19, leaderEpoch=Optionalempty, metadata=, commitTimestamp=1636939033680, expireTimestamp=None)
还有一种是
[ProcessEngineBusinessProcess,CasBusinessTopic,1]::[OffsetMetadata[99649027,NO_METADATA],CommitTime 1636930671854,ExpirationTime 1637017071854]
[ProcessEngineBusinessProcess,CasBusinessTopic,0]::[OffsetMetadata[99650360,NO_METADATA],CommitTime 1636930671854,ExpirationTime 1637017071854]
[ProcessEngineBusinessProcess,CasBusinessTopic,3]::[OffsetMetadata[99640798,NO_METADATA],CommitTime 1636930672471,ExpirationTime 1637017072471]
分别解释一下:
在 Kafka 中有一个名为“delete-expired-group-metadata”的定时任务来负责清理过期的消费位移,这个定时任务的执行周期由参数 offsetsretentioncheckintervalms 控制,默认值为600000,即10分钟。这和普通的topic的不太一样
还有 metadata,一般情况下它的值要么为 null 要么为空字符串,OffsetsMessageFormatter 会把它展示为 NO_METADATA,否则就按实际值进行展示。
看一下源码里这些类的结构
case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
commitTimestamp: Long = orgapachekafkacommonrequestsOffsetCommitRequestDEFAULT_TIMESTAMP,
expireTimestamp: Long = orgapachekafkacommonrequestsOffsetCommitRequestDEFAULT_TIMESTAMP) {
case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadataNoMetadata) {
override def toString = "OffsetMetadata[%d,%s]"
format(offset,
if (metadata != null && metadatalength > 0) metadata else "NO_METADATA")
}
@Deprecated
public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1
另外0110之后对应的数据格式版本是V2,这个版本的消息相比于v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。
另外:
offset为什么会有墓碑消息
因为offset本身也会过期清理受offsetsretentionminutes 这个配置的影响
看下官网介绍
After a consumer group loses all its consumers (ie becomes empty) its offsets will be kept for this retention period before getting discarded For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period
当group里的consumer全部下线后过offsetsretentionminutes 时间后offset就会被删除
val OffsetsRetentionMinutes: Int = 7 24 60 // 默认7天
默认20之前是1天,20及以后是7天 这个官方真是要么就改为2天,结果直接改为7天,改动不可谓不大,而且active的group不会过期
附: >官方文档说得很清楚了:kafka中没有offset时,不论是什么原因,offset没了,这是autooffsetreset配置就会起作用,
最容易测试的方式就是在partition中预存放一些消息,然后新建一个consum group来消费这个partition。
测试步骤:创建两个不同组的消费者,分别设置为earliest和latest
创建两个消费者consumer1(earliest)、consumer2(lastest),分别启动后观察到consumer1消费到10条消息,consumer2消费到0条消息
可以观察到两个消费者都消费了新的10条消息
此时在kafka服务器已经记录了消费者的offset,重启后两个消费者都从记录中的offset开始消费
Kafka中的每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序号,用于partition唯一标识一条消息。
Offset记录着下一条将要发送给Consumer的消息的序号。
Offset从语义上来看拥有两种:Current Offset和Committed Offset。
Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。例如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样就能够保证每次Consumer poll消息时,都能够收到不重复的消息。
Committed Offset保存在Broker上,它表示Consumer已经确认消费过的消息的序号。主要通过 commitSync 和 commitAsync
API来 *** 作。举个例子,Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用 consumercommitAsync() 或 consumercommitSync() 来提交Committed Offset,那么此时Committed Offset依旧是0。
Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的过程中,一个partition被分配给了一个Consumer,那么这个Consumer该从什么位置开始消费消息呢?答案就是Committed Offset。另外,如果一个Consumer消费了5条消息(poll并且成功commitSync)之后宕机了,重新启动之后它仍然能够从第6条消息开始消费,因为Committed Offset已经被Kafka记录为5。
总结一下,Current Offset是针对Consumer的poll过程的,它可以保证每次poll都返回不重复的消息;而Committed Offset是用于Consumer Rebalance过程的,它能够保证新的Consumer能够从正确的位置开始消费一个partition,从而避免重复消费。
在Kafka 09前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中(zookeeper其实并不适合进行大批量的读写 *** 作,尤其是写 *** 作)。而在09之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的。
Group Coordinator是运行在Kafka集群中每一个Broker内的一个进程。它主要负责Consumer Group的管理,Offset位移管理以及 Consumer Rebalance 。
对于每一个Consumer Group,Group Coordinator都会存储以下信息:
Consumer Group如何确定自己的coordinator是谁呢? 简单来说分为两步:
由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以groupid-topic-partition -> offset的方式保存。如图所示:
Kafka在保存Offset的时候,实际上是将Consumer Group和partition对应的offset以消息的方式保存在__consumers_offsets这个topic中 。
__consumers_offsets默认拥有50个partition,可以通过
的方式来查询某个Consumer Group的offset信息保存在__consumers_offsets的哪个partition中。下图展示了__consumers_offsets中保存的offset消息的格式:
如图所示,一条offset消息的格式为groupid-topic-partition -> offset。因此consumer poll消息时,已知groupid和topic,又通过Coordinator分配partition的方式获得了对应的partition,自然能够通过Coordinator查找__consumers_offsets的方式获得最新的offset了。
前面我们已经描述过offset的存储模型,它是按照 groupid-topic-partition -> offset 的方式存储的。然而Kafka只提供了根据offset读取消息的模型,并不支持根据key读取消息的方式。那么Kafka是如何支持Offset的查询呢?
答案就是Offsets Cache!!
如图所示,Consumer提交offset时,Kafka Offset Manager会首先追加一条条新的commit消息到__consumers_offsets topic中,然后更新对应的缓存。读取offset时从缓存中读取,而不是直接读取__consumers_offsets这个topic。
我们已经知道,Kafka使用 groupid-topic-partition -> offset 的消息格式,将Offset信息存储在__consumers_offsets topic中。请看下面一个例子:
如图,对于audit-consumer这个Consumer Group来说,上面的存储了两条具有相同key的记录: PageViewEvent-0 -> 240 和 PageViewEvent-0 -> 323 。事实上,这就是一种无用的冗余。因为对于一个partition来说,我们实际上只需要它当前最新的Offsets。因此这条旧的 PageViewEvent-0 -> 240 记录事实上是无用的。
为了消除这样的过期数据,Kafka为__consumers_offsets topic设置了Log Compaction功能。Log Compaction意味着对于有相同key的的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,可以开启Kafka的Log Compaction功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。
这张生动的阐述了Log Compaction的过程:
下图阐释了__consumers_offsets topic中的数据在Log Compaction下的变化:
autooffsetreset 表示如果Kafka中没有存储对应的offset信息的话(有可能offset信息被删除),消费者从何处开始消费消息。它拥有三个可选值:
看一下下面两个场景:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)