-
linux环境(vm环境)
-
docker环境
-
zookeeper 环境
kafka的工作依赖于zookeeper,在搭建kafka集群时,必须搭建好zookeeper集群,准备三台服务器或虚拟机比较麻烦,为了简化使用docker环境。
参考zookeeper集群(docker)搭建
如图三个zookeeper容器组成的集群
-
拉取镜像
docker pull wurstmeister/kafka
-
创建容器
docker run -d --name=kafka1 --restart=always -p 9092:9092 --network=my-net -e KAFKA_ADVERTISED_HOST_NAME=192.168.48.131 -e HOST_IP=192.168.48.131:9092 -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2182,zookeeper3:2183 -e KAFKA_BROKER_ID=0 wurstmeister/kafka:latest
参数说明:
- –network: 使用docker 自定义的网络通道
- KAFKA_ADVERTISED_HOST_NAME:宿主机地址
- KAFKA_ADVERTISED_PORT:宿主机端口
- KAFKA_ZOOKEEPER_CONNECT:zookeeper集群地址
- KAFKA_BROKER_ID:broked.id集群中必须唯一
- HOST_IP:暴露的宿主机地址
如上创建三个容器
注:修改容器名称与端口号
3、kafka集群监控使用KafkaOffsetMonitor-assembly-0.4.6.jar对kafka集群监控
1、在/opt/module/下创建kafka-offset-console文件夹
2、将上传的jar包放入刚创建的目录下
3、在/opt/module/kafka-offset-console目录下创建mobile-logs文件夹
4、在/opt/module/kafka-offset-console目录下创建启动脚本start.sh
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --kafkaBrokers 192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094 --kafkaSecurityProtocol PLAINTEXT --zk 192.168.48.131:2181,192.168.48.131:2182,192.168.48.131:2183 --port 8086 --refresh 10.seconds --retain 2.days --dbName offsetapp_kafka &
5、启动监控
./start.sh
6、在主机访问测试
致此集群搭建完成;
二、springboot整合 1、导入依赖2、配置文件org.springframework.kafka spring-kafka
server.port=8080 #============== kafka =================== spring.kafka.bootstrap-servers=192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094 #=============== provider ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.interceptor.class=com.example.demo.Interceptor.TimeInterceptor,com.example.demo.Interceptor.CounterInterceptor spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= spring.kafka.consumer.group-id=user-log-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer3、配置类
@Configuration public class KafkaConfigration { @Autowired private KafkaProperties properties; @Value("#{'${spring.kafka.producer.interceptor.class}'.split(',')}") private ArrayList4、Controller层interceptors; @Bean public ProducerFactory, ?> kafkaProducerFactory(ObjectProvider customizers) { Map map = this.properties.buildProducerProperties(); map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors); DefaultKafkaProducerFactory, ?> factory = new DefaultKafkaProducerFactory<>(map); String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } customizers.orderedStream().forEach((customizer) -> customizer.customize(factory)); return factory; } }
@Autowired private KafkaTemplate kafkaTemplate; //从前端接收消息,并调用生产者封装的api发送消息 @GetMapping("/sendMassage/{massage}") public String sendMassage(@PathVariable("massage") String massage){ kafkaTemplate.send("first", JSON.toJSONString(massage)); return "消息已发送"; }5、消费消息
@KafkaListener(topics = {"first"}) public String receMassage(ConsumerRecord,?> consumerRecord){ //判断是否为null Optional> kafkaMessage = Optional.ofNullable(consumerRecord.value()); if(kafkaMessage.isPresent()){ //得到Optional实例中的值 Object message = kafkaMessage.get(); System.err.println("消费消息:"+message); } return null; }6、拦截器
在拦截器中对消息进行处理
1、时间拦截器@Component public class TimeInterceptor implements ProducerInterceptor2、计数拦截器{ @Override public void configure(Map map) { } @Override public ProducerRecord onSend(ProducerRecord producerRecord) { return new ProducerRecord(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), new SimpleDateFormat("yyyy/MM/dd HH-mm-ss").format(System.currentTimeMillis()) + "," + producerRecord.value().toString()); } @Override public void onAcknowledgement(Recordmetadata recordmetadata, Exception e) { } @Override public void close() { } }
@Component public class CounterInterceptor implements ProducerInterceptor7、日志配置{ private int errorCounter = 0; private int successCounter = 0; @Override public ProducerRecord onSend(ProducerRecord producerRecord) { return producerRecord; } @Override public void onAcknowledgement(Recordmetadata recordmetadata, Exception e) { // 统计成功和失败的次数 if (e == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存结果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } @Override public void configure(Map map) { } }
8、测试logback %d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n true poslog/%d{yyyy-MM-dd}/%d{yyyy-MM-dd}.log %d{yyyy-MM-dd HH:mm:ss} -%msg%n %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)