kafka 基于注解配置 单个应用实例创建多个 consumer线程,消费多个分区

kafka 基于注解配置 单个应用实例创建多个 consumer线程,消费多个分区,第1张

kafka 基于注解配置 单个应用实例创建多个 consumer线程,消费多个分区 需求说明:

根据kafka的生产消费模型,kafka的topic可以创建成多个分区,一个消费者组内,一个消费者是可以对应多个分区的,但是通常出于性能考虑,最好让一个分区能对应到一个consumer。

一种实现思路是:topic有多少个分区,然后consumer节点就启动多少个,这样多少会有些浪费。如果是微服务应用,这个应用只做消费数据这一件事情,分配1-2个G的内存资源还是有些浪费的。

另外一种思路就是:单个应用实例启动多个线程,然后多个线程分别对应一个分区。

代码配置实现

这里使用 spring-kafka 框架进行配置。

    
        org.springframework.kafka
        spring-kafka
        2.1.7.RELEASE
    

配置类代码

package com.xxx.es.job.config;


import com.xxx.es.job.mq.PointsTransactionDetailKafkaListenerV2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;


@Configurable
@Component
@EnableKafka
public class KafkaConsumerConfig {

    //Properties properties = PropertiesUtils.read("kafka.properties");

    public KafkaConsumerConfig() {
        System.out.println("kafka消费者配置加载...");
    }

    public Map consumerProperties() {
        Map props = new HashMap<>();
        //Kafka服务地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        //消费组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "point3");
        //设置
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //设置间隔时间
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //Key反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //Value反序列化
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //从头开始消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }


    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

    
    @Bean
    public KafkaListenerContainerFactory> kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);   // 设置这里不会生效。
        System.out.println("factory.setConcurrency(3)");
        factory.getContainerProperties().setPollTimeout(3000);

        return factory;
    }

    
    @Bean
    public PointsTransactionDetailKafkaListenerV2 kafkaConsumerListener() {
        return new PointsTransactionDetailKafkaListenerV2();
    }

    
    @Bean
    public ConcurrentMessageListenerContainer container(
            ConsumerFactory consumerFactory, PointsTransactionDetailKafkaListenerV2 kafkaConsumerListener) {
//        ContainerProperties containerProperties = new ContainerProperties("points_dev_scj_test.points_sharding_00.points_transaction_detail");
        ContainerProperties containerProperties = new ContainerProperties("partition_test");
        containerProperties.setMessageListener(kafkaConsumerListener);
        containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        ConcurrentMessageListenerContainer listenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
        listenerContainer.setConcurrency(3);  //关键点,设置这里才能生效,这里会创建3个消费者线程
        return listenerContainer;
    }


}

需要注意的是:设置上图中的下面这一部分 是不会生效的,也就是不会创建多个消费者线程。

@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(3);  // 这里设置的setConcurrency(3) 不会生效,
    System.out.println("factory.setConcurrency(3)");
    factory.getContainerProperties().setPollTimeout(3000);

    return factory;
}

启动项目,可以在控制台看到打印的日志。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5717931.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存