When consuming messages with the @KafkaListener annotation, the following error occurs:
Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecord]
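The cause is that batch listening is not enabled by default, so the default container hands the listener one record at a time and cannot bind it to a batch-typed parameter. The fix is to point the annotation's containerFactory attribute at a container factory that has batch listening enabled.
The complete code is as follows.
1) Batch listener container factory class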
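import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaConfiguration {

    // Key/value types are taken to be String, matching how the listener
    // below parses record.value() as JSON text.
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory(
            KafkaProperties properties) {
        // Build the consumer settings from the spring.kafka.* application properties
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
        factory.setBatchListener(true); // enable batch listening
        return factory;
    }
}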
2) Consumer listener
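import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON; // assumed: JSON.parseObject matches Alibaba fastjson
import lombok.RequiredArgsConstructor;

// GxdcService, GxdcKeyEnum, BikePointInfo and OrderSummaryInfo are project classes not shown here.
@Component
@RequiredArgsConstructor
public class GxdcKafkaConsumer {

    private final GxdcService gxdcService;

    // containerFactory must name the batch-enabled factory bean defined above
    @KafkaListener(topics = {"${spring.kafka.consumer.topics.ods_dc_count_result}"},
            containerFactory = "batchFactory")
    public void listen(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
        if (records.isEmpty()) {
            return;
        }
        // Process each message according to its key
        for (ConsumerRecord<String, String> record : records) {
            switch (GxdcKeyEnum.getInstance(record.key())) {
                case DC_POINT:
                    BikePointInfo pointInfo = JSON.parseObject(record.value(), BikePointInfo.class);
                    gxdcService.saveBikePointInfo(pointInfo);
                    break;
                case DC_ORDER:
                    OrderSummaryInfo summaryInfo = JSON.parseObject(record.value(), OrderSummaryInfo.class);
                    gxdcService.saveOrderSummaryInfo(summaryInfo);
                    break;
                default:
                    // Ignore records with any other key
                    break;
            }
        }
    }
}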
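The listener above calls GxdcKeyEnum.getInstance(record.key()), but the enum itself is not shown in this post. Below is a minimal sketch of what it might look like. It rests on two assumptions that are not from the original code: the Kafka message key is the constant's name as a plain string, and a hypothetical UNKNOWN constant is added so that unmatched or null keys land in the default branch rather than making the switch throw a NullPointerException.

```java
import java.util.Arrays;

// Hypothetical sketch: the real GxdcKeyEnum is not shown in the post.
enum GxdcKeyEnum {
    DC_POINT("DC_POINT"),   // assumed key string for bike point messages
    DC_ORDER("DC_ORDER"),   // assumed key string for order summary messages
    UNKNOWN("");            // hypothetical fallback, never matched by lookup

    private final String key;

    GxdcKeyEnum(String key) {
        this.key = key;
    }

    // Returns the constant whose key equals the given message key,
    // or UNKNOWN when the key is null or matches nothing.
    static GxdcKeyEnum getInstance(String key) {
        return Arrays.stream(values())
                .filter(e -> e != UNKNOWN && e.key.equals(key))
                .findFirst()
                .orElse(UNKNOWN);
    }
}
```

Returning a fallback constant instead of null matters here because a Java switch on a null enum value throws before any case label is evaluated, so a single record with an unexpected key could otherwise fail the whole batch.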