springboot使用kafka发送日志到elk

springboot使用kafka发送日志到elk,第1张

springboot使用kafka发送日志到elk

单机单节点 篇

win10环境(jdk1.8)

一、环境搭建

1,安装elasticsearch

2,安装kibana

3,安装logstash

4,安装kafka

二、项目搭建并运行验证

1,搭建项目并运行验证

三、安装过程记录

1,简单说明

一.1-3 为elk通用方案的内容,使用任何语言项目都可参考

一.4 为使用kafka作为消息队列异步收集项目日志并传递到logstash,有使用该方案的可参考

二.1 为springboot使用log4j将日志传递到kafka的实例

2,安装步骤

2.1 elk官网下载三个应用的安装包(elasticsearch,kibana,logstash)

https://www.elastic.co/cn/

elasticsearch 7.16.3:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.16.3-windows-x86_64.zip

kibana 7.16.3:

https://artifacts.elastic.co/downloads/kibana/kibana-7.16.3-windows-x86_64.zip

logstash 7.16.3:

https://artifacts.elastic.co/downloads/logstash/logstash-7.16.3-windows-x86_64.zip

2.2 部署运行elk

以 D:elk 路径为elk环境的路径

路径下新建三个文件夹 elasticsearch        kinaba        logstash

将下载的三个包分别解压到对用的路径下

2.2.1 启动配置elasticsearch

启动

        终端或者powershell中执行 D:elkelasticsearchbinelasticsearch.bat

配置 最简配 不做配置

2.2.2 启动配置kibana

编辑

        kibana/conf/kibana.yml

        elasticsearch.hosts: "http://localhost:9200"

启动

        终端或者powershell中执行 D:elkkibanabinkibana.bat

2.2.3 启动配置logstash

编辑

        D:elklogstashconfiglogstash.conf

        

input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    group_id => "log_topic"
    client_id => "logstash_01"
    auto_offset_reset => "latest"
    topics => ["kafka_log_topic"]	
    add_field => {"logs_type" => "springboot"}		
    codec => json { charset => "UTF-8" }
  }
}

output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "springboot"
    #user => "elastic"
    #password => "changeme"
  }
}

启动

        D:        cd elklogstash

        

.binlogstash.bat -f .configlog4j2.properties

验证:

访问 localhost:5601 即可进入kibana页面

3. kafka安装启动

        kafka官网 https://kafka.apache.org/

        下载 https://dlcdn.apache.org/kafka/2.8.1/kafka_2.12-2.8.1.tgz

        以路径D:kafka为例

        解压在该路径下

编辑

        configzookeeper.properties

       

dataDir=D:kafkadata

        

       configserver.properties

log.dirs=D:kafkalogka
zookeeper.connect=localhost:2181

        启动

        进入kafka路径下 分别用两个终端或者powershell执行下列命令启动zk和kafka

.binwindowszookeeper-server-start.bat .configzookeeper.properties
.binwindowskafka-server-start.bat .configserver.properties

        创建log信息的topic(路径不变)

.binwindowskafka-topics.bat --zookeeper localhost:2181 --create --topic kafka_log_topic  --partitions 1 --replication-factor 1

看到控制台输出:Created topic kafka_log_topic_2. 则成功

四、建立项目并启动验证

4.1 创建springboot项目

添加依赖:


            org.springframework.boot
            spring-boot-starter-web
            2.5.6
        
        
            org.springframework.kafka
            spring-kafka
        
        
            com.github.danielwegener
            logback-kafka-appender
            0.2.0-RC2
        
        
            org.springframework.boot
            spring-boot-starter-log4j2
        

controll

er

@RestController
@RequestMapping(value = "/log")
public class LogController {

    Logger logger = LoggerFactory.getLogger(LogController.class);
    private int num = 1;

    @RequestMapping(value = "/log", method = RequestMethod.GET)
    public String log() throws InterruptedException {
        while (true) {
            Thread.sleep(100);
            logger.info("just a log");
            if (num++ % 100 == 0) {
                break;
            }
        }
        return "ok";
    }

}

添加kafka配置

application.properties

logging.config=classpath:log4j.xml
# kafka服务器地址,多个集群用逗号分隔
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=default_consumer_group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

log4j.xml




    
        
        
        log
        kafka_log_topic
        localhost:9092
    

    
        
        
            
            
            
            
        
        
            
        
        
            
            ${bootstrap_servers}
            
            
        
    

    
        
            
            
            
        
         
        
    

启动项目

浏览器访问 localhost:8080/log/log 记录日志消息

4.2 验证

打开 http://localhost:5601/app/management​​​​​​

点击左侧栏 Kibana->Index Patterns 创建检索的索引

可以看到右边有springboot索引候选

输入springboot*  Timestamp field随便选一个 创建即完成

访问 ​​​​​​http://localhost:5601/app/discover#/​​​​​​

选择 刚建立的索引即可看到springboot项目的日志可以在这里检索。

五、elk kafka涉及知识点

待补充

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705579.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存