文章目录本文来说下SpringBoot整合kafka之kafka分区实战
准备工作程序代码程序测试本文小结
准备工作
当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send(“topic1”, normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下
初始化配置信息
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaInitialConfiguration { @Bean public NewTopic initialTopic() { return new NewTopic("topic.quick.initial",8, (short) 1 ); } @Bean public NewTopic initialTopic2() { return new NewTopic("topic.quick.initial",11, (short) 1 ); } }
程序代码
@Slf4j @RestController @RequestMapping("/api/kafka") @Api(tags = "kafka测试开发") public class KafkaController { @Autowired private KafkaTemplatekafkaTemplate; @Resource(name = "initialTopic") private NewTopic newTopic; @GetMapping("/callbackOne") @ApiOperation(value = "带回调的生产者") public void sendMessage2(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) { log.info("========================================>>>"); log.info(newTopic.name()); // 带回调的生产者 kafkaTemplate.send(newTopic.name(), callbackMessage).addCallback(success -> { // 消息发送到的topic String topic = success.getRecordmetadata().topic(); // 消息发送到的分区 int partition = success.getRecordmetadata().partition(); // 消息在分区内的offset long offset = success.getRecordmetadata().offset(); log.info("发送消息成功:" + topic + "-" + partition + "-" + offset + "-" + callbackMessage); }, failure -> { log.info("发送消息失败:" + failure.getMessage()); }); } }
消费者
@Component @Slf4j public class KafkaConsumer { // 消费监听 @KafkaListener(topics = {"topic.quick.initial"}) public void onMessage1(ConsumerRecord, ?> record){ // 消费的哪个topic、partition的消息,打印出消息内容 log.info("==============================================>"); StringBuffer sb = new StringBuffer(); // 主题 sb.append(record.topic() + "-"); // 分区 sb.append(record.partition() + "-"); // 需要消费的值 sb.append(record.value() + "-"); // 位移 sb.append(record.offset()); log.info("消费者进行消费:"+ sb); } }
程序测试
使用swagger来进行程序测试
本文小结
本文简单进行了SpringBoot整合kafka之kafka分区实战。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)