Kafka实践中遇到的问题思考

Kafka实践中遇到的问题思考,第1张

Kafka实践中遇到的问题思考

在实际工作中,kafka实践的时候难免遇到一些问题。下面记录下本人在实际中遇到的一些error。

如果有问题可以留言指正,不过要给出经过验证的结论。

一、Producer发送消息时报错 :Topic {{topic_name}} not present in metadata after 60000 ms

在利用KafkaTemplate发送数据时代码如下:

ListenableFuture> future = kafkaTemplate.
                    send(TOPIC,String.valueOf(msg), msgValue);
            future.addCallback(new ListenableFutureCallback>() {
                @Override
                public void onSuccess(SendResult sendResultMap) {
                    log.info("send message  success,res is {}", sendResultMap);
                }
                @Override
                public void onFailure(Throwable throwable) {
                    log.error(" send message  fail", throwable);
                }
            });

有的时候会如标题的错误,但是也不是每条都报错。查询资料得知,多可用区的Kafka实例,在某个可用区故障后,Kafka客户端在生产或消费消息时,可能会报Topic {{topic_name}} not present in metadata after 60000 ms的错误。

解决方法:

以下三种措施都可以解决此问题,请根据实际情况任意选择一种。

  • 升级Kafka客户端的版本到2.7或以上版本。
  • 修改Kafka客户端的“request.timeout.ms”大于“127s”。
  • 修改Kafka客户端Linux系统的网络参数“net.ipv4.tcp_syn_retries”为“3”。关于第三点我认为Kafka设置配置的时候可以指定retry次数应该也可以。 二、kafka单单异步发送就可以吗?

    相信很多人使用Kafka的时候就单纯的利用异步发送

    public Future send(ProducerRecord record, Callback callback) {}

    方法,发送消息。这样真的能够不阻塞主线程吗?答案是否定的。

    在某些情况下会阻塞主线程比如broker未正确运行,topic未创建等情况。很多场景我们不需要关注发送的结果,但是一旦出现了阻塞,就会影响其他业务逻辑。

    新版kafka代码中出现阻塞的位置如下:

    public Future send(ProducerRecord record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
    
    
    private Future doSend(ProducerRecord record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitonmetadata(record.topic(), record.partition(), maxBlockTimeMs);  //出现问题的地方
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            ...
        } catch (ApiException e) {
            ...
        }
    }
    
    //这里会导致阻塞
    private ClusterAndWaitTime waitonmetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        Cluster cluster = metadata.fetch();
    
        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);
    
        metadata.add(topic);
    
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);
    
        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        
        //一直获取topic的元数据信息,直到获取成功,若获取时间超过maxWaitMs,则抛出异常
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            int version = metadata.requestUpdate();
            sender.wakeup();
            try {
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs) {  //判断执行时间是否大于maxWaitMs
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowException();
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
    
        return new ClusterAndWaitTime(cluster, elapsed);
    }

    能够导致阻塞的代码是方法:waitOnmetadata

    通过KafkaProducer 执行send的过程中需要先获取metadata,而这是一个不断循环的 *** 作,直到获取成功,或者抛出异常。

     

    其实Kafka本意这么实现并没有问题,因为你要发送消息的前提就是能获取到border和topic的信息,问题在于这个send对外暴露的是Future的方法,但是内部实现却是有阻塞的,那么在有些时候没有考虑到这种情况,一旦出现border或者topic异常,将会阻塞系统线程,导致系统响应变慢,直到奔溃。

    当然首先确保Kafka的集群没什么问题,设置都正确的前提下在考虑如下的解决方式,解决方式也很好理解,就是利用线程池发送。这样就算阻塞了,也是阻塞线程池里面的线程,至少主业务逻辑不会出现问题。

    三、发送数据量太多也会阻塞

    在新版的 Kafka Producer 中,设计了一个消息缓冲池,客户端发送的消息都会被存储到缓冲池中,同时 Producer 启动后还会开启一个 Sender 线程,不断地从缓冲池获取消息并将其发送到 Broker,如下图所示:

    因此在新版的 Kafka Producer 中废弃掉异步发送的方法了,仅保留了一个 send 方法,同时返回一个 Futrue 对象,需要同步等待发送结果,就使用 Futrue#get 方法阻塞获取发送结果。而我在项目中直接调用 send 方法,为何还会发送阻塞呢?

    我们在构建 Kafka Producer 时,会有一个自定义缓冲池大小的参数 buffer.memory,默认大小为 32M,因此缓冲池的大小是有限制的,我们不妨想一下,缓冲池内存资源耗尽了会怎么样?

    Kafka 源码的注释是非常详细的,RecordAccumulator 类是 Kafka Producer 缓冲池的核心类,而 RecordAccumulator 类就有那么一段注释:

    The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.

    大概的意思是:

    当缓冲池的内存块用完后,消息追加调用将会被阻塞,直到有空闲的内存块。

    由于性能监控项目每分钟需要发送几百万条消息,只要 Kafka 集群负载很高或者网络稍有波动,Sender 线程从缓冲池捞取消息的速度赶不上客户端发送的速度,就会造成客户端发送被阻塞。

     

    参考文档:

    1、https://segmentfault.com/a/1190000019808896

    2、https://segmentfault.com/a/1190000024441682

    3、https://stackoverflow.com/questions/63714401/org-apache-kafka-common-errors-timeoutexception-topic-not-present-in-metadata-a

    4、生产或消费消息时,报Topic {{topic_name}} not present in metadata after 60000 ms错误_分布式消息服务Kafka版_故障排除_华为云

    欢迎分享,转载请注明来源:内存溢出

    原文地址: https://outofmemory.cn/zaji/5718594.html

  • (0)
    打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
    上一篇 2022-12-18
    下一篇 2022-12-17

    发表评论

    登录后才能评论

    评论列表(0条)

    保存