Cybersecurity Series 17: Getting Started with Mothra, a Library and Toolset for Processing Network Flow Data


What is Mothra?

Mothra is a collection of libraries and tools for processing network flow data with Apache Spark, the large-scale data analytics engine.
Mothra is designed to store and process large volumes of network flow data along with collections of supporting information.

  • The libraries provide:
    • Apache Spark data sources for reading IPFIX and SiLK flow data, as well as some additional SiLK data files
    • Helper functions for working with information commonly found in network data, such as IP addresses, TCP flags, and port numbers
  • The tools provided include:
    • Loading IPFIX and SiLK data into HDFS storage for later analysis
    • Partitioning IPFIX data as it is loaded, to support more efficient queries

IPFIX and SiLK flow records (see below) are typically centered on the so-called "five-tuple" (see the selection sketch after this list):

  • source IP address
  • destination IP address
  • source port
  • destination port
  • transport layer protocol
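
In the Mothra IPFIX data source these five fields show up directly as DataFrame columns. Here is a minimal selection sketch, assuming a spark-shell session with the Mothra jar loaded (as set up in the next section) and using the sample-file path from later in this post:

import org.cert.netsa.mothra.datasources._
// Read the sample IPFIX file and keep only the classic five-tuple columns
val flows = spark.read.ipfix("file:///usr/local/zeppelin-0.10.0-bin-all/fccx-sample.ipfix")
flows.select("sourceIPAddress", "destinationIPAddress", "sourcePort", "destinationPort", "protocolIdentifier").show(5, false)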
First steps in spark-shell

Download the Mothra jar and the sample data

  • Mothra can be downloaded here: mothra download
  • The sample data can be downloaded here: fccx-sample.ipfix
spark-shell --jars mothra-1.5.1-bin/mothra_2.11-1.5.1-full.jar
// Bring the Mothra Spark data sources (read.ipfix, read.silkFlow, ...) into scope
import org.cert.netsa.mothra.datasources._
// Read the sample IPFIX file into a DataFrame
val df = spark.read.ipfix("file:///usr/local/zeppelin-0.10.0-bin-all/fccx-sample.ipfix")
// Show the first record without truncation in vertical layout, then print the schema
df.show(1,0,true)
df.printSchema()
// Report which Mothra version is loaded
org.cert.netsa.util.versionInfo("mothra")
org.cert.netsa.util.versionInfo.detail("mothra")

Reading data

Reading IPFIX data

IPFIX data can be generated with yaf.
Note: the following two settings need to be enabled in yaf.conf (an illustrative excerpt follows the list):

  • YAF_ROTATE_LOCATION=
    If present, and YAF_IPFIX_PROTO is not present, write IPFIX files to the given file directory
  • YAF_ROTATE_TIME=30
    Rotate time. If present, and YAF_ROTATE_LOCATION is present, rotate files every YAF_ROTATE_TIME seconds. Default is 120.
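
Putting the two settings together, a minimal yaf.conf excerpt might look like the following (the output directory is only a placeholder assumption):

# Write rotated IPFIX files that Mothra can later read from this directory (placeholder path)
YAF_ROTATE_LOCATION=/data/yaf/ipfix
# Rotate the output file every 30 seconds (the default would be 120)
YAF_ROTATE_TIME=30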

For details on how to read IPFIX data, see datasources ipfix.

Note:

  • IPFIXFields.everything selects all of the fields available in the IPFIX data
  • For how to use IPFIXFields, see the field-set definitions associated with the IPFIX data source; the main field sets are:
    • core: Core IPFIX flow fields, including time and the 5-tuple.
    • default: Default flow fields, including core fields, label fields, counts, and TCP flags, all as described above.
    • everything: Accessible collection of all fields, useful for working with YAF3 in particular.
import org.cert.netsa.mothra.datasources._
import org.cert.netsa.mothra.datasources.ipfix.IPFIXFields
val df = spark.read.fields(IPFIXFields.everything).ipfix("file:///usr/local/zeppelin-0.10.0-bin-all/yaf-data/*")
df.show(1,0,true)
df.printSchema()
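
If the full field set is not needed, the same read pattern works with the smaller field sets listed above. A minimal sketch using the core set, with the same data path as above:

import org.cert.netsa.mothra.datasources._
import org.cert.netsa.mothra.datasources.ipfix.IPFIXFields
// Read only the core fields (flow times plus the 5-tuple) for a much smaller schema
val coreDf = spark.read.fields(IPFIXFields.core).ipfix("file:///usr/local/zeppelin-0.10.0-bin-all/yaf-data/*")
coreDf.printSchema()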

// The implicits import enables the $"column" syntax used below
import spark.implicits._
// Peek at the first 10 records with all fields
df.select("*").limit(10).show()

// Select the common flow fields and keep only flows with more than 10 bytes
df.select("startTime","endTime","sourceIPAddress","sourcePort","destinationIPAddress","destinationPort","protocolIdentifier","packetCount","octetCount").where($"octetCount" > 10).limit(10).show()

// Export the results as JSON for use outside of Spark
df.write.json("file:///usr/local/zeppelin-0.10.0-bin-all/json1")
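
As one more DataFrame-API example with the same columns, here is a minimal sketch that totals traffic volume per destination port (plain Spark, not a Mothra-specific API):

import org.apache.spark.sql.functions.sum
// Sum octetCount per destinationPort and list the ten busiest ports
df.groupBy("destinationPort")
  .agg(sum("octetCount").as("totalOctets"))
  .orderBy($"totalOctets".desc)
  .show(10)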
Reading SiLK data

For details on how to read SiLK data, see datasources silk.

For the SiLK field-set definitions, see SilkFields.

import org.cert.netsa.mothra.datasources._
import org.cert.netsa.mothra.datasources.silk.flow.SilkFields
val df = spark.read.fields(SilkFields.all).silkFlow("file:///usr/local/zeppelin-0.10.0-bin-all/silk-data")
df.show(1,0,true)
df.printSchema()

// Protocol distribution
df.createOrReplaceTempView("t_netflow")
spark.sql("select protocol,count(*) as total from t_netflow group by protocol").show()

// Top 10 source IPs by record count
spark.sql("select sIP,count(*) as total from t_netflow group by sIP order by total desc limit 10").show()

// Top 10 destination IPs by record count
spark.sql("select dIP,count(*) as total from t_netflow group by dIP order by total desc limit 10").show()

Note: the schema of the DataFrame that Mothra produces when reading a SiLK repository is the one printed by df.printSchema() above.

Pitfalls encountered

Reading SiLK data raises a SilkDataFormatException

When Spark reads the SiLK data into a DataFrame and a count is run over it, it raises SilkDataFormatException: Magic number does not match.
Looking at line 294 of org.cert.netsa.io.silk.Header.scala, the cause is that the first 4 bytes of the SiLK file do not equal the magic number (the magic number is 0xdeadbeef).

  • The relevant source code:
private[this] def readHeader(in: DataInputStream): Header = {
    val magic = in.readInt()
    if ( magic != magicNumber ) {
      throw new SilkDataFormatException("Magic number does not match")
    }
    val fileFlags = in.readByte()
    // ... (remainder of readHeader omitted)
  • Solution:
    The SiLK repository directory contains a silk.conf configuration file; this file needs to be removed so the data source does not try to parse it as flow data. (A small per-file sanity check is sketched after the error output below.)

  • The full error output:

scala> df.count()
[Stage 7:>                                                          (0 + 1) / 1]22/04/21 10:33:14 ERROR SilkFileRelation: Failure opening stream for 'file:/usr/local/zeppelin-0.10.0-bin-all/silk-data/silk.conf'
org.cert.netsa.io.silk.SilkDataFormatException: Magic number does not match
        at org.cert.netsa.io.silk.Header$.readHeader(Header.scala:294)
        at org.cert.netsa.io.silk.Header$.readFrom(Header.scala:283)
        at org.cert.netsa.io.silk.RWRecReader$.ofInputStream(RWRecReader.scala:123)
        at org.cert.netsa.mothra.datasources.silk.flow.SilkFileRelation$$anonfun$buildScan$$anonfun$apply$$anonfun$apply.apply(SilkFileRelation.scala:97)
        at org.cert.netsa.mothra.datasources.silk.flow.SilkFileRelation$$anonfun$buildScan$$anonfun$apply$$anonfun$apply.apply(SilkFileRelation.scala:97)
        at scala.util.Try$.apply(Try.scala:192)
        at org.cert.netsa.mothra.datasources.silk.flow.SilkFileRelation$$anonfun$buildScan$$anonfun$apply.apply(SilkFileRelation.scala:96)
        at org.cert.netsa.mothra.datasources.silk.flow.SilkFileRelation$$anonfun$buildScan$$anonfun$apply.apply(SilkFileRelation.scala:88)
        at scala.collection.Iterator$$anon.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
res8: Long = 52052
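
As a quick way to tell whether a given file is a SiLK data file at all, its first four bytes can be compared against the 0xdeadbeef magic number described above. A minimal sketch (the path below is just the silk.conf from the error message, which is expected to fail the check):

import java.io.{DataInputStream, FileInputStream}

// Read the first 4 bytes of a file and compare them with the SiLK magic number (0xdeadbeef).
// Files that fail this check (such as silk.conf) are what trigger SilkDataFormatException.
def looksLikeSilkFile(path: String): Boolean = {
  val in = new DataInputStream(new FileInputStream(path))
  try in.readInt() == 0xdeadbeef
  finally in.close()
}

looksLikeSilkFile("/usr/local/zeppelin-0.10.0-bin-all/silk-data/silk.conf")  // expected: false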
