kafka原理—01数据存储、日志清理

kafka原理—01数据存储、日志清理,第1张

kafka原理—01数据存储、日志清理 kafka数据存储 一、kafka选型

kafka的设计初衷,是为了解决海量数据的读写,能否快速读写,成了决定因素

1、写
  • 尾部追加,不允许更新

| 顺序读写比随机读写快

2、读

常用两种索引

  • B+tree索引
    • 范围查询,时间复杂度:O(log n)
  • Hash索引
    • 等值查询,只需经过一次算法即可获得对应的键值,时间复杂度:O(1)

选择hash索引的原因(两种索引方式的区别可自行百度):

  • 不需要额外的空间存储索引,不需要维护索引
  • 等值查询更快(由offset进行数据定位)
二、数据存储

segment分为索引文件和日志文件

1、索引文件

包含两个部分:偏移量、存储位置(message的物理位置)

  • 偏移量索引文件.index
    • 根据消息偏移量查找映射关系
    • 4个字节存储相对偏移量
    • 4个字节存储position
  • 时间索引文件.timeindex(0.10.1版本才有)
    • 根据时间戳查找映射关系
    • 8个字节timestamp存储
    • 4个字节存储position
2、日志文件

.log默认大小1G

包含两个部分:消息、消息物理偏移地址

根据上图一般会有些困惑?

  • 1、如何查找?以偏移量368775为例

    • 根据index文件名排序,通过二分法定位具体index
    • 找到不比368775大的.index文件(368769.index,442365.index)
    • 368775比368769大6,所以找到位置,6,1407
    • 在368769.log中定位到偏移量为1407的消息
    • Message368775即为对应的消息
    • (timeindex文件同理)
  • 2、索引文件的编号为什么不连续?

    • 为了将索引存在内存中,采用稀疏存储
    • 避免索引占用空间,每隔4k简历一条索引
    • 没有建立索引的offset,无法一次定位,但是扫描范围较小
3、消息 3.1 消息格式的版本 kafka版本message版本segment0.10前v0.log+.index0.10.xv1.log+.index+.timeindex0.11~之后v2.log+.index+.timeindex

(上图为message版本)

(上图为attributes各位含义)

一条/一批次消息的各部分含义

  • CRC32:消息的CRC校验码。
  • magic:消息版本。当magic为0时,消息的offset使用绝对offset;当magic为1时,消息的offset使用相对offset。
  • attributes:
    • 0~2位表示消息使用的压缩类型,
      • 0(000)-无压缩;
      • 1(001)-gzip;
      • 2(010)-snappy;
      • 3(011)-lz4。
    • 第3位表示时间戳类型,
      • 0->创建时间
      • 1->追加时间。
    • 其他属性如图
  • Control消息用来支持事务功能
  • key length,key,value length,value顾名思义,表示key、value的长度与值
  • producer id,producer epoch,first sequence用于幂等

v2版本record batch中attribute属性废弃,但仍然保留字段

3.2版本小结
  • V0 V1缺陷:无论key/value是否存在,都需要4字节保存信息

  • V0最小消息14字节,V1版最小消息为22字节,V2版本最小消息集为61字节。小于最小消息则消息非法

  • v2虽然最小字节变大了,然而批量发送场景下,提升了效率(record只存了消息内容)

v0无timestamp,是否不支持时间策略?

查询时timestamp返回-1,不支持

三、日志清理策略
log.cleanup.policy = delete,compact

delete-日志删除;compact-日志压缩

kafka同时支持两种清理策略

1、日志删除

属于粗粒度的清理,直接删除segment

1.1 三种策略
  • 基于时间策略
    • 顾名思义,超过一定时间则删除
  • 基于大小策略
    • 顾名思义,超过一定大小则删除
  • 基于日志起始偏移量
    • 顾名思义,根据偏移量进行删除
1.2 配置
 // 时间
log.retention.hours=168 //7d
// 大小
log.retention.bytes= 1073741824 //1G

log.retention.check.interval.ms=300000 //5min
log.segment.bytes=1073741824 //1G
log.cleaner.delete.retention.ms=86400000 // 1d
log.cleaner.backoff.ms=15000 //15s

配置含义:

  • segment的大小为1GB,每5分钟检查一次是否有segment已经过了7d,如果有将其标记为.deleted(0.index.deleted)。
  • 清理线程会每隔15秒检查一次,是否有标记为deleted的segment的保留时间超过一天了,如果有将其从文件系统删除

若多种策略同时配置,谁先到达配置阈值,就用哪种策略

2、日志压缩(Log Compaction)

属于细粒度的清理,以key为粒度的清理

使用场景:kafka异常恢复、只关心最新数据

2.1 效果

根据key只保留最新的value,如果value为null,该key一段时间后会被清理

2.2 选择合适的文件

根据日志污浊率限定清理范围

log.cleaner.min.cleanable.ratio(默认值为0.5)

dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)
日志的污浊率 = dirty部分的日志占用大小 / (clean部分的日志占用大小+dirty部分的日志占用大小)

2.3 流程
  • 1、压缩过程中将需要保留的消息拷贝到一个.clean为后缀的临时文件中(0000.log.clean)
  • 2、压缩后将.clean文件修改为.swap文件(0000.log.clean->0000.log.swap)
  • 3、删除原有log文件,将.swap后缀删除(0000.log.swap->0000.log)

只以log为例,压缩方式是针对整个segment(.index .log)

2.4 原理
  • 0、每个broker会启动log.cleaner.thread(默认值为1)个线程负责执行清理任务

    • 检查cleaner-offset-checkpoint文件
      • firstDirtyOffset:每次清理的起点,清理完成后会更新
    • 根据firstDirtyOffset将log文件分成clean(清理)、dirty(未清理)两部分
  • 1、第一遍把消息的每个key的哈希值和最后出现的offset存储在SkimpyOffsetMap(哈希表)中

  • 2、第二遍检查消息是否符合保留条件,将不符合的删除

    • 如果value不为null,直接保留最新
    • 如果value为null(墓碑消息tombstone,默认24h)
      • 先常规清理——消息会保留一段时间,consumer能获取到该消息,且发现value被删除
      • 一段时间后,key会被删除,consumer也无法获取
      • (为后续压缩打标记)

如果要删除一个key,put一个null就行

2.5 重新分组

因为压缩后的各个segment都会变小,需要重新分组来重写log和index

将多个小的文件进行合并

分组规则

  • log大小之和为1g,index文件之和为10m(默认情况)
  • 清理墓碑消息、保留最新value
2.6 真正的清理过程

清理过程一共分为3步

  • step1、遍历所有需要清理的segment,开始清理日志,生成新的segment
  • step2、清除墓碑消息,并将1中的多个小的segment进行merge
  • step3、merge step2生成的segment,重写log和index

注意:日志压缩、日志删除都不包含activeSegment(正在写入的segment)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5698698.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存