Spring Boot Kafka batch consumption fails with "Listener method could not be invoked with the incoming message"

Conclusion: add spring.kafka.listener.type=batch to the configuration file.

The full error message is as follows:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.kafka.gateway.consumer.heartBeatConsumer.listensGroupPro(org.apache.kafka.clients.consumer.ConsumerRecords,org.springframework.kafka.support.Acknowledgment)]
Bean [com.kafka.gateway.consumer.heartBeatConsumer@3cceaa40]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for 
heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2636) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeonMessage(KafkaMessageListenerContainer.java:2606) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeonMessage(KafkaMessageListenerContainer.java:2567) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2481) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2403) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2282) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1956) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1351) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1342) [spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1234) [spring-kafka-2.8.0.jar:2.8.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[na:1.8.0_211]
	at java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:1.8.0_211]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:374) ~[spring-kafka-2.8.0.jar:2.8.0]
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.0.jar:2.8.0]
		at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.0.jar:2.8.0]
		at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.0.jar:2.8.0]
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeonMessage(KafkaMessageListenerContainer.java:2586) [spring-kafka-2.8.0.jar:2.8.0]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}], failedMessage=GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeonMessage(KafkaMessageListenerContainer.java:2586) [spring-kafka-2.8.0.jar:2.8.0]
	... 12 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload={"index":"1000","ip":"A6-B1-C1-D6-FE-34","timestamp":1638263084,"type":"6","value":[]}, headers={kafka_offset=71, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@585c68b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=key, kafka_receivedTopic=heartbeat4, kafka_receivedTimestamp=1638495075056, kafka_acknowledgment=Acknowledgment for heartbeat4-0@71, kafka_groupId=gateway2}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.8.0.jar:2.8.0]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347) ~[spring-kafka-2.8.0.jar:2.8.0]
	... 15 common frames omitted

My consumer was written like this:

@KafkaListener(topics = "heartbeat4", groupId = "gateway2")
public void listensGroupPro(ConsumerRecords<String, String> records, Acknowledgment ack) {
    // ... method body not shown in the original post
}

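Of course, this is not how I wrote it originally; I gradually stripped out configuration while chasing this bug.

The error looked like a type conversion exception, so I changed the method to: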

@KafkaListener(topics = "heartbeat4", groupId = "gateway2")
public void listensGroupPro(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    System.out.println(records);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
    }
    // Manually commit the offset; the underlying logic commits synchronously and retries up to 3 times
    ack.acknowledge();
}

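It still failed, this time inside the for loop. Oddly, the System.out.println call printed the list without complaint, but when I set a breakpoint I found that my string had been split into pieces!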
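A likely explanation, which the original post does not confirm: in single-record mode the listener adapter hands each record's String payload to Spring's generic conversion service and asks for a raw List, and Spring's String-to-collection conversion splits the text on commas. The plain-Java sketch below only mimics that behavior; FakeRecord, splitOnCommas, and asRecordList are hypothetical names invented for the demo and are not part of Spring or Kafka.

```java
import java.util.ArrayList;
import java.util.List;

class CommaSplitDemo {

    // Hypothetical stand-in for Kafka's ConsumerRecord, used only in this demo.
    static final class FakeRecord {
        private final String value;
        FakeRecord(String value) { this.value = value; }
        String value() { return value; }
    }

    // Mimics a generic String-to-List conversion: split on commas, trim each piece.
    static List<String> splitOnCommas(String payload) {
        List<String> parts = new ArrayList<>();
        for (String part : payload.split(",")) {
            parts.add(part.trim());
        }
        return parts;
    }

    // A conversion that targets the raw List type cannot check element types,
    // so a list of String fragments ends up behind a List<FakeRecord> parameter.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<FakeRecord> asRecordList(String payload) {
        return (List) splitOnCommas(payload);
    }

    public static void main(String[] args) {
        String payload = "{\"index\":\"1000\",\"type\":\"6\",\"value\":[]}";
        List<FakeRecord> records = asRecordList(payload);

        // Printing works: List.toString() never casts the elements.
        System.out.println(records);

        try {
            // The enhanced for loop casts each element to FakeRecord and fails.
            for (FakeRecord record : records) {
                System.out.println(record.value());
            }
        } catch (ClassCastException e) {
            System.out.println("ClassCastException inside the loop: " + e.getMessage());
        }
    }
}
```

If this is what happened, it matches both symptoms: the list prints normally because its elements are just comma-separated fragments of the JSON text, and the loop fails because none of them is a real record.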

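The split string was clearly wrong. I then tried replacing String with ?, which did not help either, and I got lost stepping through the source in the debugger. I read many blog posts, and they all used batch listeners without trouble. In fact my own code had worked this way at first, but after a restart yesterday batch consumption stopped working! Many of those posts configure Kafka normally in yml and then hand-write a factory class that reads the configuration and registers the container factory as a bean: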

factory.setBatchListener(batchListener);
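For context, the hand-written approach those posts describe usually looks something like the sketch below. It is only an illustration under assumptions: the class name BatchListenerConfig is hypothetical, and it assumes Spring Boot's auto-configured ConsumerFactory is available for injection.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical configuration class; the name is an assumption for illustration.
@Configuration
public class BatchListenerConfig {

    // Declaring a bean named kafkaListenerContainerFactory replaces the one
    // that Spring Boot would otherwise auto-configure.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Deliver the whole result of each poll() to the listener as one batch
        factory.setBatchListener(true);
        // Commit as soon as the listener calls Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```

A bean like this is mainly useful when the container needs customization that the spring.kafka.listener.* properties cannot express.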

I felt that doing it this way defeats the point of what Spring Kafka already provides. Sure enough, I eventually found this property:

spring.kafka.listener.type=batch 

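The default is single!

I do not know why that is the default, but after changing it batch consumption worked. What I still cannot explain is why batch consumption worked at first without this setting and then suddenly broke.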

Producer code:

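import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mmm")
public class TestProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping(value = "/aaa")
    public void aaa() {
        for (int i = 0; i < 10; i++) {
            // Quotes inside the JSON payload must be escaped in a Java string literal
            kafkaTemplate.send("heartbeat4", "key", "{\"index\":\"1000\",\"ip\":\"666666666666666\",\"timestamp\":1638263084,\"type\":\"6\",\"value\":[]}");
            // send() is asynchronous, so this only confirms the record was handed to the producer
            System.out.println("Sent successfully");
        }
    }
}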

Full configuration:

spring:
  kafka:
    bootstrap-servers: your-server
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500

    listener:
      # Ack modes (when offsets are committed):
      # RECORD: after the listener has processed each record
      # BATCH: after the listener has processed each batch returned by poll()
      # TIME: after each poll() batch is processed, once the time since the
      #   last commit exceeds TIME
      # COUNT: after each poll() batch is processed, once the number of
      #   processed records is greater than or equal to COUNT
      # COUNT_TIME: when either the TIME or the COUNT condition is met
      # MANUAL: after each poll() batch is processed, once the listener has
      #   called Acknowledgment.acknowledge()
      # MANUAL_IMMEDIATE: immediately when Acknowledgment.acknowledge() is
      #   called; this is the mode generally used
      ack-mode: MANUAL_IMMEDIATE
      type: batch

If you have questions, feel free to ask in the comments.

If you repost this article, please credit the original source.

Original article: https://outofmemory.cn/zaji/5638731.html
