springboot整合两套kafka环境实践

springboot整合两套kafka环境实践,第1张

springboot整合两套kafka环境实践 项目背景

今天突然接到一个需求-现有AB两套完全隔离的系统,分别对应不同的两套kafka集群cluster1和cluster2。需要实现用一个子模块实现从cluster1消费topic同时把消费到的数据produce到cluster2。抠抠索索在百度上看了几篇博客然后开始有了自己的沉淀。目前公司的项目有公用common-kafka模块,可以集成一套kafka环境。讨论一番过后,决定公司默认的kafka配置用于做消费,自己重新写一个templet做produce。接下来直接上干货。

pom引入

  org.springframework.kafka
     spring-kafka
     2.1.4.RELEASE
 
使用默认配置消费kafka
 @KafkaListeners(value = {@KafkaListener(groupId = "xxxx", topicPartitions = {@TopicPartition(topic = "${kafka.topic.realtime:xxx}", partitions = {"0"})})})
 public void listen1(List> records) {
      toQueue(records);
  }

以上代码不做讲解,相信了解的童鞋都知道这样 *** 作。接下来开始进入核心阶段

@Slf4j
@Configuration
@Data
public class K1KafkaConfiguration {
    @Value("${kafka1.bootstrap-servers}")
    private String consumerBootstrapServers;
    @Value("${kafka1.consumer.group-id}")
    private String groupId;
    @Value("${kafka1.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${kafka1.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;
    @Value("${kafka1.bootstrap-servers}")
    private String producerBootstrapServers;
    @Bean
    @Primary
    KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public Map consumerConfigs() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public Map producerConfigs1() {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory producerFactory1() {
        return new DefaultKafkaProducerFactory<>(producerConfigs1());
    }
    @Bean(name = "kafkaTemplateQT")
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory1());
    }
}

tips:

  1. 此处请务必加@Bean(name = “kafkaTemplateQT”) 因为引用的时候如果使用@Autowired是使用的A环境的配置
  2. producerFactory1需要修改该处注入的名称,开始我以为这里有类的概念,不会出问题,怎料一直都是引用A环境的,浪费了好会儿才找到是这的问题。因为此处是按照bean去拿的。producerFactory A环境先初始化,因此此处需要修改名称。
use
@Resource(name = "kafkaTemplateQT")  //此处也可以用@Qualifier
private KafkaTemplate kafkaTemplate;

kafkaTemplate.send(pushTopic, record.value());

今日感触,未实现的时候感觉要实现有点难,当真正实现了就会觉得也不是那么困难啦。知识在于分享~如有铁子发现有bug,还望多多指正!

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5117782.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-17
下一篇 2022-11-17

发表评论

登录后才能评论

评论列表(0条)

保存