SpringBoot整合kafka之kafka分区实战

SpringBoot整合kafka之kafka分区实战,第1张

SpringBoot整合kafka之kafka分区实战

本文来说下SpringBoot整合kafka之kafka分区实战

文章目录

准备工作程序代码程序测试本文小结


准备工作

当然我们也可以不手动创建topic,在执行代码kafkaTemplate.send(“topic1”, normalMessage)发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区也没有副本。所以,我们可以在项目中新建一个配置类专门用来初始化topic,如下

初始化配置信息

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaInitialConfiguration {

    
    @Bean
    public NewTopic initialTopic() {

        return new NewTopic("topic.quick.initial",8, (short) 1 );
    }


    
    @Bean
    public NewTopic initialTopic2() {

        return new NewTopic("topic.quick.initial",11, (short) 1 );
    }

}

程序代码

生产者

@Slf4j
@RestController
@RequestMapping("/api/kafka")
@Api(tags = "kafka测试开发")
public class KafkaController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Resource(name = "initialTopic")
    private NewTopic newTopic;

    @GetMapping("/callbackOne")
    @ApiOperation(value = "带回调的生产者")
    public void sendMessage2(@RequestParam("message") @ApiParam(value="消息",required = true) String callbackMessage) {

        log.info("========================================>>>");
        log.info(newTopic.name());

        // 带回调的生产者
        kafkaTemplate.send(newTopic.name(), callbackMessage).addCallback(success -> {

            // 消息发送到的topic
            String topic = success.getRecordmetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordmetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordmetadata().offset();
            log.info("发送消息成功:" + topic + "-" + partition + "-" + offset + "-" + callbackMessage);
        }, failure -> {
            log.info("发送消息失败:" + failure.getMessage());
        });
    }
    
}

消费者

@Component
@Slf4j
public class KafkaConsumer {

    // 消费监听
    @KafkaListener(topics = {"topic.quick.initial"})
    public void onMessage1(ConsumerRecord record){

        // 消费的哪个topic、partition的消息,打印出消息内容
        log.info("==============================================>");
        StringBuffer sb = new StringBuffer();
        // 主题
        sb.append(record.topic() + "-");
        // 分区
        sb.append(record.partition() + "-");
        // 需要消费的值
        sb.append(record.value() + "-");
        // 位移
        sb.append(record.offset());

        log.info("消费者进行消费:"+ sb);
    }
}

程序测试

使用swagger来进行程序测试


本文小结

本文简单进行了SpringBoot整合kafka之kafka分区实战。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705473.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存