今天突然接到一个需求-现有AB两套完全隔离的系统,分别对应不同的两套kafka集群cluster1和cluster2。需要实现用一个子模块实现从cluster1消费topic同时把消费到的数据produce到cluster2。抠抠索索在百度上看了几篇博客然后开始有了自己的沉淀。目前公司的项目有公用common-kafka模块,可以集成一套kafka环境。讨论一番过后,决定公司默认的kafka配置用于做消费,自己重新写一个templet做produce。接下来直接上干货。
pom引入使用默认配置消费kafkaorg.springframework.kafka spring-kafka2.1.4.RELEASE
@KafkaListeners(value = {@KafkaListener(groupId = "xxxx", topicPartitions = {@TopicPartition(topic = "${kafka.topic.realtime:xxx}", partitions = {"0"})})}) public void listen1(List> records) { toQueue(records); }
以上代码不做讲解,相信了解的童鞋都知道这样 *** 作。接下来开始进入核心阶段
@Slf4j @Configuration @Data public class K1KafkaConfiguration { @Value("${kafka1.bootstrap-servers}") private String consumerBootstrapServers; @Value("${kafka1.consumer.group-id}") private String groupId; @Value("${kafka1.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${kafka1.consumer.enable-auto-commit}") private Boolean enableAutoCommit; @Value("${kafka1.bootstrap-servers}") private String producerBootstrapServers; @Bean @Primary KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public Map producerConfigs1() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory producerFactory1() { return new DefaultKafkaProducerFactory<>(producerConfigs1()); } @Bean(name = "kafkaTemplateQT") public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory1()); } }
tips:
- 此处请务必加@Bean(name = “kafkaTemplateQT”) 因为引用的时候如果使用@Autowired是使用的A环境的配置
- producerFactory1需要修改该处注入的名称,开始我以为这里有类的概念,不会出问题,怎料一直都是引用A环境的,浪费了好会儿才找到是这的问题。因为此处是按照bean去拿的。producerFactory A环境先初始化,因此此处需要修改名称。
@Resource(name = "kafkaTemplateQT") //此处也可以用@Qualifier private KafkaTemplatekafkaTemplate; kafkaTemplate.send(pushTopic, record.value());
今日感触,未实现的时候感觉要实现有点难,当真正实现了就会觉得也不是那么困难啦。知识在于分享~如有铁子发现有bug,还望多多指正!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)