结论:在配置文件中加上spring.kafka.listener.type=batch 。
完整报错信息如下:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void com.kafka.gateway.consumer.heartBeatConsumer.listensGroupPro(org.apache.kafka.clients.consumer.ConsumerRecords,org.springframework.kafka.support.Acknowledgment)] Bean [com.kafka.gateway.consumer.heartBeatConsumer@3cceaa40]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2636) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeonMessage(KafkaMessageListenerContainer.java:2606) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeonMessage(KafkaMessageListenerContainer.java:2567) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2481) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2403) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2282) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1956) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1351) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1342) [spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1234) [spring-kafka-2.8.0.jar:2.8.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211] at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[na:1.8.0_211] at java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:1.8.0_211] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211] Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:374) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeonMessage(KafkaMessageListenerContainer.java:2586) [spring-kafka-2.8.0.jar:2.8.0] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeonMessage(KafkaMessageListenerContainer.java:2586) [spring-kafka-2.8.0.jar:2.8.0] ... 12 common frames omitted Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.13.jar:5.3.13] at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.3.13.jar:5.3.13] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.3.13.jar:5.3.13] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-5.3.13.jar:5.3.13] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.0.jar:2.8.0] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.0.jar:2.8.0] ... 15 common frames omitted
我的消费端写法如下:
@KafkaListener(topics = "heartbeat4", groupId = "gateway2") public void listensGroupPro(ConsumerRecordsrecords, Acknowledgment ack) {
当然我最一开始肯定不是这么简单的写法,只不过后面这个bug,我逐步去掉了一些配置。
我看报错是类型转换异常,我就又换成了
@KafkaListener(topics = "heartbeat4", groupId = "gateway2") public void listensGroupPro(List> records, Acknowledgment ack) { System.out.println(records); for (ConsumerRecord record : records) { System.out.println(record.value()); } // 手动提交offset,里面的逻辑是采用的同步提交,尝试3次 ack.acknowledge(); }
可是还是报错,是在for循环里面报的错,并且离谱的是sout可以正常打印,但是打断点发现我的字符串竟然被拆分了!
这显然不对啊,我后面又尝试把String换成?,还是不行,debug里面的源码也是找的迷路,看了很多博客,都是正常使用,其实我最一开始也是这样没问题的,不过昨天一重启批量消费就不行了!后来看很多博客配置,他们都是在yml里面正常配置,然后去自己手写一些工厂类读取配置然后注入容器。
factory.setBatchListener(batchListener);
我觉得这样的话,springkafka出的就没有意义了。后来果然让我找到了这个配置
spring.kafka.listener.type=batch
而默认是single的!
我也不知道为什么要这样默认,不过改掉之后就可以批量消费了。最不能理解的是为什么一开始我没有加却可以批量消费正常使用,然后突然拉闸。
生产端代码:
@RestController @RequestMapping("/mmm") public class TestProducer { @Autowired private KafkaTemplatekafkaTemplate; @GetMapping(value = "/aaa") public void aaa(){ for (int i = 0; i < 10 ; i++) { kafkaTemplate.send("heartbeat4", "key", "{"index":"1000","ip":"666666666666666","timestamp":1638263084,"type":"6","value":[]}"); System.out.println("发送成功"); } } }
完整配置:
spring: kafka: bootstrap-servers: 你的server producer: retries: 3 batch-size: 16384 buffer-memory: 33554432 acks: 1 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: latest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上 # 次提交时间⼤于TIME时提交 # TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理 # record数量⼤于等于COUNT时提交 # COUNT # TIME | COUNT 有⼀个条件满⾜时提交 # COUNT_TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调 # ⽤Acknowledgment.acknowledge()后提交 # MANUAL # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATE type: batch
如有疑问可以评论区提问。
如需转载,请务必声明原处。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)