springboot2整合kafka

springboot2整合kafka,第1张

springboot2整合kafka

一、安装kafka(前提是已安装了zookeeper),这里介绍docker安装方式

docker run  -d --name kafka 
-p 9092:9092 
-e KAFKA_BROKER_ID=0 
-e KAFKA_ZOOKEEPER_ConNECT=xxxx:12181 
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092 
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 
-t wurstmeister/kafka

-e KAFKA_BROKER_ID=0 broker标识,集群中用来区分不同的broker

-e KAFKA_ZOOKEEPER_ConNECT=10.9.44.11:2181/ 配置zookeeper地址

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092 把kafka的地址端口注册给zookeeper

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置端口

注意:如果连接程序与kafka服务不在同一服务器上KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:9092这里的ip地址要设置成公网ip,否则在验证时会报:Connection to node 0 could not be established. Broker may not be available的错误。

二、导入依赖


	org.springframework.kafka
	spring-kafka


	org.springframework.kafka
	spring-kafka-test
	test

三、修改application.yml配置(配置详解看这位老哥的这里)

spring:
  kafka:
    bootstrap-servers: http://xxxx:9092 # 指定kafka server的地址,集群配多个,中间,逗号隔开
    producer: # 生产者配置
      retries: 0
      batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送
      buffer-memory: 33554432 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # 消费者配置
      group-id: test2 # 指定默认消费者group id
      auto-offset-reset: latest # 消费位置起始点
      enable-auto-commit: true # 设置自动提交offset
      auto-commit-interval: 1000 #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

application.properties配置

spring.kafka.bootstrap-servers=http://xxxx:9092 # 指定kafka server的地址,集群配多个,中间,逗号隔开
# 生产者配置
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384 
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432 
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
spring.kafka.consumer.group-id=test2 # 指定默认消费者group id
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true # 设置自动提交offset
spring.kafka.consumer.auto-commit-interval=1000 #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

四、启动类加上@EnableKafka注解

五、消息发送

@Slf4j
@RestController
@RequestMapping("/webapi")
public class SysServerBusiController {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    }

    @GetMapping("/send")
    public String kafkaSendMessage(String topic, String message) {
        log.info("==========kafka发送消息topic:{}, message:{}", topic, message);
        kafkaTemplate.send(topic, message);
        return JSONUtil.toJsonStr("send success");
    }
}

六、消费者

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "test")
    private void onMessage(String message){
        log.info("===========sys消费:{}", message);
    }
}

七、测试验证

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5677978.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存