使用ELK保存Syslog、Netflow日志和审计网络接口流量

使用ELK保存Syslog、Netflow日志和审计网络接口流量,第1张

使用ELK保存Syslog、Netflow日志和审计网络接口流量 简介

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。后来新增了一个Beats,它是一个轻量级的日志收集处理工具(Agent),Beats占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。

  • Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
  • Logstash是开源的数据收集引擎。它可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。
  • Kibana是一个开源的分析与可视化平台,它可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。
  • Beats是一个轻量级日志采集器,早期的ELK架构中使用Logstash收集、解析日志,但是Logstash对内存、cpu、io等资源消耗比较高。相比Logstash,Beats所占系统的CPU和内存几乎可以忽略不计。Beats集合有7个成员工具,其中Packetbeat是负责收集网络流量日志的。
一、下载

Logstash:https://www.elastic.co/cn/downloads/logstash
Elasticsearch:https://www.elastic.co/cn/downloads/elasticsearch
Kibana:https://www.elastic.co/cn/downloads/kibana
Beats:https://www.elastic.co/cn/downloads/beats/
Beats-Packetbeat:https://www.elastic.co/cn/downloads/beats/packetbeat

二、环境
  • 系统:openEuler 20.03 LTS SP2
  • 网卡1:ens33
  • IP地址:172.25.53.160/24
  • 网卡2:ens37
  • IP地址:无 作为审计接口
三、 安装Logstash、Elasticsearch、Kibana
# 创建目录
mkdir -p /opt/softs
# 进入软件包目录
cd /opt/softs
# 上传安装包或者下载安装包
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.15.2-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.15.2-x86_64.rpm
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.15.2-x86_64.rpm
# 安装logstash
rpm -ivh logstash-7.15.2-x86_64.rpm
# 安装elasticsearch
rpm -ivh elasticsearch-7.15.2-x86_64.rpm
# 安装kibana
rpm -ivh kibana-7.15.2-x86_64.rpm
四、配置Elasticsearch
# 移动数据目录
mv /var/lib/elasticsearch /opt/
# 修改配置文件
vi /etc/elasticsearch/elasticsearch.yml
  • elasticsearch.yml参考
# 数据目录
path.data: /opt/elasticsearch
# 日志目录
path.logs: /var/log/elasticsearch
# 集群名称
cluster.name: "cluster01"
# 集群模式:单节点
discovery.type:  "single-node"
# 绑定IP为所有IP
network.bind_host: 0.0.0.0
# 开启安全管理
xpack.security.enabled: true
# 启动elasticsearch服务
systemctl start elasticsearch.service
# 设置密码
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive

内置用户名:

  • elastic 超级用户
  • apm_system APM监控用户
  • kibana_system Kibana用户
  • logstash_system Logstash用户
  • beats_system Beats用户
  • remote_monitoring_user 远程监控用户
五、配置Logstash
# 移动logstash目录
mv /var/lib/logstash /opt/
# 修改服务配置文件
vi /etc/logstash/logstash.yml
  • logstash.yml参考
# 数据目录
path.data: /opt/logstash
# 日志目录
path.logs: /var/log/logstash
# 创建日志配置文件
touch /etc/logstash/conf.d/syslog.conf
# 编辑日志配置文件
vi /etc/logstash/conf.d/syslog.conf
  • syslog.conf参考
# 日志输入
input {
  # 监听TCP UDP 1514端口接收syslog日志
  syslog  {
    type => syslog
    port => 1514
    timezone => "Asia/Shanghai"
  }
  # 监听TCP UDP 2055端口接收netflow日志
  syslog  {
    type => netflow
    port => 2055
    codec => netflow
    timezone => "Asia/Shanghai"
  }
}

