(1)创建Topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 3 --partitions 1 --topic springboot
(2)SpringBoot项目测试
需求:将分区数目增加至为8
package net.testclass.testclasskafka; import org.apache.kafka.clients.admin.*; import org.junit.jupiter.api.Test; import java.util.*; import java.util.concurrent.ExecutionException; public class KafkaAdminTest { // 1、定义topic名称 private static final String TOPIC_NAME = "springboot"; // 2、设置admin客户端 public static AdminClient initAdiminClient(){ Properties properties = new Properties(); //指定服务ip properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092"); //创建客户端 AdminClient adminClient = AdminClient.create(properties); return adminClient; } // 3、获取topic @Test public void descTopic() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdiminClient(); // 3.1 增加分区到12个 NewPartitions newPartitions = NewPartitions.increaseTo(8); // 3.2、创建Map并将topic名称、增加分区数传入其中 MapinfoMap = new HashMap<>(); infoMap.put(TOPIC_NAME,newPartitions); // 3.3、将Map传入接口 CreatePartitionsResult createPartitionsResult= adminClient.createPartitions(infoMap); // 3.4、抛异常 createPartitionsResult.all().get(); } }
(3)查看更改后的topic配置信息
bin/kafka-topics.sh --describe --zookeeper hadoop102:2181/kafka --topic springboot2、SpringBoot项目整合Spring-kafka依赖发送消息
项目目录如下
(1)添加依赖
4.0.0 org.springframework.boot spring-boot-starter-parent2.5.8 net.testclass testclass-kafka0.0.1-SNAPSHOT testclass-kafka Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-testtest org.springframework.kafka spring-kafkaorg.springframework.boot spring-boot-maven-plugin
(2)application配置文件修改增加生产者信息
server: port: 8080 logging: config: classpath:logback.xml spring: kafka: bootstrap-servers: 192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092 producer: # 消息重发的次数 retries: 3 # 一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all
(3)编写Controller
package net.testclass.testclasskafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController public class UserController { // 1、声明topic private static final String TOPIC_NAME="springboot"; // 2、注入kafka @Autowired private KafkaTemplatekafkaTemplate; // 3、模拟发送消息 @ResponseBody @GetMapping("/api/v1/{num}") public void sendMessage(@PathVariable("num") String num){ kafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{ // 消息发送的topic String topic = success.getRecordmetadata().topic(); // 消息发送的分区 int partition = success.getRecordmetadata().partition(); // 消息在分区内的offset long offset = success.getRecordmetadata().offset(); System.out.println(""+ topic + "-" + partition + "-" + offset); }, failure -> { System.out.println("发送消息失败:" + failure.getMessage()); }); } }
(4)编写Application启动类
package net.testclass.testclasskafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } }
(5)发送消息
在google浏览器中输入:http://localhost:8080/api/v1/222
(6)运行结果
项目目录
(1)application配置文件
server: port: 8080 logging: config: classpath:logback.xml spring: kafka: bootstrap-servers: 192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092 # 配置生产者 producer: # 消息重发的次数 retries: 3 # 一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all # 配置消费者 consumer: # 自动提交的时间间隔,在SpringBoot2.x版本是值的类型为Duration,需要付恶化特定的格式,如1S,1M,1H,1D auto-commit-interval: 1S # 指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该做如何处理 auto-offset-reset: earliest # 是否自动提交偏移量,默认值是ture,为了避免出现重复数据,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-serializer: org.apache.kafka.common.serialization.StringDeSerializer # 值的反序列化方式 value-serializer: org.apache.kafka.common.serialization.StringDeSerializer listener: # 手工ack,调用ack后立刻提交offset ack-mode: manual_immediate # 容器运行的线程数 concurrency: 4
(2)编写消费监听器
package net.testclass.testclasskafka.mq; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component public class MQListener { @KafkaListener(topics = {"springboot"},groupId = "G1") public void onMessage(ConsumerRecord,?>record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){ System.out.println("消费消息"+record.topic() + ",partition:" + ",value" + record.value()); ack.acknowledge(); } }
(3)调试运行
在浏览器中输入://localhost:8080/api/v1/1996
(4)运行结果
4、SpringBoot整合kafka事务消息实战(注解式事务)(1)application配置文件编写
server: port: 8080 logging: config: classpath:logback.xml spring: kafka: bootstrap-servers: 192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092 # 配置生产者 producer: # 消息重发的次数 retries: 3 # 一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks: all # 事务id transaction-id-prefix: xdclass-tran # 配置消费者 consumer: # 自动提交的时间间隔,在SpringBoot2.x版本是值的类型为Duration,需要付恶化特定的格式,如1S,1M,1H,1D auto-commit-interval: 1S # 指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该做如何处理 auto-offset-reset: earliest # 是否自动提交偏移量,默认值是ture,为了避免出现重复数据,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-serializer: org.apache.kafka.common.serialization.StringDeSerializer # 值的反序列化方式 value-serializer: org.apache.kafka.common.serialization.StringDeSerializer listener: # 手工ack,调用ack后立刻提交offset ack-mode: manual_immediate # 容器运行的线程数 concurrency: 4
(2)编写UserController
package net.testclass.testclasskafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController public class UserController { // 1、声明topic private static final String TOPIC_NAME="springboot"; // 2、注入kafka @Autowired private KafkaTemplatekafkaTemplate; // 3、模拟发送消息 @GetMapping("/api/v1/{num}") public void sendMessage(@PathVariable("num") String num){ kafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{ // 消息发送的topic String topic = success.getRecordmetadata().topic(); // 消息发送的分区 int partition = success.getRecordmetadata().partition(); // 消息在分区内的offset long offset = success.getRecordmetadata().offset(); System.out.println("发送成功:topic="+ topic + ",partition=" + partition + ",offset=" + offset); }, failure -> { System.out.println("发送消息失败:" + failure.getMessage()); }); } // 4、注解方式的事务消息 @GetMapping("/api/v1/tran1") @Transactional(rollbackFor = RuntimeException.class) public void sendMessage1(int num){ kafkaTemplate.send(TOPIC_NAME,"这是个事务消息1 i="+num); if(num == 0){ throw new RuntimeException(); } kafkaTemplate.send(TOPIC_NAME,"这是个事务消息2 i="+num); } }
(3)调试运行一
在浏览器中输入://localhost:8080/api/v1/tran1?num=1996
(4)运行结果一
两个消息都发送出去
(5)调试运行二
在浏览器中输入://localhost:8080/api/v1/tran1?num=0
(6)运行结果二
两个消息都没发送出去(事务回滚)
5、SpringBoot整合kafka事务消息实战(声明式事务)(1)application配置文件编写
server: port: 8080 logging: config: classpath:logback.xml spring: kafka: bootstrap-servers: 192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092 # 配置生产者 producer: # 消息重发的次数 retries: 3 # 一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks: all # 事务id transaction-id-prefix: xdclass-tran # 配置消费者 consumer: # 自动提交的时间间隔,在SpringBoot2.x版本是值的类型为Duration,需要付恶化特定的格式,如1S,1M,1H,1D auto-commit-interval: 1S # 指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该做如何处理 auto-offset-reset: earliest # 是否自动提交偏移量,默认值是ture,为了避免出现重复数据,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-serializer: org.apache.kafka.common.serialization.StringDeSerializer # 值的反序列化方式 value-serializer: org.apache.kafka.common.serialization.StringDeSerializer listener: # 手工ack,调用ack后立刻提交offset ack-mode: manual_immediate # 容器运行的线程数 concurrency: 4
(2)编写UserController
package net.testclass.testclasskafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController public class UserController { // 1、声明topic private static final String TOPIC_NAME="springboot"; // 2、注入kafka @Autowired private KafkaTemplatekafkaTemplate; // 3、模拟发送消息 @GetMapping("/api/v1/{num}") public void sendMessage(@PathVariable("num") String num){ kafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{ // 消息发送的topic String topic = success.getRecordmetadata().topic(); // 消息发送的分区 int partition = success.getRecordmetadata().partition(); // 消息在分区内的offset long offset = success.getRecordmetadata().offset(); System.out.println("发送成功:topic="+ topic + ",partition=" + partition + ",offset=" + offset); }, failure -> { System.out.println("发送消息失败:" + failure.getMessage()); }); } // 4、声明方式的事务消息 @GetMapping("/api/v1/tran1") public void sendMessage1(int num){ kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback () { @Override public Object doInOperations(KafkaOperations kafkaOperations){ kafkaTemplate.send(TOPIC_NAME,"这是个事务消息1 i="+num); if(num == 0){ throw new RuntimeException(); } kafkaTemplate.send(TOPIC_NAME,"这是个事务消息2 i="+num); return true; } }); } }
(3)调试运行一
在浏览器中输入://localhost:8080/api/v1/tran1?num=1996
(4)运行结果一
两个消息都发送出去
(5)调试运行二
在浏览器中输入://localhost:8080/api/v1/tran1?num=0
(6)运行结果二
两个消息都没发送出去(事务回滚)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)