Kafka文件存储机制

Kafka文件存储机制,第1张

在server.properties文件中配置了碧李log.dir属性,该目录存储日志文件

在生产者创建主题时,会在该目录下创建 “topic名称”+“-”+“分区号” 拼接的目录,该目录存储的是消息数据。

例如,主题名为 t01,对应会创建 t01-0,t01-1目录

效率低下,Kafka 采取了 分片 索引 机制,将每个 partition 分为多个 segment。每个 segment

对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的型嫌命名

规则为:topic 名称+分区序号。例如,first 这个 topic 有三个分区,则其对应的文件夹为 first-

0,first-1,first-2。

index 和 log 文件以当前 segment 的第一条消息的 offset 命名。悔租迟下图为 index 文件和 log

文件的结构示意图

index文件用于存储索引、log文件用于存储消息,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

Kafka 消息是以世粗山主题为单位进行归类,各个主题之间是彼此独立的,互不影响。

每个主题⼜可以分为⼀个或多个分区。

每个分区各⾃存在⼀个记录消息数据的日志文件。

图中,创建了⼀个 tp_demo_01 主题,其存在6个 Parition,对应的每个Parition下存在⼀个 [Topic-Parition] 命名的消息⽇志⽂件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的⽂件凳消,比如: .index、.timestamp、.log、.snapshot 等。

其中,文件名⼀致的⽂件集合就称为 LogSement。

当满⾜如下⼏个条件中的其中之⼀,就会触发文件的切分:

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。

文件:

查看⼀个topic分区目录下的内容,发现有log、index和timeindex三个⽂件:

创建主题搜中:

创建消息⽂件:

将⽂本消息⽣产到主题中:

查看存储⽂件:

如果想查看这些文件,可以使⽤kafka提供的shell来完成,几个关键信息如下:

(1)offset是逐渐增加的整数,每个offset对应⼀个消息的偏移量。

(2)position:消息批字节数,用于计算物理地址。

(3)CreateTime:时间戳。

(4)magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。

(5)compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1-GZIP、2-snappy、3-lz4。

(6)crc:对所有字段进行校验后的crc值。

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息元数据中存在若⼲的时间戳信息。如果 broker 端参数log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增⻓。反之如果是CreateTime 则⽆法保证顺序。

注意:timestamp文件中的 offset 与 index ⽂件中的 relativeOffset 不是⼀⼀对应的。因为数据的写⼊是各自追加。

思考:如何查看偏移量为23的消息?

Kafka 中存在⼀个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在00000000000000000000.index ,通过二分法在偏移量索引文件中找到不⼤于 23 的最⼤索引项,即 offset 20 那栏,然后从⽇志分段⽂件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引⽂件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若⼲的时间戳信息。

如果 broker 端参数log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是CreateTime 则无法保证顺序。

通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。

时间戳索引索引格式:前⼋个字节表示时间戳,后四个字节表示偏移量。

思考:查找时间戳为 1557554753430 开始的消息?

Kafka 提供两种⽇志清理策略:

⽇志删除:按照⼀定的删除策略,将不满⾜条件的数据进⾏数据删除

⽇志压缩:针对每个消息的 Key 进⾏整合,对于有相同 Key 的不同 Value 值,只保留最后⼀个版本。

Kafka 提供 log.cleanup.policy 参数进⾏相应配置,默认值: delete ,还可以选择 compact 。

主题级别的配置项是 cleanup.policy 。

基于时间

⽇志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定⽇志保留的

时间节点。如果超过该设定值,就需要进⾏删除。默认是 7 天, log.retention.ms 优先级最⾼。

Kafka 依据⽇志分段中最⼤的时间戳进⾏定位。

⾸先要查询该⽇志分段所对应的时间戳索引⽂件,查找时间戳索引⽂件中最后⼀条索引项,若最后⼀条索引项的时间戳字段值⼤于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间呢?

因为日志文件可以有意⽆意的被修改,并不能真实的反应日志分段的最⼤时间信息。

删除过程

⽇志压缩是Kafka的⼀种机制,可以提供较为细粒度的记录保留,⽽不是基于粗粒度的基于时间的保留。

对于具有相同的Key,⽽数据不同,只保留最后⼀条数据,前⾯的数据在合适的情况下删除。

⽇志压缩特性,就实时计算来说,可以在异常容灾⽅⾯有很好的应⽤途径。⽐如,我们在Spark、Flink中做实时

计算时,需要⻓期在内存⾥⾯维护⼀些数据,这些数据可能是通过聚合了⼀天或者⼀周的⽇志得到的,这些数据⼀旦

由于异常因素(内存、⽹络、磁盘等)崩溃了,从头开始计算需要很⻓的时间。⼀个⽐较有效可⾏的⽅式就是定时将

内存⾥的数据备份到外部存储介质中,当崩溃出现时,再从外部存储介质中恢复并继续计算。

使⽤⽇志压缩来替代这些外部存储有哪些优势及好处呢?这⾥为⼤家列举并总结了⼏点:

Kafka即是数据源⼜是存储⼯具,可以简化技术栈,降低维护成本

使⽤外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使⽤这些Key将数据取回,实现起来有⼀定的⼯程难度和复杂度。使⽤Kafka的⽇志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读

回到内存就可以了

Kafka对于磁盘的读写做了⼤量的优化⼯作,⽐如磁盘顺序读写。相对于外部存储介质没有索引查询等⼯作

量的负担,可以实现⾼性能。同时,Kafka的⽇志压缩机制可以充分利⽤廉价的磁盘,不⽤依赖昂贵的内存

来处理,在性能相似的情况下,实现⾮常⾼的性价⽐(这个观点仅仅针对于异常处理和容灾的场景来说)

主题的 cleanup.policy 需要设置为compact。

Kafka的后台线程会定时将Topic遍历两次:


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/8162765.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-14
下一篇 2023-04-14

发表评论

登录后才能评论

评论列表(0条)

保存