提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录- Springboot中使⽤Kafka
- 1.引⼊依赖
- 2.编写配置⽂件-----yml
- 3.消息⽣产者
- 4.消费者
- 5.消费者中配置消费主题、分区和偏移量
Springboot中使⽤Kafka 1.引⼊依赖
2.编写配置⽂件-----ymlorg.springframework.kafka spring-kafka
server: port: 8080 spring: kafka: bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 producer: retries: 3 batch-size: 16384 buffer-memory: 33554432 acks: 1 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交 # TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交 # COUNT # TIME | COUNT 有⼀个条件满⾜时提交 # COUNT_TIME # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交 # MANUAL # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATE redis: host: 172.16.253.213.消息⽣产者
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/msg") public class MyKafkaController { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate4.消费者kafkaTemplate; @RequestMapping("/send") public String sendMessage() { kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message!"); return "send success!"; } }
package com.example.demo.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class MyConsumer { @KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1") public void listenGroup(ConsumerRecord5.消费者中配置消费主题、分区和偏移量record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交offset ack.acknowledge(); } //不同的方式 @KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup2") public void listensGroup(ConsumerRecords records, Acknowledgment ack) { for (ConsumerRecord record : records) { System.out.printf(record.value()); } //手动提交offset ack.acknowledge(); } }
package com.example.demo.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class MyConsumer { @KafkaListener(groupId = "testGroup", topicPartitions = { @TopicPartition(topic = "topic1", partitions = {"0", "1"}),//concurrency就是同组下的消费者个数,就是并发消费数,建议小于等于分区总数 @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))}, concurrency = "3") public void listenGroupPro(ConsumerRecordrecord, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //手动提交offset ack.acknowledge(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)