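Kafka deletes data in two ways:
By time: messages are deleted once they are older than a configured retention period.
By size: once the log grows beyond a configured size, the oldest data is deleted.
The smallest unit Kafka deletes is a segment.
The main deletion logic, from the Kafka source code: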
def cleanupLogs() {
  debug("Beginning log cleanup...")
  var total = 0
  val startMs = time.milliseconds
  for(log <- allLogs if !log.config.compact) {
    debug("Garbage collecting '" + log.name + "'")
    total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
  }
  debug("Log cleanup completed. " + total + " files deleted in " +
        (time.milliseconds - startMs) / 1000 + " seconds")
}
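Kafka calls cleanupLogs periodically (the interval is set in the configuration file, via log.retention.check.interval.ms) and deletes all log data that is due for removal. Logs configured for compaction are skipped.
cleanupExpiredSegments removes segments that have exceeded the retention time: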
private def cleanupExpiredSegments(log: Log): Int = {
  val startMs = time.milliseconds
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
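The time-based check can be tried out on a small model. The following is a minimal sketch, not Kafka's implementation: Segment and expiredSegments are hypothetical names, and it assumes that deleteOldSegments walks the segments from oldest to newest and stops at the first one the predicate rejects (a takeWhile-style scan; deleteOldSegments itself is not shown in this article).

```scala
// Simplified stand-in for a Kafka log segment (hypothetical, not kafka.log.LogSegment).
final case class Segment(baseOffset: Long, lastModified: Long)

object TimeRetentionSketch {
  // Returns the segments that would be deleted: the oldest segments, in order,
  // whose age is strictly greater than retentionMs. The scan stops at the
  // first segment that is still within the retention period (assumption).
  def expiredSegments(segments: Seq[Segment], nowMs: Long, retentionMs: Long): Seq[Segment] =
    segments.takeWhile(s => nowMs - s.lastModified > retentionMs)

  def main(args: Array[String]): Unit = {
    val hourMs = 60L * 60L * 1000L
    val now = 10L * hourMs
    val segments = Seq(
      Segment(0L, now - 9 * hourMs),   // 9 hours old
      Segment(100L, now - 8 * hourMs), // 8 hours old
      Segment(200L, now - 1 * hourMs)  // 1 hour old
    )
    // With a 7-hour retention period only the first two segments are expired.
    println(expiredSegments(segments, now, 7 * hourMs).map(_.baseOffset)) // prints List(0, 100)
  }
}
```

Note that the comparison uses the segment file's last-modified time, so a segment is removed as a whole only after its most recent write is older than the retention period.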
cleanupSegmentsToMaintainSize removes the oldest segments once the log exceeds the size limit:
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
    return 0
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if(diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}
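A worked example makes the size-based rule easier to follow. This is a minimal sketch under stated assumptions rather than Kafka's code: SizedSegment and segmentsToDelete are hypothetical names, and the oldest-first scan that stops at the first rejected segment is again an assumption about deleteOldSegments.

```scala
// Simplified stand-in for a log segment with a size in bytes (hypothetical).
final case class SizedSegment(baseOffset: Long, sizeBytes: Long)

object SizeRetentionSketch {
  // Returns the oldest segments that can be removed without shrinking the log
  // below retentionSize. A negative retentionSize means "no size limit".
  def segmentsToDelete(segments: Seq[SizedSegment], retentionSize: Long): Seq[SizedSegment] = {
    val logSize = segments.map(_.sizeBytes).sum
    if (retentionSize < 0 || logSize < retentionSize) {
      Seq.empty
    } else {
      var diff = logSize - retentionSize // bytes above the limit
      segments.takeWhile { s =>
        // Delete a segment only if the whole segment fits into the excess.
        if (diff - s.sizeBytes >= 0) { diff -= s.sizeBytes; true } else false
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val segments = Seq(SizedSegment(0L, 100L), SizedSegment(100L, 100L), SizedSegment(200L, 100L))
    // Excess is 50 bytes, smaller than the oldest segment: nothing is deleted.
    println(segmentsToDelete(segments, 250L).map(_.baseOffset)) // prints List()
    // Excess is 150 bytes: only the oldest segment fits, 200 bytes remain.
    println(segmentsToDelete(segments, 150L).map(_.baseOffset)) // prints List(0)
  }
}
```

In both runs the remaining log is still at or above the limit. Because only whole segments are deleted, the configured size behaves as a minimum amount of retained data, not as a hard upper bound.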
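To delete a topic:
1. Add delete.topic.enable=true to server.properties.
2. Run: D:\Developer\kafka_2.10-0.10.0.0\bin\windows>kafka-topics.bat --zookeeper 127.0.0.1:2181 --delete --topic TEST-TOPIC
3. Run zookeeper-server-stop.bat, then restart Kafka.
4. Send messages again; the topic is recreated automatically according to the configured partitions.
Note: if step 1 is skipped, the topic is not really deleted; it is only marked for deletion.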
[ocdp@kafka1 bin]$ /usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --zookeeper znode1:2181 --delete --topic test_topic
Topic test_topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[ocdp@kafka1 bin]$
[ocdp@kafka1 bin]$ /usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --zookeeper znode1:2181 --list
test_topic - marked for deletion
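How can a topic be deleted completely? One approach is to use a ZooKeeper superDigest to gain full access to the znodes, as described in: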
https://community.hortonworks.com/articles/29900/zookeeper-using-superdigest-to-gain-full-access-to.html
[ocdp@znode1 bin]$ export ZK_CLASSPATH=/etc/zookeeper/conf/:/usr/hdp/current/zookeeper-server/lib/*:/usr/hdp/current/zookeeper-server/*
[ocdp@znode1 bin]$
[ocdp@znode1 bin]$ java -cp $ZK_CLASSPATH org.apache.zookeeper.server.auth.DigestAuthenticationProvider super:super123
super:super123->super:UdxDQl4f9v5oITwcAsO9bmWgHSI=
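For reference, the digest printed above can be reproduced without the ZooKeeper classpath. The sketch below assumes the classic DigestAuthenticationProvider behaviour, namely user name, a colon, then Base64 of the SHA-1 hash of the full "user:password" string; newer ZooKeeper releases may allow a different hash algorithm. SuperDigestSketch and generateDigest are illustrative names.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.Base64

object SuperDigestSketch {
  // Builds "<user>:<Base64(SHA-1("<user>:<password>"))>" (assumed format).
  def generateDigest(idPassword: String): String = {
    val user = idPassword.split(":", 2)(0)
    val sha1 = MessageDigest.getInstance("SHA-1")
      .digest(idPassword.getBytes(StandardCharsets.UTF_8))
    user + ":" + Base64.getEncoder.encodeToString(sha1)
  }

  def main(args: Array[String]): Unit =
    // Same "input->digest" layout as the command-line tool's output.
    println("super:super123->" + generateDigest("super:super123"))
}
```

If the assumption holds, the printed value should match the digest shown above, which is the value later passed as superDigest.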
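Edit zkServer.sh (sudo vim /usr/hdp/current/zookeeper-server/bin/zkServer.sh) and add the JVM option -Dzookeeper.DigestAuthenticationProvider.superDigest=super:UdxDQl4f9v5oITwcAsO9bmWgHSI=
Restart ZooKeeper, then connect with zkCli.sh and authenticate as the super user: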
/usr/hdp/2.6.0.3-8/zookeeper/bin/zkCli.sh -server demo1:2181
[zk: demo1:2181(CONNECTED) 1] addauth digest super:super123