一、安装kafka(前提是已安装了zookeeper),这里介绍docker安装方式
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_ConNECT=xxxx:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
-e KAFKA_BROKER_ID=0 broker标识,集群中用来区分不同的broker
-e KAFKA_ZOOKEEPER_ConNECT=10.9.44.11:2181/ 配置zookeeper地址
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092 把kafka的地址端口注册给zookeeper
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置端口
注意:如果连接程序与kafka服务不在同一服务器上KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092这里的ip地址要设置成公网ip,否则在验证时会报:Connection to node 0 could not be established. Broker may not be available的错误。
二、导入依赖
org.springframework.kafka spring-kafkaorg.springframework.kafka spring-kafka-testtest
三、修改application.yml配置(配置详解看这位老哥的这里)
spring: kafka: bootstrap-servers: http://xxxx:9092 # 指定kafka server的地址,集群配多个,中间,逗号隔开 producer: # 生产者配置 retries: 0 batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送 buffer-memory: 33554432 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据 acks: 1 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # 消费者配置 group-id: test2 # 指定默认消费者group id auto-offset-reset: latest # 消费位置起始点 enable-auto-commit: true # 设置自动提交offset auto-commit-interval: 1000 #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
application.properties配置
spring.kafka.bootstrap-servers=http://xxxx:9092 # 指定kafka server的地址,集群配多个,中间,逗号隔开 # 生产者配置 spring.kafka.producer.retries=0 # 每次批量发送消息的数量,produce积累到一定数据,一次发送 spring.kafka.producer.batch-size=16384 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.acks=1 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 消费者配置 spring.kafka.consumer.group-id=test2 # 指定默认消费者group id spring.kafka.consumer.auto-offset-reset=latest spring.kafka.consumer.enable-auto-commit=true # 设置自动提交offset spring.kafka.consumer.auto-commit-interval=1000 #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
四、启动类加上@EnableKafka注解
五、消息发送者
@Slf4j @RestController @RequestMapping("/webapi") public class SysServerBusiController { @Autowired private KafkaTemplatekafkaTemplate; } @GetMapping("/send") public String kafkaSendMessage(String topic, String message) { log.info("==========kafka发送消息topic:{}, message:{}", topic, message); kafkaTemplate.send(topic, message); return JSONUtil.toJsonStr("send success"); } }
六、消费者
@Slf4j @Component public class KafkaConsumer { @KafkaListener(topics = "test") private void onMessage(String message){ log.info("===========sys消费:{}", message); } }
七、测试验证
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)