前面我在https://blog.csdn.net/yyj12138/article/details/124499741中有讲到如何来搭建ELK,这次来谈谈如何将springboot项目的日志推到ES管理。
我们知道一般来说稍微大一点的项目都不会只部署一台机器,那么分布式情况下对日志的管理就比较麻烦,没办法每次排查问题都去各服务器筛选一遍,那样太麻烦了。
logstash为我们提供了file(文件监控)、redis(从redis接收)、mq(从mq接收)、tcp/http(接收tcp或http协议的数据)、beats等等
其中文件监控和tcp/http接收不建议使用,性能不是很好,这里我介绍一下从rabbitMq接收日志。
1、springboot工程的配置1)引入依赖
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
2)logback-spring.xml配置
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<define name="ip" class="com.example.boot.config.IpPropertyDefiner"/>
<springProperty scope="context" name="env" source="spring.profiles.active"/>
<springProperty scope="context" name="appName" source="spring.application.name"/>
<property name="key" value="^@$^"/>
<property name="log.level" value="info"/>
<property name="logstash.pattern" value="${appName}${key}${env}${key}${ip}${key}%d{ISO8601}${key}%t${key}%p${key}%c${key}%m"/>
<appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern>${logstash.pattern}pattern>
layout>
<host>iphost>
<port>portport>
<username>adminusername>
<password>adminpassword>
<applicationId>dev-bootapplicationId>
<routingKeyPattern>logstash-rkproutingKeyPattern>
<declareExchange>truedeclareExchange>
<exchangeType>directexchangeType>
<exchangeName>logstash-exchangeexchangeName>
<generateId>truegenerateId>
<charset>UTF-8charset>
<durable>truedurable>
<deliveryMode>PERSISTENTdeliveryMode>
appender>
<appender name="ASYNC_ES_INFO" class="ch.qos.logback.classic.AsyncAppender">
<discardingThreshold>0discardingThreshold>
<queueSize>512queueSize>
<appender-ref ref="AMQP"/>
appender>
<springProfile name="test">
<root level="${log.level}">
<appender-ref ref="ASYNC_ES_INFO"/>
root>
springProfile>
configuration>
这里为了获取当前服务器的ip编写了一个类IpPropertyDefiner,为了在分布式环境下能定位到日志来自哪台机器
@Slf4j
public class IpPropertyDefiner extends PropertyDefinerBase {
@Override
public String getPropertyValue() {
InetAddress ia;
try {
ia = InetAddress.getLocalHost();
return ia.getHostAddress();
} catch (UnknownHostException e) {
log.error("日志属性ip获取出错");
}
return "";
}
}
想必你有注意到pattern的格式了吧,为什么是这种格式,后面会讲到
你也可以加上输出到文件的日志配置,做一个备份,关于输出到文件这里我就不具体描述了
2、logstash的配置input {
rabbitmq {
host => "ip"
port => port
user => admin
password => admin
durable => true
queue => "logstash-queue"
codec => plain
type => appLog
}
}
# 过滤、格式化数据
filter {
mutate{
# 日志格式使用特殊字符隔开,便于格式化
# 这里格式化为数组
split => ["message","^@$^"]
add_field => {
"appName" => "%{[message][0]}"
}
add_field => {
"env" => "%{[message][1]}"
}
add_field => {
"ip" => "%{[message][2]}"
}
add_field => {
"logTime" => "%{[message][3]}"
}
add_field => {
"thread" => "%{[message][4]}"
}
add_field => {
"level" => "%{[message][5]}"
}
add_field => {
"class" => "%{[message][6]}"
}
add_field => {
"msg" => "%{[message][7]}"
}
}
# 你还可以使用下面的日期格式化将日志的时间覆盖es的@timestamp
#date {
# match => ["logTime", "ISO8601"]
# target => "@timestamp"
#}
# 移除你不想推给es的字段
mutate {
remove_field => ["@version", "message"]
}
}
# es推送配置
output {
# 判断如果是不规范的数据,同一推送到other索引下
if [type] == "appLog" and [appName] != "" and [env] != ""{
elasticsearch {
hosts => "10.1.59.128:9200"
index => "%{env}_%{appName}_log_%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => "10.1.59.128:9200"
index => "other_log_%{+YYYY.MM.dd}"
}
}
}
filter对日志使用mutate进行格式化,使用特殊字符分割并set到对应的字段中,特殊字符对应logback中的配置,output中的判断[type] == “appLog” and [appName] != “” and [env] != “”,type对应input中的type,防止从其他地方输入的内容污染日志,后面两个主要是区分项目和环境,此时配置已经支持多项目多环境建立不同的索引了。
对日志内容的格式化有很多种方式,比如还可以使用ruby脚本或者gork匹配,logback的日志输出格式还可以配置成json格式,但是我测试发现日志有特殊字符时,会打乱json格式,导致logstash处理json出错。
如果你也想使用ruby脚本,可参考:
ruby {
code => "
message = event.get('message')
jsonMsg = JSON.parse(message)
event.set('appName', jsonMsg['appName'])
event.set('ip', jsonMsg['ip'])
event.set('thread', jsonMsg['thread'])
event.set('level', jsonMsg['level'])
event.set('logger', jsonMsg['logger'])
event.set('trace', jsonMsg['trace'])
event.set('msg', jsonMsg['msg'])
event.set('timeTmp', jsonMsg['logTime'])
"
}
3、启动与检查
此时启动logstash和springboot项目,会发现mq队列和交换机已经创建好了,有可能交换机和队列没有关联,只需要手动关联一次即可。关联完成以后可以请求一次然后去kibana可视化查看日志。如何查看和创建索引参考https://blog.csdn.net/yyj12138/article/details/124499741
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)