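Create Kafka topics that share a common name prefix. Logstash can then subscribe to all of them with a single regular-expression match, and use if conditionals to apply different filters and write to different indices, as shown below.
Filebeat configuration: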
filebeat.inputs:
# "type" replaces the deprecated "input_type" key.
# Custom fields are kept under "fields" (fields_under_root is not enabled)
# and the key is named "service", because the Logstash config tests [fields][service].
- type: log
  paths:
    - /alipay-health.log
  fields:
    host.name: 192.168.10.10
    service: java
  tags: ["test"]
- type: log
  paths:
    - /nginx.log
  fields:
    host.name: 192.168.10.10
    service: nginx
  tags: ["test"]
- type: log
  paths:
    - /error.log
  fields:
    host.name: 192.168.10.10
    service: nginxerror
  tags: ["test"]
#----------------------------- kafka output --------------------------------
output.kafka:
  enabled: true
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]
  topic: "elk-filebeat"
apm-server configuration: find the Kafka output section in the config file
#------------------------------ Kafka output ------------------------------
output.kafka:
  # Boolean flag to enable or disable the output module.
  enabled: true
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]
  topic: "elk-apm"
Next, Logstash consumes the events from Kafka and sends them to Elasticsearch. The configuration is as follows:
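input {
  kafka {
    codec => "json"
    # bootstrap_servers takes one comma-separated string, not several quoted strings
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    # subscribe to every topic whose name matches this regex (elk-filebeat, elk-apm)
    topics_pattern => "elk-.*"
    consumer_threads => 12
    group_id => "logstash123"
  }
}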
filter {
  # APM events: build the target index name in @metadata
  if [@metadata][beat] == "apm" {
    if [processor][event] == "sourcemap" {
      mutate {
        add_field => { "[@metadata][index]" => "%{[@metadata][beat]}-%{[@metadata][version]}-%{[processor][event]}" }
      }
    } else {
      mutate {
        add_field => { "[@metadata][index]" => "%{[@metadata][beat]}-%{[@metadata][version]}-%{[processor][event]}-%{+yyyy.MM.dd}" }
      }
    }
  }
  if [fields][service] == "java" {
    mutate {
      remove_field => ["@version", "agent", "ecs", "log", "tags", "host"]
    }
    grok {
      # Custom grok patterns are read from /opt/patterns
      # (see /opt/patterns/java, listed at the end of this post)
      patterns_dir => [ "/opt/patterns" ]
      match => { "message" => "%{LEVEL:level1} %{JAVALOGMESSAGE:doc}" }
    }
    date {
      # Note: the grok above does not capture a "timestamp" field; only NGINXACC does
      match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
      remove_field => "timestamp"
      # remove_field => "message"
    }
  }
  if [fields][service] == "nginx" {
    grok {
      # Custom grok patterns, same approach as above
      # (see /opt/patter/nginx, listed at the end of this post)
      patterns_dir => [ "/opt/patter" ]
      match => { "message" => "%{NGINXACC}" }
    }
    mutate {
      # hash form, so both fields are converted by a single setting
      convert => {
        "elapsed" => "float"
        "serverelapsed" => "float"
      }
    }
  }
}
output {
  if [fields][service] == "java" {
    elasticsearch {
      hosts => ["es1:9200/"]
      index => "javafilebeat-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123"
    }
  }
  if [fields][service] == "nginx" {
    elasticsearch {
      hosts => ["es1:9200/"]
      index => "nginxfilebeat-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123"
    }
  }
  if [fields][service] == "nginxerror" {
    elasticsearch {
      hosts => ["es1:9200/"]
      index => "ngerror-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123"
    }
  }
  if [@metadata][beat] == "apm" {
    elasticsearch {
      hosts => ["es1:9200/"]
      # index name computed in the filter section
      index => "%{[@metadata][index]}"
      user => "elastic"
      password => "123"
    }
  }
}
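The routing above can be sketched in Python to make the branching easier to follow. This is only an illustration, not part of the pipeline: the function names, the sample topic list, and the version string "7.6.0" are assumptions, and the date is passed in explicitly instead of being taken from the event's @timestamp as Logstash does.

```python
import re
from datetime import date

# Same expression as topics_pattern in the kafka input.
TOPICS_PATTERN = re.compile(r"elk-.*")


def subscribed(topics):
    """Return the topics a pattern subscription would pick up (whole-name match)."""
    return [t for t in topics if TOPICS_PATTERN.fullmatch(t)]


def target_index(event, day):
    """Mirror the filter/output conditionals; return the index name, or None if no branch matches."""
    meta = event.get("@metadata", {})
    if meta.get("beat") == "apm":
        kind = event["processor"]["event"]
        base = f"{meta['beat']}-{meta['version']}-{kind}"
        # sourcemap events go to an undated index, everything else to a daily one
        return base if kind == "sourcemap" else f"{base}-{day:%Y.%m.%d}"
    prefixes = {"java": "javafilebeat", "nginx": "nginxfilebeat", "nginxerror": "ngerror"}
    service = event.get("fields", {}).get("service")
    if service in prefixes:
        return f"{prefixes[service]}-{day:%Y.%m.%d}"
    return None


if __name__ == "__main__":
    print(subscribed(["elk-filebeat", "elk-apm", "other"]))
    print(target_index({"fields": {"service": "nginx"}}, date(2021, 3, 5)))
```

Events that match none of the branches return None here, which corresponds to Logstash simply not sending them to any of the elasticsearch outputs.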
Create /opt/patterns/java. The Java logs are written by log4j, and only the log level is matched:
LEVEL (INFO)|(DEBUG)|(ERROR)|(WARN)|(FATAL)
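As a rough illustration of what the java grok match extracts, the expression "%{LEVEL:level1} %{JAVALOGMESSAGE:doc}" can be approximated with a Python regular expression. This is a sketch under assumptions: JAVALOGMESSAGE is treated as "the rest of the line", and the sample log line and the name parse_java are made up for the example.

```python
import re

# LEVEL followed by a space, then the remainder of the line as the message.
JAVA_LINE = re.compile(r"(?P<level1>INFO|DEBUG|ERROR|WARN|FATAL) (?P<doc>.*)")


def parse_java(line):
    """Return {'level1': ..., 'doc': ...}, or None when no level is found."""
    m = JAVA_LINE.search(line)  # grok is unanchored, so search() rather than match()
    return m.groupdict() if m else None


if __name__ == "__main__":
    # Hypothetical log4j line
    print(parse_java("2021-03-05 10:15:30 [main] INFO com.example.App - started"))
```

Everything before the level (date, thread name) is ignored by the match and stays only in the original message field.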
Create /opt/patter/nginx. nginx must be configured with the matching log format (given further down):
WZ ([^ ]*)
URIPARAM [A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
NGINXACC %{IP:remote_ip} \- \- \[%{HTTPDATE:timestamp}\] "%{WORD:method} (%{URIPATH:request}|-|) (%{URIPARAM:requestParam}|-)" %{NUMBER:status} %{NUMBER:bytes} %{QS:referer} %{QS:agent} %{NUMBER:elapsed} %{NUMBER:serverelapsed} %{QS:xforward}
nginx log format:
log_format logstash '$remote_addr - $remote_user [$time_local] "$request_method $uri $query_string" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_time $upstream_response_time "$http_x_forwarded_for" ';
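# Copy the format as-is without adjusting it. The grok pattern can be debugged with the Grok Debugger in Kibana's Dev Tools.
Our production environment has to ship logs to several different systems, so we define two log formats. The full configuration is shown for reference only: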
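For a quick local check outside Kibana, the NGINXACC pattern can be approximated with a plain Python regular expression. This is a simplified stand-in, not a grok engine: the field names follow the pattern above, but the sub-patterns are looser than the real %{IP}, %{HTTPDATE} and %{URIPATH}, and the sample line and the parsed_time key are assumptions made for the example.

```python
import re
from datetime import datetime

# Simplified stand-in for NGINXACC; group names match the grok field names.
NGINXACC = re.compile(
    r'(?P<remote_ip>\S+) - - \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\w+) (?P<request>/[^\s"]*|-|) (?P<requestParam>[^\s"]*)" '
    r'(?P<status>\d+) (?P<bytes>\d+) '
    r'(?P<referer>"[^"]*") (?P<agent>"[^"]*") '
    r'(?P<elapsed>\d+(?:\.\d+)?) (?P<serverelapsed>\d+(?:\.\d+)?) '
    r'(?P<xforward>"[^"]*")'
)

# Hypothetical line written with the "logstash" log_format.
SAMPLE = ('192.168.10.20 - - [05/Mar/2021:10:15:30 +0800] '
          '"GET /api/health a=1&b=2" 200 512 "-" "curl/7.61.1" '
          '0.005 0.004 "-" ')


def parse_nginx(line):
    """Return the extracted fields as a dict, or None when the line does not match."""
    m = NGINXACC.search(line)  # unanchored, like grok
    if m is None:
        return None
    event = m.groupdict()
    # Same effect as the mutate/convert step: numeric timings instead of strings.
    event["elapsed"] = float(event["elapsed"])
    event["serverelapsed"] = float(event["serverelapsed"])
    # Parse the access-log timestamp (day/month-name/year:time zone-offset).
    event["parsed_time"] = datetime.strptime(event["timestamp"], "%d/%b/%Y:%H:%M:%S %z")
    return event


if __name__ == "__main__":
    print(parse_nginx(SAMPLE))
```

If a line fails to parse here, it is worth checking the nginx log_format first, since the pattern depends on the exact field order and quoting.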
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" ""$server_addr"" '
'"$request_length" "$request_time" ';
log_format logstash '$remote_addr - $remote_user [$time_local] "$request_method $uri $query_string" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_time $upstream_response_time "$http_x_forwarded_for" ';
access_log /var/log/nginx/access.log main;
access_log /var/log/nginx/nginx.log logstash;
P.S.
After migrating the entire ELK environment once, setting up the APM module in Kibana failed with an error.
The error was:
search_phase_execution_exception: [illegal_argument_exception] Reason: Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. Please use a keyword field instead. Alternatively, set fielddata=true on [service.environment] in order to load field data by uninverting the inverted index. Note that this can use significant memory. (500)
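Solution:
Send data once directly to Elasticsearch, without going through Kafka, and then switch back to Kafka; after that the problem goes away. A likely explanation (an assumption, not verified here) is that apm-server only loads its index template when it is connected to Elasticsearch directly, so the first direct run creates the keyword mapping for service.environment.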
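To see whether an index is affected, one could look at how service.environment is mapped. The helper below is a minimal sketch that walks a dict shaped like an Elasticsearch 7 mapping response; the function name, the index name, and both sample responses are hypothetical and only illustrate the difference between a dynamically created text field and a keyword field.

```python
def field_type(mapping_response, index, dotted_field):
    """Walk an Elasticsearch 7-style mapping response and return the field's type, or None."""
    node = mapping_response[index]["mappings"]
    for part in dotted_field.split("."):
        node = node.get("properties", {}).get(part)
        if node is None:
            return None
    return node.get("type")


# Hypothetical index name and responses, for illustration only.
INDEX = "apm-7.6.0-transaction-2021.03.05"

# What dynamic mapping typically produces for a string: text plus a keyword sub-field.
DYNAMIC = {INDEX: {"mappings": {"properties": {"service": {"properties": {
    "environment": {"type": "text",
                    "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}}}}}}}

# What a keyword mapping looks like; aggregations work on this.
TEMPLATED = {INDEX: {"mappings": {"properties": {"service": {"properties": {
    "environment": {"type": "keyword", "ignore_above": 1024}}}}}}}

if __name__ == "__main__":
    print(field_type(DYNAMIC, INDEX, "service.environment"))
    print(field_type(TEMPLATED, INDEX, "service.environment"))
```

A result of text for service.environment matches the situation described by the error message, since aggregations on text fields are disabled by default.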