多个springboot项目 logstash推送日志到ES

多个springboot项目 logstash推送日志到ES,第1张

多个springboot项目 logstash推送日志到ES

前面我在https://blog.csdn.net/yyj12138/article/details/124499741中有讲到如何来搭建ELK,这次来谈谈如何将springboot项目的日志推到ES管理。

我们知道一般来说稍微大一点的项目都不会只部署一台机器,那么分布式情况下对日志的管理就比较麻烦,没办法每次排查问题都去各服务器筛选一遍,那样太麻烦了。

logstash为我们提供了file(文件监控)、redis(从redis接收)、mq(从mq接收)、tcp/http(接收tcp或http协议的数据)、beats等等

其中文件监控和tcp/http接收不建议使用,性能不是很好,这里我介绍一下从rabbitMq接收日志。

1、springboot工程的配置

1)引入依赖

<dependency>
    <groupId>org.springframework.bootgroupId>
    <artifactId>spring-boot-starter-amqpartifactId>
dependency>

2)logback-spring.xml配置


<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <define name="ip" class="com.example.boot.config.IpPropertyDefiner"/>
    <springProperty scope="context" name="env" source="spring.profiles.active"/>
    <springProperty scope="context" name="appName" source="spring.application.name"/>
    
    <property name="key" value="^@$^"/>
    <property name="log.level" value="info"/>
    <property name="logstash.pattern" value="${appName}${key}${env}${key}${ip}${key}%d{ISO8601}${key}%t${key}%p${key}%c${key}%m"/>
    
    <appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
        <layout>
            <pattern>${logstash.pattern}pattern>
        layout>
        <host>iphost>
        <port>portport>
        <username>adminusername>
        <password>adminpassword>
        <applicationId>dev-bootapplicationId>
        <routingKeyPattern>logstash-rkproutingKeyPattern>
        <declareExchange>truedeclareExchange>
        <exchangeType>directexchangeType>
        <exchangeName>logstash-exchangeexchangeName>
        <generateId>truegenerateId>
        <charset>UTF-8charset>
        <durable>truedurable>
        <deliveryMode>PERSISTENTdeliveryMode>
    appender>
    
    <appender name="ASYNC_ES_INFO" class="ch.qos.logback.classic.AsyncAppender">
        
        <discardingThreshold>0discardingThreshold>
        
        <queueSize>512queueSize>
        
        <appender-ref ref="AMQP"/>
    appender>
    
    <springProfile name="test">
        <root level="${log.level}">
            <appender-ref ref="ASYNC_ES_INFO"/>
        root>
    springProfile>
configuration>

这里为了获取当前服务器的ip编写了一个类IpPropertyDefiner,为了在分布式环境下能定位到日志来自哪台机器

@Slf4j
public class IpPropertyDefiner extends PropertyDefinerBase {
    @Override
    public String getPropertyValue() {
        InetAddress ia;
        try {
            ia = InetAddress.getLocalHost();
            return ia.getHostAddress();
        } catch (UnknownHostException e) {
            log.error("日志属性ip获取出错");
        }
        return "";
    }
}

想必你有注意到pattern的格式了吧,为什么是这种格式,后面会讲到

你也可以加上输出到文件的日志配置,做一个备份,关于输出到文件这里我就不具体描述了

2、logstash的配置
input {
    rabbitmq {
        host => "ip"
        port => port
        user => admin
        password => admin
        durable => true
        queue => "logstash-queue"
        codec => plain
        type => appLog
    }
} 

# 过滤、格式化数据
filter {
    mutate{
    # 日志格式使用特殊字符隔开,便于格式化
    # 这里格式化为数组
    split => ["message","^@$^"]
        add_field =>   {
            "appName" => "%{[message][0]}"
        }
        add_field =>   {
            "env" => "%{[message][1]}"
        }
        add_field =>   {
            "ip" => "%{[message][2]}"
        }
        add_field =>   {
            "logTime" => "%{[message][3]}"
        }
        add_field =>   {
            "thread" => "%{[message][4]}"
        }
        add_field =>   {
            "level" => "%{[message][5]}"
        }
        add_field =>   {
            "class" => "%{[message][6]}"
        }
        add_field =>   {
            "msg" => "%{[message][7]}"
        }
    }
    
    # 你还可以使用下面的日期格式化将日志的时间覆盖es的@timestamp
    #date {
    #    match => ["logTime", "ISO8601"]
    #    target => "@timestamp"
    #}
    
    # 移除你不想推给es的字段
    mutate  {
        remove_field => ["@version", "message"]
    }
}   

# es推送配置
output {
	# 判断如果是不规范的数据,同一推送到other索引下
    if [type] == "appLog" and [appName] != "" and [env] != ""{
        elasticsearch {
            hosts => "10.1.59.128:9200"
            index => "%{env}_%{appName}_log_%{+YYYY.MM.dd}"
        }
    } else {
        elasticsearch {
            hosts => "10.1.59.128:9200"
            index => "other_log_%{+YYYY.MM.dd}"
        }
    }
}

filter对日志使用mutate进行格式化,使用特殊字符分割并set到对应的字段中,特殊字符对应logback中的配置,output中的判断[type] == “appLog” and [appName] != “” and [env] != “”,type对应input中的type,防止从其他地方输入的内容污染日志,后面两个主要是区分项目和环境,此时配置已经支持多项目多环境建立不同的索引了。

对日志内容的格式化有很多种方式,比如还可以使用ruby脚本或者gork匹配,logback的日志输出格式还可以配置成json格式,但是我测试发现日志有特殊字符时,会打乱json格式,导致logstash处理json出错。

如果你也想使用ruby脚本,可参考:

ruby {
        code => "
            message = event.get('message')
            jsonMsg = JSON.parse(message)
            event.set('appName', jsonMsg['appName'])
            event.set('ip', jsonMsg['ip'])
            event.set('thread', jsonMsg['thread'])
            event.set('level', jsonMsg['level'])
            event.set('logger', jsonMsg['logger'])
            event.set('trace', jsonMsg['trace'])
            event.set('msg', jsonMsg['msg'])
            event.set('timeTmp', jsonMsg['logTime'])
        "
    }
3、启动与检查

此时启动logstash和springboot项目,会发现mq队列和交换机已经创建好了,有可能交换机和队列没有关联,只需要手动关联一次即可。关联完成以后可以请求一次然后去kibana可视化查看日志。如何查看和创建索引参考https://blog.csdn.net/yyj12138/article/details/124499741

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/904662.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-15
下一篇 2022-05-15

发表评论

登录后才能评论

评论列表(0条)

保存