Kafka Consumer Partition Assignment Test


【README】
  • This article demonstrates how partitions are redistributed among the existing consumers in a group when a new consumer joins;
  • It also simulates a broker going down;
  • The latest Kafka, 3.0.0, is used;
  • The test cases come from the five consumer partition-assignment models; it is recommended to read those models first, refer to https://blog.csdn.net/PacosonSWJTU/article/details/121853461

【1】Preparing the Kafka test environment

1) Kafka cluster

  • 3 brokers: centos201, centos202, centos203, with broker ids 1, 2, 3;
  • a topic with 3 partitions and 2 replicas (a creation-command sketch follows below);
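
For reference, a topic with this layout can be created with the kafka-topics.sh script that ships with Kafka 3.0.0. This is only a sketch, assuming the topic name hello10 that the producer below uses; point --bootstrap-server at your own brokers:

# create the test topic: 3 partitions, replication factor 2
bin/kafka-topics.sh --create --topic hello10 --partitions 3 --replication-factor 2 \
  --bootstrap-server centos201:9092,centos202:9092,centos203:9092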

2) Producer code

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer {
    public static void main(String[] args) {

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");

        props.put(ProducerConfig.ACKS_CONFIG, "all");

        props.put(ProducerConfig.RETRIES_CONFIG, 0);

        // batch size 16 KB (KfkNumConst is the author's constants helper)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);

        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // timeouts
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
        // max time send() may block, e.g. while fetching metadata or when the buffer is full
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);

        // producer buffer memory 32 MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

//        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        int order = 1;
        for (int i = 0; i < 100000; i++) {
            // send one message to each of the 3 partitions of topic hello10
            for (int j = 0; j < 3; j++) {
                Future<RecordMetadata> future = producer.send(new ProducerRecord<>("hello10", j, "",
                        String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5))); // DataFactory is the author's test-data helper
                try {
                    System.out.println("[Producer] partition【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        producer.close();
        System.out.println("Kafka producer finished writing data");
    }
}

The producer sends one message to each partition, then sleeps for 500 ms; it loops 100,000 times, which is about 50,000 seconds of sleep alone, so the run is planned to take well over 10 hours (feel free to use a different value).

3) 4 consumers, numbered 1, 2, 3, 4

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyConsumer1 {
	public static void main(String[] args) {

		Properties props = new Properties();

		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092,centos202:9092,centos203:9092");

		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

		props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello10G1"); // group.id

		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // the default is latest

		// offsets are committed manually via commitSync() below, so auto commit is disabled
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // only used when auto commit is enabled

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

		consumer.subscribe(Arrays.asList("hello10"));

//		for (TopicPartition partition : consumer.assignment()) {
//			consumer.seek(partition, 1);
//		}

		// consume messages
		try {
			// poll loop
			while (!Thread.interrupted()) {
				try {
					System.out.println(DateUtils.getNowTimestamp() + " Consumer 1 - waiting for messages"); // DateUtils is the author's helper
					TimeUnit.MILLISECONDS.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				// fetch records; poll(long) was removed in kafka-clients 3.0.0, so pass a Duration
				ConsumerRecords<String, String> consumerRds = consumer.poll(Duration.ofMillis(100));
				// iterate over the returned ConsumerRecords
				for (ConsumerRecord<String, String> rd : consumerRds) {
					System.out.println("Consumer 1 - partition【" + rd.partition() + "】offset【" + rd.offset() + "】 -> "
							+ DateUtils.getNowTimestamp() + rd.key() + "--" + rd.value());
				}
				consumer.commitSync(); // synchronous commit
			}
		} finally {
			// always close the consumer
			consumer.close();
		}
	}
}

There are 4 such consumers, numbered Consumer 1, 2, 3 and 4; that is, 4 separate consumer classes, so that each one prints its own identifier in the log.

My consumers read from latest, i.e. only newly produced messages; you can set auto.offset.reset to another value, such as earliest, if you prefer.

4) Add a logging configuration so that debug logs are not printed (the Kafka consumer produces a lot of debug output)

Create a logback.xml that only prints logs at INFO level and above;
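
A minimal logback.xml of this kind might look like the following; this is only a sketch assuming a plain console appender, not necessarily the exact file used for the test:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- simple console appender -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- root level INFO suppresses the Kafka client's DEBUG output -->
    <root level="INFO">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>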



    

5) To show the consumption details clearly, I start the 4 consumers from the command line and the producer from IDEA; everything is compiled with Maven.


【2】Kafka tests

【2.1】Test 1: how partitions are assigned across the group when a new consumer joins

A note up front: the command and parameters used to start the consumers from the command line are given at the end of this article.

For the consumer partition-assignment models, see the article linked in the README above.

step 0) Start the producer and let it send messages to Kafka;

step 1) Start consumer 1 from the command line; its consumption log shows:

Consumer 1 receives messages from all 3 partitions;

step 2) Start consumer 2 from the command line; the group's consumption log shows:

Consumer 1 receives messages from partition 2;

Consumer 2 receives messages from partition 0 and partition 1;

step 3) Start consumer 3 from the command line; the group's consumption log shows:

Consumer 1 receives messages from partition 2;
Consumer 2 receives messages from partition 0;
Consumer 3 receives messages from partition 1;

step 4) Start consumer 4 from the command line; the log shows:

Consumer 1 receives messages from partition 2;
Consumer 2 receives messages from partition 0;
Consumer 3 receives messages from partition 1;

Consumer 4 is idle (the topic only has 3 partitions, so the 4th consumer gets no partition);
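
To see these ownership changes directly in each consumer's log, instead of inferring them from which partitions deliver messages, the subscribe() call in the consumers above can be given a ConsumerRebalanceListener. This is only a sketch and is not part of the original test code; it assumes the extra imports java.util.Collection, org.apache.kafka.clients.consumer.ConsumerRebalanceListener and org.apache.kafka.common.TopicPartition:

// sketch: replaces consumer.subscribe(Arrays.asList("hello10")) in MyConsumer1
consumer.subscribe(Arrays.asList("hello10"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called when a rebalance takes partitions away from this consumer
        System.out.println("Consumer 1 - partitions revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after the rebalance with this consumer's new assignment
        System.out.println("Consumer 1 - partitions assigned: " + partitions);
    }
});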

 


【2.2】Test 2: simulating a Kafka broker going down

A note up front: before simulating the outage, first check the topic details.

(Figure 1: topic details before the outage)
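
Topic details like those in Figure 1 can be listed with kafka-topics.sh --describe; this is a sketch, assuming the hello10 topic and the hosts used earlier:

bin/kafka-topics.sh --describe --topic hello10 \
  --bootstrap-server centos201:9092,centos202:9092,centos203:9092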

step 1) Stop the broker service on centos201

Observation 1: the topic's partitions themselves are not affected, but the leader replicas are re-elected onto broker 3; compare the new topic details with Figure 1 to see the difference;

Observation 2: all consumers block until they time out and throw exceptions;

Once the Kafka cluster has finished electing a new controller and new leader replicas, it can serve consumer requests again;

  • Note 1: there is a short delay here: after a broker goes down, a new controller and new leader replicas have to be elected, and a partition rebalance also takes place; the settings sketched below affect how the consumers behave during that window.
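
How long the consumers block before erroring out during such an outage, and how quickly the group notices a dead member, is governed by a few consumer settings. The values below are only a sketch of the relevant knobs (illustrative values close to the client defaults), not settings taken from this test:

// sketch: consumer settings relevant to failure detection and rebalance timing
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);    // coordinator declares the consumer dead if no heartbeat within this window
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);  // how often heartbeats are sent to the group coordinator
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // max gap between poll() calls before the consumer is removed from the group
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);    // how long a single client request waits before failing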

step 2) Restart 201; the consumption log then shows:

Consumer 1 receives messages from partition 1;
Consumer 2 receives messages from partition 2;
Consumer 3 is idle;
Consumer 4 receives messages from partition 0;

Consumer 3 is now the idle one and consumer 4 is busy because the broker going offline and back online triggered partition rebalances, which transferred partition ownership from one consumer to another (before 201 went down, consumer 3 was busy and consumer 4 was idle).

【Summary】

1. To make Kafka messaging reliable, the producer, the brokers and the consumers all have to play their part;

2. This article only records some of the ways a Kafka cluster can fail;


【Appendix】

The command and parameters used to start a consumer from the command line, for reference only, since your paths will certainly differ;

It is simply the command copied from IDEA's run log, as follows:

 

java -classpath D:Javajdk1.8.0_172jrelibcharsets.jar;D:Javajdk1.8.0_172jrelibdeploy.jar;D:Javajdk1.8.0_172jrelibextaccess-bridge-64.jar;D:Javajdk1.8.0_172jrelibextcldrdata.jar;D:Javajdk1.8.0_172jrelibextdnsns.jar;D:Javajdk1.8.0_172jrelibextjaccess.jar;D:Javajdk1.8.0_172jrelibextjfxrt.jar;D:Javajdk1.8.0_172jrelibextlocaledata.jar;D:Javajdk1.8.0_172jrelibextnashorn.jar;D:Javajdk1.8.0_172jrelibextsunec.jar;D:Javajdk1.8.0_172jrelibextsunjce_provider.jar;D:Javajdk1.8.0_172jrelibextsunmscapi.jar;D:Javajdk1.8.0_172jrelibextsunpkcs11.jar;D:Javajdk1.8.0_172jrelibextzipfs.jar;D:Javajdk1.8.0_172jrelibjavaws.jar;D:Javajdk1.8.0_172jrelibjce.jar;D:Javajdk1.8.0_172jrelibjfr.jar;D:Javajdk1.8.0_172jrelibjfxswt.jar;D:Javajdk1.8.0_172jrelibjsse.jar;D:Javajdk1.8.0_172jrelibmanagement-agent.jar;D:Javajdk1.8.0_172jrelibplugin.jar;D:Javajdk1.8.0_172jrelibresources.jar;D:Javajdk1.8.0_172jrelibrt.jar;D:workbench_ideastudy4vwvwstudy22targetclasses;D:software_clustermvn_repo.m2repositoryorgspringframeworkbootspring-boot-starter-web2.5.4spring-boot-starter-web-2.5.4.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkbootspring-boot-starter2.5.4spring-boot-starter-2.5.4.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkbootspring-boot2.5.4spring-boot-2.5.4.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkbootspring-boot-autoconfigure2.5.4spring-boot-autoconfigure-2.5.4.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkbootspring-boot-starter-logging2.5.4spring-boot-starter-logging-2.5.4.jar;D:software_clustermvn_repo.m2repositorychqoslogbacklogback-classic1.2.5logback-classic-1.2.5.jar;D:software_clustermvn_repo.m2repositorychqoslogbacklogback-core1.2.5logback-core-1.2.5.jar;D:software_clustermvn_repo.m2repositoryorgapachelogginglog4jlog4j-to-slf4j2.14.1log4j-to-slf4j-2.14.1.jar;D:software_clustermvn_repo.m2repositoryorgapachelogginglog4jlog4j-api2.14.1log4j-api-2.14.1.jar;D:software_clustermvn_repo.m2repositoryorgslf4jjul-to-slf4j1.7.32jul-to-slf4j-1.7.32.jar;D:software_clustermvn_repo.m2repositoryjakartaannotationjakarta.annotation-api1.3.5jakarta.annotation-api-1.3.5.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkspring-core5.3.9spring-core-5.3.9.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkspring-jcl5.3.9spring-jcl-5.3.9.jar;D:software_clustermvn_repo.m2repositoryorgyamlsnakeyaml1.28snakeyaml-1.28.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkbootspring-boot-starter-json2.5.4spring-boot-starter-json-2.5.4.jar;D:software_clustermvn_repo.m2repositorycomfasterxmljacksoncorejackson-databind2.12.4jackson-databind-2.12.4.jar;D:software_clustermvn_repo.m2repositorycomfasterxmljacksoncorejackson-annotations2.12.4jackson-annotations-2.12.4.jar;D:software_clustermvn_repo.m2repositorycomfasterxmljacksoncorejackson-core2.12.4jackson-core-2.12.4.jar;D:software_clustermvn_repo.m2repositorycomfasterxmljacksondatatypejackson-datatype-jdk82.12.4jackson-datatype-jdk8-2.12.4.jar;D:software_clustermvn_repo.m2repositorycomfasterxmljacksondatatypejackson-datatype-jsr3102.12.4jackson-datatype-jsr310-2.12.4.jar;D:software_clustermvn_repo.m2repositorycomfasterxmljacksonmodulejackson-module-parameter-names2.12.4jackson-module-parameter-names-2.12.4.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkbootspring-boot-starter-tomcat2.5.4spring-boot-starter-tomcat-2.5.4.jar;D:software_clustermvn_repo.m2repositoryorgapachetomcatembedtomcat-embed-core9.0.52tomcat-embed-core-9.0.52.jar;D:software_clustermvn_repo.m2repositoryorgapachetomcatembedtomcat-
embed-el9.0.52tomcat-embed-el-9.0.52.jar;D:software_clustermvn_repo.m2repositoryorgapachetomcatembedtomcat-embed-websocket9.0.52tomcat-embed-websocket-9.0.52.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkspring-web5.3.9spring-web-5.3.9.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkspring-beans5.3.9spring-beans-5.3.9.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkspring-webmvc5.3.9spring-webmvc-5.3.9.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkspring-aop5.3.9spring-aop-5.3.9.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkspring-context5.3.9spring-context-5.3.9.jar;D:software_clustermvn_repo.m2repositoryorgspringframeworkspring-expression5.3.9spring-expression-5.3.9.jar;D:software_clustermvn_repo.m2repositoryorgapachekafkakafka-clients3.0.0kafka-clients-3.0.0.jar;D:software_clustermvn_repo.m2repositorycomgithublubenzstd-jni1.5.0-2zstd-jni-1.5.0-2.jar;D:software_clustermvn_repo.m2repositoryorglz4lz4-java1.7.1lz4-java-1.7.1.jar;D:software_clustermvn_repo.m2repositoryorgxerialsnappysnappy-java1.1.8.1snappy-java-1.1.8.1.jar;D:software_clustermvn_repo.m2repositoryorgslf4jslf4j-api1.7.32slf4j-api-1.7.32.jar;D:software_clustermvn_repo.m2repositoryorgslf4jslf4j-simple1.7.25slf4j-simple-1.7.25.jar kafka.consumer.MyConsumer2
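
Since the project is compiled with Maven anyway, a shorter way to launch one of the consumer classes from the command line is the exec-maven-plugin; this is a sketch of an alternative, not the command used in the article:

mvn compile exec:java -Dexec.mainClass=kafka.consumer.MyConsumer2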
