Sending Elastic APM and Filebeat data to Logstash through Kafka at the same time


Create Kafka topics whose names share the same prefix. Logstash can then consume them all with a regular-expression topics_pattern and use if conditions to apply different filters and write to different indices, as shown below.
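For example, the two topics could be created up front with Kafka's bundled CLI (a minimal sketch: the partition and replication counts are placeholders, and on Kafka versions before 2.2 you would pass --zookeeper instead of --bootstrap-server):

kafka-topics.sh --create --bootstrap-server kafka1:9092 --topic elk-filebeat --partitions 3 --replication-factor 2
kafka-topics.sh --create --bootstrap-server kafka1:9092 --topic elk-apm --partitions 3 --replication-factor 2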
Filebeat configuration:

filebeat.inputs:
- type: log
  paths:
    - /alipay-health.log
  # custom fields stay nested under "fields." (fields_under_root is left at its
  # default of false) so that Logstash can test [fields][service] later on
  fields:
    host.name: 192.168.10.10
    service: java
    tags: test
- type: log
  paths:
    - /nginx.log
  fields:
    host.name: 192.168.10.10
    service: nginx
    tags: test
- type: log
  paths:
    - /error.log
  fields:
    host.name: 192.168.10.10
    service: nginxerror
    tags: test
#----------------------------- kafka output --------------------------------
output.kafka:
  enabled: true
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]
  topic: "elk-filebeat"

apm-server configuration (find the Kafka output section of apm-server.yml):

#------------------------------ Kafka output ------------------------------
output.kafka:
  # Boolean flag to enable or disable the output module.
  enabled: true
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]
  topic: "elk-apm"

Next, Logstash consumes from Kafka and writes to Elasticsearch. The configuration is as follows:

input {
  kafka {
    codec => "json"
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics_pattern => "elk-.*"
    consumer_threads => 12
    group_id => "logstash123"
  }
}
filter {
  if [@metadata][beat] == "apm" {
    if [processor][event] == "sourcemap" {
      mutate {
        add_field => { "[@metadata][index]" => "%{[@metadata][beat]}-%{[@metadata][version]}-%{[processor][event]}" }
      }
    } else {
      mutate {
        add_field => { "[@metadata][index]" => "%{[@metadata][beat]}-%{[@metadata][version]}-%{[processor][event]}-%{+yyyy.MM.dd}" }
      }
    }
  }
  if [fields][service] == "java" {
    mutate {
      remove_field => ["@version", "agent", "ecs", "log", "tags", "host"]
    }
    grok {
      patterns_dir => [ "/opt/patterns" ]
      match => { "message" => "%{LEVEL:level1} %{JAVALOGMESSAGE:doc}" }
    }
    # Create the grok pattern files under /opt/patterns (one file per pattern set,
    # e.g. a file named nginx); their contents are listed at the end of this post.
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      remove_field => "timestamp"
      # remove_field => "message"
    }
  }
  if [fields][service] == "nginx" {
    grok {
      patterns_dir => [ "/opt/patterns" ]
      match => { "message" => "%{NGINXACC}" }
    }
    # The grok pattern file is created the same way as above.
    mutate {
      convert => [ "elapsed", "float" ]
      convert => [ "serverelapsed", "float" ]
    }
  }
}

output {
  if [fields][service] == "java" {
    elasticsearch {
      hosts => ["es1:9200"]
      index => "javafilebeat-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123"
    }
  }
  if [fields][service] == "nginx" {
    elasticsearch {
      hosts => ["es1:9200"]
      index => "nginxfilebeat-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123"
    }
  }
  if [fields][service] == "nginxerror" {
    elasticsearch {
      hosts => ["es1:9200"]
      index => "ngerror-%{+YYYY.MM.dd}"
      user => "elastic"
      password => "123"
    }
  }
  if [@metadata][beat] == "apm" {
    elasticsearch {
      hosts => ["es1:9200"]
      index => "%{[@metadata][index]}"
      user => "elastic"
      password => "123"
    }
  }
}
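Before (re)starting Logstash, the pipeline syntax can be verified with the built-in config test flag (both paths below are just examples):

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka-to-es.conf --config.test_and_exit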

Create /opt/patterns/java. The Java logs use log4j, and this pattern only matches the log level:

LEVEL (INFO|DEBUG|ERROR|WARN|FATAL)
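For example, a made-up log line that the %{LEVEL:level1} %{JAVALOGMESSAGE:doc} match above would split into level1=INFO and doc=the rest:

INFO com.example.OrderService - order 1001 created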

Create /opt/patterns/nginx. Make sure nginx is configured with the matching log format shown below:

WZ ([^ ]*)
URIPARAM [A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*
NGINXACC %{IP:remote_ip} \- \- \[%{HTTPDATE:timestamp}\] "%{WORD:method} (%{URIPATH:request}|-|) (%{URIPARAM:requestParam}|-)" %{NUMBER:status} %{NUMBER:bytes} %{QS:referer} %{QS:agent} %{NUMBER:elapsed} %{NUMBER:serverelapsed} %{QS:xforward}

Nginx log format:

log_format  logstash  '$remote_addr - $remote_user [$time_local] "$request_method $uri $query_string" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_time $upstream_response_time "$http_x_forwarded_for" ';
#Copy this line as-is without re-wrapping it; you can debug the grok pattern with the Grok Debugger in Kibana Dev Tools
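For reference, here is a made-up access-log line in the logstash format above, handy as sample data when debugging the NGINXACC pattern:

192.168.10.10 - - [26/Apr/2022:10:00:01 +0800] "GET /index.html foo=bar" 200 612 "-" "curl/7.61.1" 0.003 0.002 "-"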
Because our production environment ships logs to several different systems, we keep two log formats; the full nginx snippet below is just for reference.

log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                  '$status $body_bytes_sent "$http_referer" '
                  '"$http_user_agent" ""$server_addr"" '
                  '"$request_length" "$request_time" ';

log_format  logstash  '$remote_addr - $remote_user [$time_local] "$request_method $uri $query_string" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_time $upstream_response_time "$http_x_forwarded_for" ';

access_log  /var/log/nginx/access.log  main;
access_log  /var/log/nginx/nginx.log logstash;

PS:
After migrating the whole ELK environment once, configuring the APM module in Kibana produced the following error:
search_phase_execution_exception: [illegal_argument_exception] Reason: Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. Please use a keyword field instead. Alternatively, set fielddata=true on [service.environment] in order to load field data by uninverting the inverted index. Note that this can use significant memory. (500)
Workaround:
Send the data directly to Elasticsearch once, bypassing Kafka; after that, switch back to sending through Kafka and the error no longer appears.
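The message indicates that service.environment was dynamically mapped as text instead of keyword, which typically happens when the APM indices are created before their template has been loaded. A minimal way to check the mapping (assuming the apm-* index naming used above and the elastic user from the Logstash output):

curl -u elastic 'http://es1:9200/apm-*/_mapping/field/service.environment?pretty'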