# 日志处理过滤
filter  {
  # 判断日志输入部分类型为syslog的日志
  if [type] == "syslog"{
    # grok过滤插件 主要用来提取字段内的内容生成新字段 具体参考下文grok插件章节
    grok {
      match =>{
        "message" =>".*source-ip=%{IPV4:src_ip}.*source-port=%{POSINT:src_port}.*destination-ip=%{IPV4:dst_ip}.*destination-port=%{POSINT:dst_port}.*time=(?
  • grok插件

grok插件会根据内置正则或者自定义正则提取字段内的内容生成新字段。
官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

例子
原始数据:source-ip=192.168.0.1 source-port=12345
表达式:source-ip=%{IPV4:src_ip}s+source-port=%{POSINT:src_port}
会提取IP地址192.168.0.1放入src_ip字段
会提取12345端口放入src_port字段
其中IPV4和POSINT为预定义规则

  • grok内置规则
    参考:https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
base10NUM (?[+-]?(?:(?:[0-9]+(?:.[0-9]+)?)|(?:.[0-9]+)))
NUMBER (?:%{base10NUM})
base16NUM (?(?"(?>\.|[^\"]+)+"|""|(?>'(?>\.|[^\']+)+')|''|(?>`(?>\.|[^\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMonMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]d|1dd|[1-9]?d)(.(25[0-5]|2[0-4]d|1dd|[1-9]?d)){3}))|:)))(%.+)?
IPV4 (?/(?>[w_%!$@:.,-]+|\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\)(?:\[^\?*]*)+
URIPROTO [A-Za-z]+(+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_-]*)+
#URIPARAM ?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM ?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?-[]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MonTH b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)b
MonTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MonTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>dd){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[w._/%-]+)
SYSLOGPROG %{PROG:program}(?:[%{POSINT:pid}])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGbase %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMonAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{data:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels
LOGLEVEL ([Aa]lert|alert|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
  • date插件
    date插件主要用来提取字段内的日期时间转换成相应的格式存入某些字段,缺省存入@timestamp字段。
    官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
  • date内置规则
字符释义例子ISO8601解析任何有效的ISO8601时间2011-04-19T03:44:01.103ZUNIX解析浮点或整形10位时间戳1326149001UNIX_MS解析整形13位时间戳1326149001000TAI64N解析TAI64N时间yyyy4位年份2021yy2位年份21M1位或2位月份2MM2位月份02MMM英文缩写月份FebMMMM完整的月份Februaryd1位或2位日5dd2位日05H1位或2位小时9HH2位小时09m1位或2位分钟8mm2位分钟08s1位或2位秒6ss2位秒06S十分之一秒4SS百分之一秒40SSS千分之一秒400Z时区-0700ZZ时区-07:00ZZZ时区标识Asia/Shanghaiw一年中的第几周1ww2位一年中的第几周01D一年中的第几天245e星期几7E,EE,EEE英文缩写星期几SunEEEE英文全称星期几Tuesday
# 测试配置文件是否有错误
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog.conf --config.test_and_exit
# 临时关闭防火墙
systemctl stop firewalld
# 取消/etc/logstash/conf.d/syslog.conf配置文件内输出部分的控制台输出注释
# 以配置文件热加载的形式临时启动 测试是否输入输出日志
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog.conf --config.reload.automatic
  • 使用telnet或其他调试工具发送tcp或udp包到1514端口测试是否正常输入输出

测试数据:

<123>Dec 8 2021 06:10:48 USG6600E:vsys=public, protocol=6, source-ip=172.16.1.2, source-port=63354, destination-ip=172.16.1.1, destination-port=80, time=2021/12/8 14:10:48.

# 成功解析后Ctrl+C关闭临时启动
# /etc/logstash/conf.d/syslog.conf配置文件内输出部分的控制台输出注释掉
# 启动logstash服务
systemctl start logstash.service
六、配置Kibana
# 修改配置文件
vi /etc/kibana/kibana.yml
  • kibana.yml参考
# 监听端口
server.port: 5601
# 绑定IP
server.host: "0.0.0.0"
# elasticsearch用户名
elasticsearch.username: "kibana_system"
# elasticsearch密码
elasticsearch.password: "用户密码"
# 开启中文
i18n.locale: "zh-CN"
# 设置URL全称
server.publicbaseUrl: "http://172.25.53.160:5601"

# 启动服务
systemctl start kibana.service
七、配置防火墙
# 开启防火墙
systemctl start firewalld
# 放行端口
firewall-cmd --zone=public --add-port=1514/udp --permanent
firewall-cmd --zone=public --add-port=1514/tcp --permanent
firewall-cmd --zone=public --add-port=2055/udp --permanent
firewall-cmd --zone=public --add-port=2055/tcp --permanent
firewall-cmd --zone=public --add-port=9200/tcp --permanent
firewall-cmd --zone=public --add-port=9300/tcp --permanent
firewall-cmd --zone=public --add-port=5601/tcp --permanent
# 重载防火墙
firewall-cmd --reload
  • 如果需要从tcp 80端口访问kibana需要配置端口转发(1514转发到514同理)
  • iptables防火墙端口转发
# tcp80端口转发到5601
iptables -t nat -A PREROUTING -i ens33 -p tcp --dport 80 -j REDIRECT --to-port 5601
# 保存配置
iptables-save
八、登录Kibana
  • 浏览器访问 http://172.25.53.160:5601 输入用户名elastic密码******登录
  • 点击自己浏览
  • 使用telnet或其他调试工具发送tcp或udp包到1514端口测试是否正常输入输出

测试数据:

<123>Dec 8 2021 06:10:48 USG6600E:vsys=public, protocol=6, source-ip=172.16.1.2, source-port=63354, destination-ip=172.16.1.1, destination-port=80, time=2021/12/8 14:10:48.

  • 左侧菜单栏点击 Management - Stack Management
  • 左侧二级菜单点击 数据 - 索引管理
  • 可以看到已经自动创建索引syslog-xxxxxx
  • 由于是单节点部署不满足副本分片1所以修改副本分片为0所以状态为黄色
  • 点击 索引名 - 编辑 修改index.number_of_replicas的值为0后点击保存 状态恢复为绿色
  • 点击左侧二级菜单 Kibana - 索引模式
  • 创建索引模式
  • 输入名称syslog-*匹配所有syslog日志
  • 时间戳字段选择 @timestamp
  • 点击创建索引模式

  • 一级菜单Analytics - Discover可以查看日志数据

  • 点击一级菜单 Observability - 日志
  • 点击右上角的设置按钮
  • 点击 使用Kibana索引模式
  • 选择之前创建的索引模式
  • 根据配置日志的列
  • 应用设置

  • 点击二级菜单 Logs - Stream可以查看、搜索日志

九、审计网络接口流量 9.1 安装Packetbeat
# 创建目录
mkdir -p /opt/softs
# 进入软件包目录
cd /opt/softs
# 上传安装包或者下载安装包
wget https://artifacts.elastic.co/downloads/beats/packetbeat/packetbeat-7.16.0-x86_64.rpm
# 安装packetbeat
rpm -ivh packetbeat-7.16.0-x86_64.rpm
9.2 配置Packetbeat
# 修改配置文件
vi /etc/packetbeat/packetbeat.yml
  • packetbeat.yml参考
……
# 配置审计网卡接口 可以是all
packetbeat.interfaces.device: ens37
……
# 对接kibana
# =================================== Kibana ===================================
  host: "localhost:5601"
……
# 对接elasticsearch
# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # elasticsearch地址和端口
  hosts: ["localhost:9200"]
  # 用户名
  username: "elastic"
  # 密码
  password: "用户密码"
……
# 启动packetbeat服务
systemctl start packetbeat.service
# 切换至packetbeat目录
cd /usr/share/packetbeat/bin
# 添加面板
packetbeat setup --dashboards
  • 登录Kibana 一级菜单 Analytics - Dashboard 可以查看Packetbeat自带的面板

  • [Packetbeat] Overview ECS面板

十、设置开机自动启动
# 开机自动启动elasticsearch
systemctl enable elasticsearch.service
# 开机自动启动logstash
systemctl enable logstash.service
# 开机自动启动kibana
systemctl enable kibana.service
# 开机自动启动packetbeat
systemctl enable packetbeat.service

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5656413.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存