答:是生产者设置的,生产者在发送消息的时候,为每条消息生成一个唯一的offset。
Kafka消息的格式?
答:
Kafka最新版本的消息集叫做RecordBatch,而不是先前的MessageSet。RecordBatch内部存储了一条或多条消息。
RecordBatch的结构包含以下部分:
first offset,起始位移,占位8B
length,消息总长度,占位4B
partition leader epoch,分区leader纪元,可以看做分区leader的版本号或者更新次数,占位4B。
magic,消息格式的版本号,对于V2版本而言,magic的值为2。
attributes,消息属性,占位2B,低三位表示压缩格式,第4位表示时间戳类型,第五位表示当前RecordBatch是否处于事务中第6位表示是否控制消息。
last offset delta,占位4B,RecordBatch中最后一个Record的offset与first offset的差值,主要被broker用来确保RecordBatch中Record组装的正确性。
first timestamp,占位8B,RecordBatch中第一条Record的时间戳。
max timestamp,占位8B,RecordBatch中最大的时间戳,一般情况下是最后一个Record的时间戳。和last offset delta功能一样,主要被broker用来确保RecordBatch中Record组装的正确性。
producer id,即PID,占位8B,用来支持幂等和事务。
producer epoch,占位2B,用来支持幂等和事务。
first sequence,占位4B,用来支持幂等和事务。
records count,占位4B,RecordBatch中Record的总数。
records,存放消息的容器。
Records的数据结构又是什么样的呢?Record包含以下属性:
length,消息总长度。
attributes,目前已弃用,但是还是会占用1B的空间,以备未来的格式扩展。
timestamp delta,时间戳增量,通常一个timestamp占用8B,这里时间戳增量保存的是当前时间戳与RecordBatch中first timestamp的差值,这样可以节省占用空间。
offset delta,位移增量,这个是当前消息的位移与RecordBatch中first offset的差值,这样可以节省占用空间。
key length,消息的key的长度。
key,消息key。
value length,消息的value的长度。
value,消息的值。
headers count,headers的总数。
headers,这个字段是用来支持应用级别的扩展,而不需要将一些应用级别的属性嵌入到消息体中。
Kafka中的每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。Partition中的每个消息都有一个连续的序号,用于Partition唯一标识一条消息。Offset记录着下一条将要发送给Consumer的消息的序号。
Offset从语义上来看拥有两种: Current Offset 和 Committed Offset 。
Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样就能够保证每次Consumer poll消息时,都能够收到不重复的消息。
Committed Offset保存在Broker上(V0.9之后的版本),它表示Consumer已经确认消费过的消息的序号。主要通过commitSync()来 *** 作。举例: Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用commitSync()来提交Committed Offset,那么此时Committed Offset依旧是0。
Committed Offset主要用于Consumer Rebalance(再平衡)。在Consumer Rebalance的过程中,一个Partition被分配给了一个Consumer,那么这个Consumer该从什么位置开始消费消息呢?答案就是Committed Offset。另外,如果一个Consumer消费了5条消息(poll并且成功commitSync)之后宕机了,重新启动之后,它仍然能够从第6条消息开始消费,因为Committed Offset已经被Kafka记录为5。
小结一下 :
在Kafka V0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中(zookeeper其实并不适合进行大批量的读写 *** 作,尤其是写 *** 作)。在V0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic(系统自维护的)中。
auto.offset.reset表示如果Kafka中没有存储对应的offset信息的话(有可能offset信息被删除),消费者从何处开始消费消息。有三个可选值:
分两个场景来说明:
a) Consumer消费了5条消息后宕机了,重启之后它读取到对应的Partition的Committed Offset为5,因此会直接从第6条消息开始读取。此时完全依赖于Committed Offset机制,和auto.offset.reset配置完全无关。
b) 新建了一个新的Group,并添加了一个Consumer,它订阅了一个已经存在的Topic。此时Kafka中还没有这个Consumer相应的Offset信息,因此此时Kafka就会根据auto.offset.reset配置来决定这个Consumer从何处开始消费消息。
在Kafka文件存储中,同一个topic下有多个不同的partition,每个partiton为一个目录,partition的名称规则为:topic名称+有序序号,第一个序号从0开始计,最大的序号为partition数量减1,partition是实际物理上的概念,而topic是逻辑上的概念。
如果就以partition为最小存储单位,我们可以想象当Kafka producer不断发送消息,必然会引起partition文件的无限扩张,这样对于消息文件的维护以及已经被消费的消息的清理带来严重的影响,所以这里以segment为单位又将partition细分。每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等的segment(段)数据文件中(每个segment 文件中消息数量不一定相等)这种特性也方便old segment的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。
segment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命令规则为:Partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:
以上面的segment文件为例,展示出segment:00000000000000170410的“.index”文件和“.log”文件的对应的关系,如下图:
如上图,“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“.index”索引文件中的元数据[3, 348]为例,在“.log”数据文件表示第3个消息,即在全局partition中表示170410+3=170413个消息,该消息的物理偏移地址为348。
那么如何从partition中通过offset查找message呢?以上图为例,读取offset=170418的消息,首先查找segment文件,其中00000000000000000000.index为最开始的文件,第二个文件为00000000000000170410.index(起始偏移为170410+1=170411),而第三个文件为00000000000000239430.index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。
———————————————————
坐标帝都,白天上班族,晚上是知识的分享者
如果读完觉得有收获的话,欢迎点赞加关注
【转】 https://www.baidu.com/link?url=3sPGFihkBbhc41jSaAzCwPnANCbs1z56_l4a1eXnhRtd2XxT5B4r7_2HB3wJT2IhoGMzpD858l_RKRrU8TDQm_&wd=&eqid=e88a5ce70002861e000000035d4cdc76
在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。它好比看一本书中的书签标记,每次通过书签标记(offset)就能快速找到该从哪里开始看(消费)。
Kafka对于offset的处理有两种提交方式:(1) 自动提交(默认的提交方式) (2) 手动提交(可以灵活地控制offset)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)