说到kafka的日志清理策略,就会涉及到配置项cleanup.policy,在broker server端配置项是log.cleanup.policy,在topic级别则是cleanup.policy。所以可以是全局设置,或者单独对某个topic设置,一般不会修改默认值(即delete),根据业务需求自己单独可对topic设置其他policy。
log.cleanup.policy的默认值是delete,可选项有delete/compact,在0.10版本后才可以同时配置delete,compact。
上面的两个值就是对应了kafka提供的两种日志清理策略:
日志删除(delete):按照一定的保留策略来直接删除不符合条件的日志分段。
日志压缩(compact):针对每个消息的key进行compact,对于有相同key的的不同value值进行合并,只保留最后一个版本。注意这个compact和日志本身的压缩算法不是一回事。
(下面说的所有日志文件就是指Log,即topic的单个partition;日志文件分段就是LogSegment)
deletekafka日志管理器中(LogManager)会有一个专门的日志删除任务来周期性检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略以及基于日志起始偏移量的保留策略。
源码阅读:对应源码中的LogManager->startup()里的 kafka-log-retention 线程任务(假如配置了compact或者delete,compact同时配置,则删除就不再由该线程处理了而是由另外一个线程LogCleaner处理)。注意里面的cleanupLogs方法,即cleanup.policy里不能包含有compact,不然不会执行delete相关的 *** 作,这也看出和同时配置delete,compact的不同,同时配置的话,会由LogCleaner类的相关逻辑去处理。不同的删除策略由deleteOldSegments方法处理
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
/**
* Delete any eligible logs. Return the number of segments deleted.
* Only consider logs that are not compacted.
*/
def cleanupLogs() {
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
for(log <- allLogs; if !log.config.compact) {
debug("Garbage collecting '" + log.name + "'")
total += log.deleteOldSegments()
}
debug("Log cleanup completed. " + total + " files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
}
/**
* Delete any log segments that have either expired due to time based retention
* or because the log size is > retentionSize
*/
def deleteOldSegments(): Int = {
if (!config.delete) return 0
deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
}
这里说下各个删除策略以及相关配置:
基于时间
代码调用链:deleteOldSegments -> deleteSegment -> asyncDeleteSegment -> scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值retentionMs来寻找可删除的的日志分段文件集合deletableSegments。retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes以及log.retention.ms来配置,优先级是ms > minutes > hours。也可以通过topic级别的配置项retention.ms来设置。默认情况下只有log.retention.hours有值为168,所以默认情况下日志分段文件的保留时间为7天。
查找过期的日志分段文件,是根据日志分段中最大的时间戳largestTimeStamp来计算。要获取日志分段中的最大时间戳largestTimeStamp的值,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则才设置为最近修改时间lastModifiedTime。
若待删除的日志分段的总数等于该日志文件中所有的日志分段的数量,那么说明所有的日志分段都已过期,但是该日志文件中还要有一个日志分段来用于接收消息的写入,即必须要保证有一个活跃的日志分段activeSegment,在此种情况下,会先切分出一个新的日志分段作为activeSegment,然后再执行删除 *** 作。
删除日志分段时,首先会从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取 *** 作。然后将日志分段文件添加上“.deleted”的后缀,当然也包括日志分段对应的索引文件。最后交由一个以“delete-file”命名的延迟任务来删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。注意startup()里还有个线程是 kafka-delete-logs,不是来做这个的。
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定的阈值retentionSize来寻找可删除的日志分段的文件集合deletableSegments。retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无限大。也可以通过topic级别的配置项retention.bytes来设置。注意log.retention.bytes配置的是日志文件的大小(即Log单个partition的大小),而不是单个的日志分段(LogSegment)的大小。
它首先计算日志文件的总大小size和retentionSize的差值diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合deletableSegments。查找出deletableSegments之后就执行删除 *** 作。
基于日志起始偏移量
无法配置,所以不用关注。
一般情况下日志文件的起始偏移量logStartOffset(logStartOffset值是整个 Log 对象对外可见消息的最小位移值)等于第一个日志分段的baseOffset,但是这并不是绝对的,logStartOffset的值可以通过DeleteRecordsRequest请求、日志的清理和截断等 *** 作修改。
基于日志起始偏移量的删除策略的判断依据是某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是则可以删除此日志分段。参考上图,假设logStartOffset等于25,日志分段1的起始偏移量为0,日志分段2的起始偏移量为11,日志分段3的起始偏移为23,那么我们通过如下动作收集可删除的日志分段的文件集合deletableSegments:
1.从头开始遍历每个日志分段,日志分段1的下一个日志分段的起始偏移量为11,小于logStartOffset的大小,将日志分段1加入到deletableSegments中;
2.日志分段2的下一个日志偏移量的起始偏移量为23,也小于logStartOffset的大小,将日志分段2页加入到deletableSegments中;
3.日志分段3的下一个日志偏移量在logStartOffset的右侧,故从日志分段3开始的所有日志分段都不会被加入到deletableSegments中。
收集完可删除的日志分段的文件集合之后的删除 *** 作同基于日志大小的保留策略和基于时间的保留策略相同。
在版本0.10.2.2之前,compact是有bug的,有时会遇到,解决办法假如不能升级kafka外,可以先将cleanup.policy设置为delete,删除掉旧的数据(前提是删掉的数据没有影响才行),详见bug单:https://issues.apache.org/jira/browse/KAFKA-5413
compact策略会定期将相同key的消息进行合并,只保留最新的value值。如下图:
compact的功能由LogCleaner来实现的,在LogManager里可以看到启动了 LogCleaner。整体代码调用链:startup -> CleanerThread.start -> doWork -> cleanOrSleep。可以通过配置log.cleaner.threads来配置清理的线程数,默认值1。注意除了cleanup.policy的配置外,还需要log.cleaner.enable配置为true(默认值)才能启动compact。
compact相关的原理:
compact通过引入清理点的概念,将日志文件分为日志尾部(log tail)和日志头部(log head),即对应着就是已清理部分(clean)和未清理部分(dirty)。所以下面两个图是一样的意思。与清理点相关的文件就是清理点检查文件。我们可以通过配置log.dir或者log.dirs参数来设置Kafka日志的存放目录,而对于每一个数据目录下都有一个名为“cleaner-offset-checkpoint”的文件,这个文件就是清理检查点文件,它记录了每个日志的最近一次清理点的位置(即记录每个主题的每个分区中已清理的偏移量)。通过清理检查点文件可以将日志文件(Log)分成两个部分,参考下图,通过检查点cleaner checkpoint来划分出一个已经清理过的clean部分和一个还未清理过的dirty部分。
上图中firstDirtyOffset(与cleaner checkpoint相等)表示dirty部分的起始偏移量,而firstUncleanableOffset为dirty部分的截止偏移量,整个dirty部分的偏移量范围为[firstDirtyOffset, firstUncleanableOffset),注意这里是左闭右开区间。为了避免当前活跃的日志分段activeSegment成为热点文件,activeSegment不会参与Log Compaction的 *** 作。同时Kafka支持通过服务端参数log.cleaner.min.compaction.lag.ms(默认值为0,topic端则是min.compaction.lag.ms)来配置消息在被清理前的最小保留时间,默认情况下firstUncleanableOffset等于activeSegment的baseOffset。在2.3.x版本以后,添加了配置项log.cleaner.max.compaction.lag.ms(默认值是9223372036854775807,topic端是max.compaction.lag.ms),表示消息在被清理前的最大保留时间。它和min.compaction.lag.ms、min.cleanable.ratio一同控制是否压缩,即情况一:满足min.cleanable.ratio,并且日志有满足条件min.compaction.lag.ms的脏(未压缩)记录;情况2:如果日志保留时间达到max.compaction.lag.ms,即有这种达到max的脏(未压缩)记录。
注意引入清理点后的一些关键特点:
(1)每次待清理的日志包括之前清理过的日志尾部和从未清理过的日志头部。
(2)日志压缩前后,日志分段中每条消息的偏移量和写入时总是保持一致。被保留的消息即使复制到新的日志分段,也不会改变消息的偏移量。即消息总是有序的,日志压缩不会对消息重排序。
(3)日志压缩后,消息的物理位置会发生变化。因为生成了新的日志分段文件,日志分段中的每条消息的物理位置会重新按照新文件来组织。
(4)日志压缩后,dirty部分的消息偏移量是逐一递增的,而clean部分的消息偏移量是断续的(因为有些被压缩),如果客户端总能赶上dirty部分,它就能读取到日志的所有消息,反之,就不可能读到全部的消息(有些key被压缩,就只能读取到最后一个)。
每个broker会启动log.cleaner.thread(默认值为1)个日志清理线程负责执行清理任务,这些线程会选择“污浊率”最高的日志文件进行清理。清理线程每次运行时,只会选择清理一个最需要清理的日志。用cleanBytes表示clean部分的日志占用大小,dirtyBytes表示dirty部分的日志占用大小,那么这个日志的污浊率(dirtyRatio)为:
dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)
为了防止日志不必要的频繁清理 *** 作,kafka还使用了参数log.cleaner.min.cleanable.ratio(默认值为0.5)来限定可进行清理 *** 作的最小污浊率。
选择出了待清理的segment以后,如何筛选key?kafka中的每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表。日志清理需要遍历两次日志文件,第一次遍历把每个key的哈希值和最后出现的offset都保存在SkimpyOffsetMap中,映射模型如下图所示。第二次遍历检查每个消息是否符合保留条件,如果符合就保留下来,否则就会被清理掉。假设一条消息的offset为O1,这条消息的key在SkimpyOffsetMap中所对应的offset为O2,如果O1>=O2即为满足保留条件。
默认情况下SkimpyOffsetMap使用MD5来计算key的哈希值,占用空间大小为16B,根据这个哈希值来从SkimpyOffsetMap中找到对应的槽位,如果发生冲突则用线性探测法处理。为了防止哈希冲突过于频繁,我们也可以通过broker端参数log.cleaner.io.buffer.load.factor(默认值为0.9)来调整负载因子。偏移量占用空间大小为8B,故一个映射项占用大小为24B。每个日志清理线程的SkimpyOffsetMap的内存占用大小为log.cleaner.dedupe.buffer.size / log.cleaner.thread,默认值为 = 128MB/1 = 128MB。所以默认情况下SkimpyOffsetMap可以保存128MB * 0.9 /24B ≈ 5033164个key的记录。假设每条消息的大小为1KB,那么这个SkimpyOffsetMap可以用来映射4.8GB的日志文件,而如果有重复的key,那么这个数值还会增大,整体上来说SkimpyOffsetMap极大的节省了内存空间且非常高效。
kafka提供了一个墓碑消息(tombstone)的概念。如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息。日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。墓碑消息的保留条件是当前墓碑消息所在的日志分段的最近修改时间lastModifiedTime大于deleteHorizonMs,参考前面的图,这个deleteHorizonMs的计算方式为clean部分中最后一个日志分段的最近修改时间减去保留阈值deleteRetionMs(通过broker端参数log.cleaner.delete.retention.ms配置,默认值为86400000,即24小时)的大小,即:
deleteHorizonMs = clean部分中最后一个LogSegment的lastModifiedTime - deleteRetionMs
Log Compact执行过后的日志分段的大小会比原先的日志分段的要小,为了防止出现太多的小文件,Kafka在实际清理过程中并不对单个的日志分段进行单独清理,而是会将日志文件中offset从0至firstUncleanableOffset的所有日志分段进行分组,每个日志分段只属于一组,分组策略为:按照日志分段的顺序遍历,每组中日志分段的占用空间大小之和不超过segmentSize(可以通过broker端参数log.segments.bytes设置,默认值为1GB),且对应的索引文件占用大小之和不超过maxIndexSize(可以通过broker端参数log.index.interval.bytes设置,默认值为10MB)。同一个组的多个日志分段清理过后,只会生成一个新的日志分段。
总体说下日志压缩的具体步骤如下:
(1)消息追加到活动的日志分段,选择活动日志分段之前的所有日志分段参与日志压缩。
(2)为日志头部构建一张消息键到偏移量的映射表,相同键但偏移量低于映射表的消息会被删除。
(3)通过复制消息的方式,将需要保存的消息复制到新的日志分段,每条键都只有一条最新的消息。
(4)复制完成后,新的日志分段会代替所有参与压缩 *** 作的旧日志分段。
(5)更新日志的清理点,为下次的日志压缩做准备。清理点会将日志分成头部和尾部。
如图:
delete,compact同时配置delete,compact则同时有compact和delete的效果,不过delete *** 作也是在LogCleaner线程里做的,在cleanOrSleep的最后代码里会做。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)