如何删除kafka积压数据

如何删除kafka积压数据,第1张

Kafka删除数据有两种方式

按照时间,超过一段时间后删除过期消息

按照消息大小,消息数量超过一定大小后删除最旧的数据

Kafka删除数据的最小单位:segment

Kafka删除数据主逻辑:kafka源码

   def cleanupLogs() {    debug("Beginning log cleanup...")    var total = 0    val startMs = time.milliseconds    for(log <- allLogs if !log.config.compact) {      debug("Garbage collecting '" + log.name + "'")      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)    }    debug("Log cleanup completed. " + total + " files deleted in " +                  (time.milliseconds - startMs) / 1000 + " seconds")  }   

Kafka一段时间(配置文件设置)调用一次 cleanupLogs,删除所有应该删除的日志数据。

cleanupExpiredSegments 负责清理超时的数据

   private def cleanupExpiredSegments(log: Log): Int = {    val startMs = time.milliseconds    log.deleteOldSegments(startMs - _.lastModified >log.config.retentionMs)  }   

cleanupSegmentsToMaintainSize 负责清理超过大小的数据

   <br>private def cleanupSegmentsToMaintainSize(log: Log): Int = {    if(log.config.retentionSize < 0 || log.size <log.config.retentionSize)      return 0    var diff = log.size - log.config.retentionSize    def shouldDelete(segment: LogSegment) = {      if(diff - segment.size >= 0) {        diff -= segment.size        true      } else {        false      }    }    log.deleteOldSegments(shouldDelete)  }   

server.properties 中添加delete.topic.enable=true

执行D:\Developer\kafka_2.10-0.10.0.0\bin\windows>kafka-topics.bat --zookeeper 127.0.0.1:2181 --delete --topic TEST-TOPIC

执行zookeeper-server-stop.bat,后在重启kafka

重新发送消息即可,会自动根据配置的partitions重建

注意:不执行1步骤,就是假删除。

[ocdp@kafka1 bin]$ /usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --zookeeper znode1:2181 --delete --topic    test_topic

Topic test_topic is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

[ocdp@kafka1 bin]$

[ocdp@kafka1 bin]$ /usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --zookeeper znode1:2181 --list

test_topic - marked for deletion

彻底删除topic ?

https://community.hortonworks.com/articles/29900/zookeeper-using-superdigest-to-gain-full-access-to.html

[ocdp@znode1 bin]$ export ZK_CLASSPATH=/etc/zookeeper/conf/:/usr/hdp/current/zookeeper-server/lib/*:/usr/hdp/current/zookeeper-server/*

[ocdp@znode1 bin]$

[ocdp@znode1 bin]$ java -cp $ZK_CLASSPATH org.apache.zookeeper.server.auth.DigestAuthenticationProvider super:super123

super:super123->super:UdxDQl4f9v5oITwcAsO9bmWgHSI=

sudo  vim /usr/hdp/current/zookeeper-server/bin/zkServer.sh  添加 -Dzookeeper.DigestAuthenticationProvider.superDigest=super:UdxDQl4f9v5oITwcAsO9bmWgHSI=

重启zookeeper

/usr/hdp/2.6.0.3-8/zookeeper/bin/zkCli.sh -server demo1:2181

[zk: demo1:2181(CONNECTED) 1] addauth digest super:super123


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/6803453.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-28
下一篇 2023-03-28

发表评论

登录后才能评论

评论列表(0条)

保存