以此记录一下在使用 Kafka 的过程中遇到的一些古怪问题
因为是初学,所以遇到的问题还不算多,之后会慢慢更新(大概 👀)
生产者相关
Topic 不存在记录一下有关 Producer 的一些问题
问题描述:org.apache.kafka.common.errors.TimeoutException: Topic xxx not present in metadata after xxx ms.
很显然,意思是 Topic 不存在于 metadata 导致了 Producer 发送超时
这个问题倒是很常见,但是它怪就怪在:导致这个问题出现的原因千奇百怪。比如:
- Kafka 中没有这个 Topic,并且设置了不自动创建 Topic
- Topic 存在,但是要发送的消息所指定的 Partition 信息有错误
- 甚至于程序中某某依赖包发生了冲突也可能导致了这个错误(StackOverflow 上看到了一个贴子,但是还没验证过)
总之原因就是很奇葩,以上所说的这些问题根源都可以通过在网上搜索一些博客、贴子来解决
问题出现的原因在这里要说的是另一个导致了这个问题的原因:未开放 Kafka 的相关端口!
当时发现是这个原因时,我也是哭笑不得,因为这个原因和这个问题的描述(Topic xxx not presen…)没有半毛钱关系,最开始遇到这个问题的时候也没有往这方面想,导致卡了很久
这里稍微吐槽一下 Kafka 的 Java 客户端,异常描述太笼统了,导致 debug 的时候没有什么头绪
既然找到了是未开放端口引起的问题,那么解决起来也很简单:开放对应的端口呗。两种方案:
- 暂时性开放:
# Kafka 的默认端口是 9092,可以选择暂时性开放
firewall-cmd --zone=public --add-port=9092/tcp
- 永久性开放
# Kafka 的默认端口是 9092,可以添加 --permanent 选项来永久性开放
firewall-cmd --zone=public --add-port=9092/tcp --permanent
# 重启防火墙
systemctl restart firewalld.service
# 重载配置文件
firewall-cmd --reload
开放端口之后,问题就解决了
Batch 中的消息过期问题描述:Expiring 1 record(s) for xxx-0:120000 ms has passed since batch creation
问题描述中的 xxx
指对应的 Topic。很明显,是说自批次创建以来,某某主题中的 1 条记录过期了
要理解这个问题,得先了解 Kafka 的生产者发送消息的原理:
“为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区。”
———— 来自《Kafka权威指南》
所以,Producer 调用了 send
方法后,消息并不会被立即发送,而是会待在某个批次中之后再一起发送。批次中的消息过期了,说明该批次并没有被发送给 Kafka,消息发送出现了问题
针对这个问题,我也在网上了搜了一些博看来看,意识到有可能是 Kafka 的一些配置出现了问题。之前在查看修改 Kafka 的配置文件(server.properties)的时候,留了一个心眼,有意的记住了 listeners
和 advertised.listeners
这两项配置。然后我设置修改了一下这两项配置,果然问题就解决了
一开始我是没有对 listeners
和 advertised.listeners
这两项配置做修改的,维持了默认的样子,如下:
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://your.host.name:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
可以看到这两项配置默认是不启用的,被注释掉了
后来我开放了这两项配置:
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://xx.xx.xx.xx:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://xx.xx.xx.xx:9092
上面的 xx.xx.xx.xx
都是指你的 Kafka 服务器的 IP 地址,也就是说 listeners
和 advertised.listeners
的值是设置成一样的
然后再用 Producer 发送消息,就 ok 了
最后
文章有什么错误之处欢迎(麻烦)各位指出 🙆♂️
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)