Kafka Rebalance测试

Kafka Rebalance测试,第1张

Kafka Rebalance测试

Kafka Rebalance测试

关于kafka的Rebalance机制,其实就是规定同一个consumer group下所有的consumer如何协调工作的,分配订阅Topic分区的。Rebalance发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费实例都会停止工作,等待 Rebalance 过程完成。

关于触发Rebalance的条件主要有3个,今天主要测试下第一种情况引起的Rebalance:

  1. 组内consumer成员数量发生变化。
  2. 订阅的topic发生变化。
  3. 订阅的topic的分区数发生变化。

首先我这里创建了一个有3个分区的topic

代码中每隔20秒创建一个KafkaConsumer,Consumer属于同一个group,都订阅topic_1123,总共创建了4个消费者,最后关闭了消费者2

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class KafkaConsumerTest {

    static {
        //设置log级别
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        List loggerList = loggerContext.getLoggerList();
        loggerList.forEach(logger -> logger.setLevel(Level.INFO));
        loggerContext.getLogger(ConsumerConfig.class).setLevel(Level.ERROR);
    }

    private static final Logger log = (Logger) LoggerFactory.getLogger(KafkaConsumerTest.class);

    private static boolean flag = true;

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092");
        //发送心跳请求频率的参数,这个值设置得越小,Consumer 实例发送心跳请求的频率就越高
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 2);
        //此时间内服务端未收到consumer心跳请求,服务端认为消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认3s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000 * 6);
        //获取消息后提交偏移量的最大时间(默认5分钟),超时服务端会认为消费者失效,触发Rebalance,并且提交offset会失败
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
        //是否自动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //每次Poll的最大数量。注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。默认500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        //消息的反序列化方式。
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");

        // kafka安全认证相关配置,kafka没有配置认证授权可忽略
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName());
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";");

        log.info("=========================================================第一个消费者加入=========================================================");
        new Thread(() -> {
            KafkaConsumer consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("topic_1123"));
            while (true) {
                ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));

                for (ConsumerRecord consumerRecord : consumerRecords) {
                    log.info("消费者1:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());
                }
                // 同步提交offset,会阻塞消费者线程直至位移提交完成
                consumer.commitSync();
            }
        }).start();


        //睡一会儿
        Thread.sleep(20 * 1000);
       log.info("=========================================================第二个消费者加入=========================================================");
        //第二个消费者加入
        new Thread(() -> {
            KafkaConsumer consumer2 = new KafkaConsumer<>(props);
            consumer2.subscribe(Collections.singletonList("topic_1123"));
            while (flag) {
                ConsumerRecords consumerRecords = consumer2.poll(Duration.ofSeconds(1));

                for (ConsumerRecord consumerRecord : consumerRecords) {
                    log.info("消费者2:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());
                }
                // 同步提交offset,会阻塞消费者线程直至位移提交完成
                consumer2.commitSync();
            }
            consumer2.close();
        }).start();

        //睡一会儿
        Thread.sleep(20 * 1000);

        log.info("=========================================================第三个消费者加入=========================================================");
        //第三个消费者加入

        new Thread(() -> {
            KafkaConsumer consumer3 = new KafkaConsumer<>(props);
            consumer3.subscribe(Collections.singletonList("topic_1123"));
            while (true) {
                ConsumerRecords consumerRecords = consumer3.poll(Duration.ofSeconds(1));

                for (ConsumerRecord consumerRecord : consumerRecords) {
                    log.info("消费者3:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());
                }
                // 同步提交offset,会阻塞消费者线程直至位移提交完成
                consumer3.commitSync();
            }
        }).start();

        //睡一会儿
        Thread.sleep(20 * 1000);

        log.info("=========================================================第四个消费者加入=========================================================");
        //第四个消费者加入

        new Thread(() -> {
            KafkaConsumer consumer4 = new KafkaConsumer<>(props);
            consumer4.subscribe(Collections.singletonList("topic_1123"));
            while (true) {
                ConsumerRecords consumerRecords = consumer4.poll(Duration.ofSeconds(1));

                for (ConsumerRecord consumerRecord : consumerRecords) {
                    log.info("消费者4:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());
                }
                // 同步提交offset,会阻塞消费者线程直至位移提交完成
                consumer4.commitSync();
            }
        }).start();

        //睡一会儿
        Thread.sleep(20 * 1000);

        log.info("=========================================================关闭一个消费者=========================================================");
        //关闭第二个消费者
        flag = false;
    }
}

结果分析:
1、第一个consumer创建时,先是申请加入组joining group,然后分配消费方案——3个分区都是consumer1负责消费。

2、在第二个consumer创建时,consumer2会申请加入组joining group,因为组内已经有consumer1存在,所以会触发rebalance,撤销consumer1之前分配的分区,consumer1也会重新加入组,参与新一轮的分配策略。最后,consumer1消费0和1分区,consumer2消费2分区。


3、在第三个consumer创建时,可以看到和consumer2加入组的逻辑基本一致,consumer3加入joining group,触发rebalance,撤销consumer1分配的分区,consumer1加入组,撤销consumer2分配的分区,consumer2加入组,Coordinator重新分配消费策略。最后,consumer1消费0分区,consumer2消费1分区,consumer3消费2分区。

4、在第四个consumer创建时,仍然会申请joining group,触发rebalance,其他已经在组内的consumer会撤销之前的订阅,重新加入组,等待Coordinator重新分配消费策略。但是,最后可以发现consumer-group_test-4并未分配到分区,即不会消息到任何消息。

5、最后,我关闭了consumer2,仍然触发了rebalance,consumer1、3、4重新加入组参与分区分配,consumer1消费0分区,consumer3消费1分区,consumer4消费2分区。

结论:
其实整个rebalance过程Coordinator进行控制的,Consumer正常的添加和销毁导致rebalance都是正常的,但是在某些情况下,Coordinator会错误的任务Consumer已经“死亡”,从而将Consumer提出Group,触发rebalance。
在Consumer端有几个参数是影响Coordinator判断的关键:

  • session.timeout.ms:当Consumer完成分区分配后,会定期地向Coordinator发送心跳请求,表明它还活着,如果Coordinator在超过session.timeout.ms时间还没有收到Consumer发送的心跳请求,那么会认为此Consumer已经“死亡”,从而将Consumer踢出Group。此值默认值为10s。
  • heartbeat.interval.ms:控制发送心跳请求频率的参数,这个值设置得越小,Consumer实例发送心跳请求的频率就越高,默认值3秒。此值要小于session.timeout.ms,一般设置为session.timeout.ms的1/3。这个值设置得越小,Consumer实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance
  • max.poll.interval.ms :它限定了Consumer端应用程序两次调用poll方法的最大时间间隔。它的默认值是5分钟,表示你的Consumer程序如果在5分钟之内无法消费完poll方法返回的消息,那么Consumer会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮Rebalance。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5606719.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存