Kafka Cluster (Docker) and Spring Boot Integration


I. Kafka Cluster Setup

1. Environment Preparation
  • Linux environment (in a VM)

  • Docker environment

  • ZooKeeper environment

    Kafka depends on ZooKeeper, so a ZooKeeper cluster must be in place before the Kafka cluster is built. Preparing three physical servers or VMs is cumbersome, so Docker is used to simplify the setup.

    See the earlier post on building a ZooKeeper cluster with Docker.

    As shown in the figure, the cluster consists of three ZooKeeper containers.
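Since the Kafka containers below join the brokers and the ZooKeeper containers on the same user-defined Docker network, it is worth confirming that network exists first. A minimal sketch, assuming the network is named my-net and the ZooKeeper containers are named zookeeper1–zookeeper3 as in the commands that follow:

```shell
# Create the user-defined bridge network (skip if it already exists)
docker network create my-net

# The three ZooKeeper containers should already be running on it
docker ps --filter "network=my-net" --format "{{.Names}}"
```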

2. Kafka Cluster Setup

1) Cluster Planning

2) Cluster Setup
  • Pull the image

    docker pull wurstmeister/kafka
    

  • Create the container

    docker run -d \
    --name=kafka1 \
    --restart=always \
    -p 9092:9092 \
    --network=my-net \
    -e KAFKA_ADVERTISED_HOST_NAME=192.168.48.131 \
    -e HOST_IP=192.168.48.131:9092 \
    -e KAFKA_ADVERTISED_PORT=9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2182,zookeeper3:2183 \
    -e KAFKA_BROKER_ID=0 \
    wurstmeister/kafka:latest
    

    Parameter notes:

    • --network: use the user-defined Docker network
    • KAFKA_ADVERTISED_HOST_NAME: host machine address
    • KAFKA_ADVERTISED_PORT: host machine port
    • KAFKA_ZOOKEEPER_CONNECT: ZooKeeper cluster addresses
    • KAFKA_BROKER_ID: broker.id, must be unique within the cluster
    • HOST_IP: exposed host address

Create three containers in this way.

Note: change the container name, host port, and broker id for each one.
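The three docker run commands can be sketched as one loop that varies only the name, host port, and broker id (container names kafka1–kafka3 and host ports 9092–9094 are assumptions carried over from the examples; the mapping assumes each broker keeps its default internal port 9092):

```shell
# Start three brokers: kafka1..kafka3 on host ports 9092..9094, broker ids 0..2
for i in 0 1 2; do
  docker run -d \
    --name="kafka$((i + 1))" \
    --restart=always \
    -p "$((9092 + i)):9092" \
    --network=my-net \
    -e KAFKA_ADVERTISED_HOST_NAME=192.168.48.131 \
    -e HOST_IP="192.168.48.131:$((9092 + i))" \
    -e KAFKA_ADVERTISED_PORT="$((9092 + i))" \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2182,zookeeper3:2183 \
    -e KAFKA_BROKER_ID="$i" \
    wurstmeister/kafka:latest
done
```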

3. Kafka Cluster Monitoring

Use KafkaOffsetMonitor-assembly-0.4.6.jar to monitor the Kafka cluster.

1) Create a kafka-offset-console directory under /opt/module/

2) Put the uploaded jar into the directory just created

3) Create a mobile-logs directory under /opt/module/kafka-offset-console

4) Create a start script, start.sh, under /opt/module/kafka-offset-console

java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers 192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094 \
--kafkaSecurityProtocol PLAINTEXT \
--zk 192.168.48.131:2181,192.168.48.131:2182,192.168.48.131:2183 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &

5) Start the monitor (first make the script executable with chmod +x start.sh)

./start.sh

6) Access the monitor from the host machine (port 8086, as set in the script) to verify it is up

This completes the cluster setup.
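Before moving on to Spring Boot, the cluster can be sanity-checked by creating the first topic that the application code uses. A sketch, assuming the Kafka CLI scripts shipped in the wurstmeister image are on the container's PATH and that this Kafka version still accepts the --zookeeper flag:

```shell
# Create the "first" topic with 3 partitions, replicated across the 3 brokers
docker exec kafka1 kafka-topics.sh --create \
  --zookeeper zookeeper1:2181 \
  --topic first --partitions 3 --replication-factor 3

# List topics to confirm it exists
docker exec kafka1 kafka-topics.sh --list --zookeeper zookeeper1:2181
```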

II. Spring Boot Integration

1. Add the Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Configuration File
server.port=8080
#============== kafka ===================
spring.kafka.bootstrap-servers=192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094

#=============== producer =======================
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.interceptor.class=com.example.demo.Interceptor.TimeInterceptor,com.example.demo.Interceptor.CounterInterceptor

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. Configuration Class
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfiguration {

    @Autowired
    private KafkaProperties properties;

    // Custom property: comma-separated list of producer interceptor classes
    @Value("#{'${spring.kafka.producer.interceptor.class}'.split(',')}")
    private List<String> interceptors;

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        Map<String, Object> map = this.properties.buildProducerProperties();
        // Register the interceptors with the producer
        map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(map);
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach(customizer -> customizer.customize(factory));
        return factory;
    }
}
4. Controller Layer
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Receive a message from the front end and send it through the producer
@GetMapping("/sendMassage/{massage}")
public String sendMassage(@PathVariable("massage") String massage) {
    kafkaTemplate.send("first", JSON.toJSONString(massage));
    return "Message sent";
}
5. Consume Messages
    @KafkaListener(topics = {"first"})
    public void receMassage(ConsumerRecord<String, String> consumerRecord) {
        // Guard against a null value
        Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        if (kafkaMessage.isPresent()) {
            // Unwrap the Optional
            String message = kafkaMessage.get();
            System.err.println("Consumed message: " + message);
        }
    }
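To double-check what the listener receives, a console consumer can be attached to the same topic from outside the application (assuming, as above, that the Kafka CLI scripts are on the container's PATH):

```shell
# Tail the "first" topic from the beginning; Ctrl-C to stop
docker exec -it kafka1 kafka-console-consumer.sh \
  --bootstrap-server 192.168.48.131:9092 \
  --topic first --from-beginning
```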
6. Interceptors

Messages are processed in producer interceptors before they are sent.

1) Time Interceptor
@Component
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> map) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // Prepend the current timestamp to the message value
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(),
                producerRecord.timestamp(), producerRecord.key(),
                new SimpleDateFormat("yyyy/MM/dd HH-mm-ss").format(System.currentTimeMillis())
                        + "," + producerRecord.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    @Override
    public void close() {
    }
}
2) Counter Interceptor
@Component
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // Count successful and failed sends
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // Print the totals when the producer closes
        System.out.println("Successfully sent: " + successCounter);
        System.out.println("Failed to send: " + errorCounter);
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
7. Logging Configuration

A logback.xml along these lines (the appender names, appender classes, and logger name are illustrative; the context name, output patterns, and file name pattern come from the original configuration):

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <contextName>logback</contextName>

    <!-- Console output -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Daily rolling file output -->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <prudent>true</prudent>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>poslog/%d{yyyy-MM-dd}/%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} -%msg%n</pattern>
        </encoder>
    </appender>

    <!-- Console output with the full pattern -->
    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="com.example.demo" level="INFO">
        <appender-ref ref="file"/>
    </logger>

    <root level="INFO">
        <appender-ref ref="console"/>
        <appender-ref ref="stdout"/>
    </root>
</configuration>
8. Testing
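With the application running on port 8080 (per the configuration file above), an end-to-end check is to hit the send endpoint and watch the listener print the consumed message on the application console:

```shell
# Send a message through the REST endpoint; the controller returns a
# confirmation string and the @KafkaListener should print the message
curl http://localhost:8080/sendMassage/hello
```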

