关于kafka的Rebalance机制,其实就是规定同一个consumer group下所有的consumer如何协调工作的,分配订阅Topic分区的。Rebalance发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。
关于触发Rebalance的条件主要有3个,今天主要测试下第一种情况引起的Rebalance:
- 组内consumer成员数量发生变化。
- 订阅的topic发生变化。
- 订阅的topic的分区数发生变化。
首先我这里创建了一个有3个分区的topic
代码中每隔20秒创建一个KafkaConsumer,Consumer属于同一个group,都订阅topic_1123,总共创建了4个消费者,最后关闭了消费者2
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.*; public class KafkaConsumerTest { static { //设置log级别 LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); ListloggerList = loggerContext.getLoggerList(); loggerList.forEach(logger -> logger.setLevel(Level.INFO)); loggerContext.getLogger(ConsumerConfig.class).setLevel(Level.ERROR); } private static final Logger log = (Logger) LoggerFactory.getLogger(KafkaConsumerTest.class); private static boolean flag = true; public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092"); //发送心跳请求频率的参数,这个值设置得越小,Consumer 实例发送心跳请求的频率就越高 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 2); //此时间内服务端未收到consumer心跳请求,服务端认为消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认3s。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000 * 6); //获取消息后提交偏移量的最大时间(默认5分钟),超时服务端会认为消费者失效,触发Rebalance,并且提交offset会失败 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000); //是否自动提交offset props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //每次Poll的最大数量。注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。默认500 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); //消息的反序列化方式。 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //属于同一个组的消费实例,会负载消费消息。 props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test"); // kafka安全认证相关配置,kafka没有配置认证授权可忽略 props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName()); props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";"); log.info("=========================================================第一个消费者加入========================================================="); new Thread(() -> { KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("topic_1123")); while (true) { ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord consumerRecord : consumerRecords) { log.info("消费者1:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value()); } // 同步提交offset,会阻塞消费者线程直至位移提交完成 consumer.commitSync(); } }).start(); //睡一会儿 Thread.sleep(20 * 1000); log.info("=========================================================第二个消费者加入========================================================="); //第二个消费者加入 new Thread(() -> { KafkaConsumer consumer2 = new KafkaConsumer<>(props); consumer2.subscribe(Collections.singletonList("topic_1123")); while (flag) { ConsumerRecords consumerRecords = consumer2.poll(Duration.ofSeconds(1)); for (ConsumerRecord consumerRecord : consumerRecords) { log.info("消费者2:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value()); } // 同步提交offset,会阻塞消费者线程直至位移提交完成 consumer2.commitSync(); } consumer2.close(); }).start(); //睡一会儿 Thread.sleep(20 * 1000); log.info("=========================================================第三个消费者加入========================================================="); //第三个消费者加入 new Thread(() -> { KafkaConsumer consumer3 = new KafkaConsumer<>(props); consumer3.subscribe(Collections.singletonList("topic_1123")); while (true) { ConsumerRecords consumerRecords = consumer3.poll(Duration.ofSeconds(1)); for (ConsumerRecord consumerRecord : consumerRecords) { log.info("消费者3:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value()); } // 同步提交offset,会阻塞消费者线程直至位移提交完成 consumer3.commitSync(); } }).start(); //睡一会儿 Thread.sleep(20 * 1000); log.info("=========================================================第四个消费者加入========================================================="); //第四个消费者加入 new Thread(() -> { KafkaConsumer consumer4 = new KafkaConsumer<>(props); consumer4.subscribe(Collections.singletonList("topic_1123")); while (true) { ConsumerRecords consumerRecords = consumer4.poll(Duration.ofSeconds(1)); for (ConsumerRecord consumerRecord : consumerRecords) { log.info("消费者4:offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value()); } // 同步提交offset,会阻塞消费者线程直至位移提交完成 consumer4.commitSync(); } }).start(); //睡一会儿 Thread.sleep(20 * 1000); log.info("=========================================================关闭一个消费者========================================================="); //关闭第二个消费者 flag = false; } }
结果分析:
1、第一个consumer创建时,先是申请加入组joining group,然后分配消费方案——3个分区都是consumer1负责消费。
2、在第二个consumer创建时,consumer2会申请加入组joining group,因为组内已经有consumer1存在,所以会触发rebalance,撤销consumer1之前分配的分区,consumer1也会重新加入组,参与新一轮的分配策略。最后,consumer1消费0和1分区,consumer2消费2分区。
3、在第三个consumer创建时,可以看到和consumer2加入组的逻辑基本一致,consumer3加入joining group,触发rebalance,撤销consumer1分配的分区,consumer1加入组,撤销consumer2分配的分区,consumer2加入组,Coordinator重新分配消费策略。最后,consumer1消费0分区,consumer2消费1分区,consumer3消费2分区。
4、在第四个consumer创建时,仍然会申请joining group,触发rebalance,其他已经在组内的consumer会撤销之前的订阅,重新加入组,等待Coordinator重新分配消费策略。但是,最后可以发现consumer-group_test-4并未分配到分区,即不会消息到任何消息。
5、最后,我关闭了consumer2,仍然触发了rebalance,consumer1、3、4重新加入组参与分区分配,consumer1消费0分区,consumer3消费1分区,consumer4消费2分区。
结论:
其实整个rebalance过程Coordinator进行控制的,Consumer正常的添加和销毁导致rebalance都是正常的,但是在某些情况下,Coordinator会错误的任务Consumer已经“死亡”,从而将Consumer提出Group,触发rebalance。
在Consumer端有几个参数是影响Coordinator判断的关键:
- session.timeout.ms:当Consumer完成分区分配后,会定期地向Coordinator发送心跳请求,表明它还活着,如果Coordinator在超过session.timeout.ms时间还没有收到Consumer发送的心跳请求,那么会认为此Consumer已经“死亡”,从而将Consumer踢出Group。此值默认值为10s。
- heartbeat.interval.ms:控制发送心跳请求频率的参数,这个值设置得越小,Consumer实例发送心跳请求的频率就越高,默认值3秒。此值要小于session.timeout.ms,一般设置为session.timeout.ms的1/3。这个值设置得越小,Consumer实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance
- max.poll.interval.ms :它限定了Consumer端应用程序两次调用poll方法的最大时间间隔。它的默认值是5分钟,表示你的Consumer程序如果在5分钟之内无法消费完poll方法返回的消息,那么Consumer会主动发起 “离开组” 的请求,Coordinator 也会开启新一轮Rebalance。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)