ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。后来新增了一个Beats,它是一个轻量级的日志收集处理工具(Agent),Beats占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。
- Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
- Logstash是开源的数据收集引擎。它可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。
- Kibana是一个开源的分析与可视化平台,它可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。
- Beats是一个轻量级日志采集器,早期的ELK架构中使用Logstash收集、解析日志,但是Logstash对内存、cpu、io等资源消耗比较高。相比Logstash,Beats所占系统的CPU和内存几乎可以忽略不计。Beats集合有7个成员工具,其中Packetbeat是负责收集网络流量日志的。
Logstash:https://www.elastic.co/cn/downloads/logstash
Elasticsearch:https://www.elastic.co/cn/downloads/elasticsearch
Kibana:https://www.elastic.co/cn/downloads/kibana
Beats:https://www.elastic.co/cn/downloads/beats/
Beats-Packetbeat:https://www.elastic.co/cn/downloads/beats/packetbeat
- 系统:openEuler 20.03 LTS SP2
- 网卡1:ens33
- IP地址:172.25.53.160/24
- 网卡2:ens37
- IP地址:无 作为审计接口
# 创建目录 mkdir -p /opt/softs # 进入软件包目录 cd /opt/softs # 上传安装包或者下载安装包 wget https://artifacts.elastic.co/downloads/logstash/logstash-7.15.2-x86_64.rpm wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.15.2-x86_64.rpm wget https://artifacts.elastic.co/downloads/kibana/kibana-7.15.2-x86_64.rpm # 安装logstash rpm -ivh logstash-7.15.2-x86_64.rpm # 安装elasticsearch rpm -ivh elasticsearch-7.15.2-x86_64.rpm # 安装kibana rpm -ivh kibana-7.15.2-x86_64.rpm四、配置Elasticsearch
# 移动数据目录 mv /var/lib/elasticsearch /opt/ # 修改配置文件 vi /etc/elasticsearch/elasticsearch.yml
- elasticsearch.yml参考
# 数据目录 path.data: /opt/elasticsearch # 日志目录 path.logs: /var/log/elasticsearch # 集群名称 cluster.name: "cluster01" # 集群模式:单节点 discovery.type: "single-node" # 绑定IP为所有IP network.bind_host: 0.0.0.0 # 开启安全管理 xpack.security.enabled: true
# 启动elasticsearch服务 systemctl start elasticsearch.service # 设置密码 /usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
内置用户名:
- elastic 超级用户
- apm_system APM监控用户
- kibana_system Kibana用户
- logstash_system Logstash用户
- beats_system Beats用户
- remote_monitoring_user 远程监控用户
# 移动logstash目录 mv /var/lib/logstash /opt/ # 修改服务配置文件 vi /etc/logstash/logstash.yml
- logstash.yml参考
# 数据目录 path.data: /opt/logstash # 日志目录 path.logs: /var/log/logstash
# 创建日志配置文件 touch /etc/logstash/conf.d/syslog.conf # 编辑日志配置文件 vi /etc/logstash/conf.d/syslog.conf
- syslog.conf参考
# 日志输入 input { # 监听TCP UDP 1514端口接收syslog日志 syslog { type => syslog port => 1514 timezone => "Asia/Shanghai" } # 监听TCP UDP 2055端口接收netflow日志 syslog { type => netflow port => 2055 codec => netflow timezone => "Asia/Shanghai" } } # 日志处理过滤 filter { # 判断日志输入部分类型为syslog的日志 if [type] == "syslog"{ # grok过滤插件 主要用来提取字段内的内容生成新字段 具体参考下文grok插件章节 grok { match =>{ "message" =>".*source-ip=%{IPV4:src_ip}.*source-port=%{POSINT:src_port}.*destination-ip=%{IPV4:dst_ip}.*destination-port=%{POSINT:dst_port}.*time=(?
- grok插件
grok插件会根据内置正则或者自定义正则提取字段内的内容生成新字段。
官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
例子
原始数据:source-ip=192.168.0.1 source-port=12345
表达式:source-ip=%{IPV4:src_ip}s+source-port=%{POSINT:src_port}
会提取IP地址192.168.0.1放入src_ip字段
会提取12345端口放入src_port字段
其中IPV4和POSINT为预定义规则
- grok内置规则
参考:https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME} INT (?:[+-]?(?:[0-9]+)) base10NUM (?[+-]?(?:(?:[0-9]+(?:.[0-9]+)?)|(?:.[0-9]+))) NUMBER (?:%{base10NUM}) base16NUM (?(?"(?>\.|[^\"]+)+"|""|(?>'(?>\.|[^\']+)+')|''|(?>`(?>\.|[^\`]+)+`)|``)) UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12} # Networking MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC}) CISCOMAC (?:(?:[A-Fa-f0-9]{4}.){2}[A-Fa-f0-9]{4}) WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}) COMMonMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2}) IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:)))(%.+)? IPV4 (?/(?>[w_%!$@:.,-]+|\.)*)+ TTY (?:/dev/(pts|tty([pq])?)(w+)?/?(?:[0-9]+)) WINPATH (?>[A-Za-z]+:|\)(?:\[^\?*]*)+ URIPROTO [A-Za-z]+(+[A-Za-z+]+)? URIHOST %{IPORHOST}(?::%{POSINT:port})? # uripath comes loosely from RFC1738, but mostly from what Firefox # doesn't turn into %XX URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_-]*)+ #URIPARAM ?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)? URIPARAM ?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?-[]]* URIPATHPARAM %{URIPATH}(?:%{URIPARAM})? URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})? # Months: January, Feb, 3, 03, 12, December MonTH b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)b MonTHNUM (?:0?[1-9]|1[0-2]) MONTHNUM2 (?:0[1-9]|1[0-2]) MonTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) # Days: Monday, Tue, Thu, etc... DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?) # Years? YEAR (?>dd){1,2} HOUR (?:2[0123]|[01]?[0-9]) MINUTE (?:[0-5][0-9]) # '60' is a leap second in most time standards and thus is valid. SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?) TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) # datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it) DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR} DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR} ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE})) ISO8601_SECOND (?:%{SECOND}|60) TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? DATE %{DATE_US}|%{DATE_EU} DATESTAMP %{DATE}[- ]%{TIME} TZ (?:[PMCE][SD]T|UTC) DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ} DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE} DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR} DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND} # Syslog Dates: Month Day HH:MM:SS SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME} PROG (?:[w._/%-]+) SYSLOGPROG %{PROG:program}(?:[%{POSINT:pid}])? SYSLOGHOST %{IPORHOST} SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}> HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT} # Shortcuts QS %{QUOTEDSTRING} # Log formats SYSLOGbase %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: COMMonAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{data:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} # Log Levels LOGLEVEL ([Aa]lert|alert|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
- date插件
date插件主要用来提取字段内的日期时间转换成相应的格式存入某些字段,缺省存入@timestamp字段。
官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html - date内置规则
# 测试配置文件是否有错误 /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog.conf --config.test_and_exit # 临时关闭防火墙 systemctl stop firewalld # 取消/etc/logstash/conf.d/syslog.conf配置文件内输出部分的控制台输出注释 # 以配置文件热加载的形式临时启动 测试是否输入输出日志 /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog.conf --config.reload.automatic
- 使用telnet或其他调试工具发送tcp或udp包到1514端口测试是否正常输入输出
测试数据:
<123>Dec 8 2021 06:10:48 USG6600E:vsys=public, protocol=6, source-ip=172.16.1.2, source-port=63354, destination-ip=172.16.1.1, destination-port=80, time=2021/12/8 14:10:48.
# 成功解析后Ctrl+C关闭临时启动 # /etc/logstash/conf.d/syslog.conf配置文件内输出部分的控制台输出注释掉 # 启动logstash服务 systemctl start logstash.service六、配置Kibana
# 修改配置文件 vi /etc/kibana/kibana.yml
- kibana.yml参考
# 监听端口 server.port: 5601 # 绑定IP server.host: "0.0.0.0" # elasticsearch用户名 elasticsearch.username: "kibana_system" # elasticsearch密码 elasticsearch.password: "用户密码" # 开启中文 i18n.locale: "zh-CN" # 设置URL全称 server.publicbaseUrl: "http://172.25.53.160:5601"
# 启动服务 systemctl start kibana.service七、配置防火墙
# 开启防火墙 systemctl start firewalld # 放行端口 firewall-cmd --zone=public --add-port=1514/udp --permanent firewall-cmd --zone=public --add-port=1514/tcp --permanent firewall-cmd --zone=public --add-port=2055/udp --permanent firewall-cmd --zone=public --add-port=2055/tcp --permanent firewall-cmd --zone=public --add-port=9200/tcp --permanent firewall-cmd --zone=public --add-port=9300/tcp --permanent firewall-cmd --zone=public --add-port=5601/tcp --permanent # 重载防火墙 firewall-cmd --reload
- 如果需要从tcp 80端口访问kibana需要配置端口转发(1514转发到514同理)
- iptables防火墙端口转发
# tcp80端口转发到5601 iptables -t nat -A PREROUTING -i ens33 -p tcp --dport 80 -j REDIRECT --to-port 5601 # 保存配置 iptables-save八、登录Kibana
- 浏览器访问 http://172.25.53.160:5601 输入用户名elastic密码******登录
- 点击自己浏览
- 使用telnet或其他调试工具发送tcp或udp包到1514端口测试是否正常输入输出
测试数据:
<123>Dec 8 2021 06:10:48 USG6600E:vsys=public, protocol=6, source-ip=172.16.1.2, source-port=63354, destination-ip=172.16.1.1, destination-port=80, time=2021/12/8 14:10:48.
- 左侧菜单栏点击 Management - Stack Management
- 左侧二级菜单点击 数据 - 索引管理
- 可以看到已经自动创建索引syslog-xxxxxx
- 由于是单节点部署不满足副本分片1所以修改副本分片为0所以状态为黄色
- 点击 索引名 - 编辑 修改index.number_of_replicas的值为0后点击保存 状态恢复为绿色
- 点击左侧二级菜单 Kibana - 索引模式
- 创建索引模式
- 输入名称syslog-*匹配所有syslog日志
- 时间戳字段选择 @timestamp
- 点击创建索引模式
- 一级菜单Analytics - Discover可以查看日志数据
- 点击一级菜单 Observability - 日志
- 点击右上角的设置按钮
- 点击 使用Kibana索引模式
- 选择之前创建的索引模式
- 根据配置日志的列
- 应用设置
- 点击二级菜单 Logs - Stream可以查看、搜索日志
# 创建目录 mkdir -p /opt/softs # 进入软件包目录 cd /opt/softs # 上传安装包或者下载安装包 wget https://artifacts.elastic.co/downloads/beats/packetbeat/packetbeat-7.16.0-x86_64.rpm # 安装packetbeat rpm -ivh packetbeat-7.16.0-x86_64.rpm9.2 配置Packetbeat
# 修改配置文件 vi /etc/packetbeat/packetbeat.yml
- packetbeat.yml参考
…… # 配置审计网卡接口 可以是all packetbeat.interfaces.device: ens37 …… # 对接kibana # =================================== Kibana =================================== host: "localhost:5601" …… # 对接elasticsearch # ---------------------------- Elasticsearch Output ---------------------------- output.elasticsearch: # elasticsearch地址和端口 hosts: ["localhost:9200"] # 用户名 username: "elastic" # 密码 password: "用户密码" ……
# 启动packetbeat服务 systemctl start packetbeat.service # 切换至packetbeat目录 cd /usr/share/packetbeat/bin # 添加面板 packetbeat setup --dashboards
- 登录Kibana 一级菜单 Analytics - Dashboard 可以查看Packetbeat自带的面板
- [Packetbeat] Overview ECS面板
# 开机自动启动elasticsearch systemctl enable elasticsearch.service # 开机自动启动logstash systemctl enable logstash.service # 开机自动启动kibana systemctl enable kibana.service # 开机自动启动packetbeat systemctl enable packetbeat.service
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)