根据kafka的生产消费模型,kafka的topic可以创建成多个分区,一个消费者组内,一个消费者是可以对应多个分区的,但是通常出于性能考虑,最好让一个分区能对应到一个consumer。
一种实现思路是:topic有多少个分区,然后consumer节点就启动多少个,这样多少会有些浪费。如果是微服务应用,这个应用只做消费数据这一件事情,分配1-2个G的内存资源还是有些浪费的。
另外一种思路就是:单个应用实例启动多个线程,然后多个线程分别对应一个分区。
代码配置实现这里使用 spring-kafka 框架进行配置。
org.springframework.kafka spring-kafka2.1.7.RELEASE
配置类代码
package com.xxx.es.job.config; import com.xxx.es.job.mq.PointsTransactionDetailKafkaListenerV2; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Configurable; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Configurable @Component @EnableKafka public class KafkaConsumerConfig { //Properties properties = PropertiesUtils.read("kafka.properties"); public KafkaConsumerConfig() { System.out.println("kafka消费者配置加载..."); } public MapconsumerProperties() { Map props = new HashMap<>(); //Kafka服务地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); //消费组 props.put(ConsumerConfig.GROUP_ID_CONFIG, "point3"); //设置 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //设置间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); //Key反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //Value反序列化 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //从头开始消费 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerProperties()); } @Bean public KafkaListenerContainerFactory > kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(3); // 设置这里不会生效。 System.out.println("factory.setConcurrency(3)"); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public PointsTransactionDetailKafkaListenerV2 kafkaConsumerListener() { return new PointsTransactionDetailKafkaListenerV2(); } @Bean public ConcurrentMessageListenerContainer container( ConsumerFactory consumerFactory, PointsTransactionDetailKafkaListenerV2 kafkaConsumerListener) { // ContainerProperties containerProperties = new ContainerProperties("points_dev_scj_test.points_sharding_00.points_transaction_detail"); ContainerProperties containerProperties = new ContainerProperties("partition_test"); containerProperties.setMessageListener(kafkaConsumerListener); containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); ConcurrentMessageListenerContainer listenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties); listenerContainer.setConcurrency(3); //关键点,设置这里才能生效,这里会创建3个消费者线程 return listenerContainer; } }
需要注意的是:设置上图中的下面这一部分 是不会生效的,也就是不会创建多个消费者线程。
@Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(3); // 这里设置的setConcurrency(3) 不会生效, System.out.println("factory.setConcurrency(3)"); factory.getContainerProperties().setPollTimeout(3000); return factory; }
启动项目,可以在控制台看到打印的日志。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)