Mothra is a collection of libraries and tools for processing network flow data with the Apache Spark large-scale data analytics engine.
Mothra is designed to store and process large volumes of network flow data and the supporting collections of information that accompany it.
- Functionality provided by the libraries:
  - Apache Spark data sources for reading IPFIX and SiLK flow data, as well as some additional SiLK data files
  - Helper functions for working with information commonly found in network data, such as IP addresses, TCP flags, and port numbers
- Tools provided:
  - Loading IPFIX and SiLK data into HDFS storage for later analysis
  - Partitioning IPFIX data as it is loaded, to support more efficient queries
IPFIX and SiLK flow records (see below) are generally centered on the so-called "5-tuple":
- source IP address
- destination IP address
- source port
- destination port
- transport layer protocol
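Once a flow DataFrame has been loaded (as shown later in this post), each element of the 5-tuple maps onto its own column. A minimal sketch of selecting just the 5-tuple from an IPFIX DataFrame, using the column names that appear in the examples below:
// assumes `df` was created with spark.read.ipfix(...) as in the examples below
val fiveTuple = df.select(
  "sourceIPAddress", "sourcePort",
  "destinationIPAddress", "destinationPort",
  "protocolIdentifier")
fiveTuple.show(10)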
Download the Mothra jar and the sample data
- Mothra download: mothra download
- Sample data download: fccx-sample.ipfix
spark-shell --jars mothra-1.5.1-bin/mothra_2.11-1.5.1-full.jar
import org.cert.netsa.mothra.datasources._
val df = spark.read.ipfix("file:///usr/local/zeppelin-0.10.0-bin-all/fccx-sample.ipfix")
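// show the first record vertically, with no value truncation (numRows = 1, truncate = 0, vertical = true)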
df.show(1,0,true)
df.printSchema()
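// optional: report which Mothra version is on the classpath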
org.cert.netsa.util.versionInfo("mothra")
org.cert.netsa.util.versionInfo.detail("mothra")
Reading data
Reading IPFIX data
IPFIX data can be produced by yaf.
Note: the following two settings must be enabled in yaf.conf:
- YAF_ROTATE_LOCATION=
  If present, and YAF_IPFIX_PROTO is not present, write IPFIX files to the given file directory.
- YAF_ROTATE_TIME=30
  Rotate time. If present, and YAF_ROTATE_LOCATION is present, rotate files every YAF_ROTATE_TIME seconds. Default is 120.
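As a sketch of what those two lines might look like in yaf.conf (the output directory /data/yaf below is a hypothetical example):
# excerpt from yaf.conf
YAF_ROTATE_LOCATION=/data/yaf   # hypothetical directory; IPFIX files are written here
YAF_ROTATE_TIME=30              # start a new IPFIX file every 30 seconds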
For details on how to read IPFIX data, see datasources ipfix.
Notes:
- IPFIXFields.everything exposes all of the available IPFIX fields
- For how to use IPFIXFields, see the field-set definitions for the IPFIX data source; the main field sets are:
- core: Core IPFIX flow fields, including time and the 5-tuple.
- default: Default flow fields, including core fields, label fields, counts, and TCP flags, all as described above.
- everything: Accessible collection of all fields useful for working with YAF3 in particular.
import org.cert.netsa.mothra.datasources._
import org.cert.netsa.mothra.datasources.ipfix.IPFIXFields
val df = spark.read.fields(IPFIXFields.everything).ipfix("file:///usr/local/zeppelin-0.10.0-bin-all/yaf-data/*")
df.show(1,0,true)
df.printSchema()
import spark.implicits._
df.select("*").limit(10).show()
df.select("startTime","endTime","sourceIPAddress","sourcePort","destinationIPAddress","destinationPort","protocolIdentifier","packetCount","octetCount").where($"octetCount" > 10).limit(10).show()
df.write.json("file:///usr/local/zeppelin-0.10.0-bin-all/json1")
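If only the basic time and 5-tuple fields are needed, the same pattern works with one of the smaller field sets listed above; a sketch using IPFIXFields.core:
import org.cert.netsa.mothra.datasources._
import org.cert.netsa.mothra.datasources.ipfix.IPFIXFields
// read only the core IPFIX flow fields (time plus the 5-tuple)
val coreDf = spark.read.fields(IPFIXFields.core).ipfix("file:///usr/local/zeppelin-0.10.0-bin-all/yaf-data/*")
coreDf.printSchema()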
Reading SiLK data
For details on how to read SiLK data, see datasources silk.
The SiLK field sets are defined in SilkFields.
import org.cert.netsa.mothra.datasources._
import org.cert.netsa.mothra.datasources.silk.flow.SilkFields
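// read all SiLK flow files under the given directory, requesting every SiLK field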
val df = spark.read.fields(SilkFields.all).silkFlow("file:///usr/local/zeppelin-0.10.0-bin-all/silk-data")
df.show(1,0,true)
df.printSchema()
// protocol distribution
df.createOrReplaceTempView("t_netflow")
spark.sql("select protocol,count(*) as total from t_netflow group by protocol").show()
// top source IPs
spark.sql("select sIP,count(*) as total from t_netflow group by sIP order by total desc limit 10").show()
// top destination IPs
spark.sql("select dIP,count(*) as total from t_netflow group by dIP order by total desc limit 10").show()
Note: when Mothra reads a SiLK repository, the resulting DataFrame has the SiLK flow schema (it can be inspected with df.printSchema(), as above).
When Spark read the SiLK data into a DataFrame and counted the rows, it threw SilkDataFormatException: Magic number does not match.
Looking at line 294 of org.cert.netsa.io.silk.Header.scala, the cause is that the first 4 bytes of the file do not equal magicNumber (magicNumber is 0xdeadbeef).
- The relevant source code:
private[this] def readHeader(in: DataInputStream): Header = {
  val magic = in.readInt()
  if ( magic != magicNumber ) {
    throw new SilkDataFormatException("Magic number does not match")
  }
  val fileFlags = in.readByte()
Solution:
The SiLK repository directory contains a silk.conf configuration file; delete it (it is not a SiLK data file, which is why the data source fails when it tries to read it).
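To check which files in a directory actually start with the SiLK magic number described above, here is a small illustrative sketch (not part of Mothra) that reads the leading 4 bytes of each file:
import java.io.{DataInputStream, File, FileInputStream}

// true if the file's first 4 bytes equal the SiLK magic number 0xdeadbeef
def looksLikeSilk(f: File): Boolean = {
  val in = new DataInputStream(new FileInputStream(f))
  try { in.readInt() == 0xdeadbeef } finally { in.close() }
}

new File("/usr/local/zeppelin-0.10.0-bin-all/silk-data")
  .listFiles()
  .filter(_.isFile)
  .foreach(f => println(s"${f.getName}: ${looksLikeSilk(f)}"))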
Full error output:
scala> df.count()
[Stage 7:> (0 + 1) / 1]22/04/21 10:33:14 ERROR SilkFileRelation: Failure opening stream for 'file:/usr/local/zeppelin-0.10.0-bin-all/silk-data/silk.conf'
org.cert.netsa.io.silk.SilkDataFormatException: Magic number does not match
at org.cert.netsa.io.silk.Header$.readHeader(Header.scala:294)
at org.cert.netsa.io.silk.Header$.readFrom(Header.scala:283)
at org.cert.netsa.io.silk.RWRecReader$.ofInputStream(RWRecReader.scala:123)
at org.cert.netsa.mothra.datasources.silk.flow.SilkFileRelation$$anonfun$buildScan$$anonfun$apply$$anonfun$apply.apply(SilkFileRelation.scala:97)
at org.cert.netsa.mothra.datasources.silk.flow.SilkFileRelation$$anonfun$buildScan$$anonfun$apply$$anonfun$apply.apply(SilkFileRelation.scala:97)
at scala.util.Try$.apply(Try.scala:192)
at org.cert.netsa.mothra.datasources.silk.flow.SilkFileRelation$$anonfun$buildScan$$anonfun$apply.apply(SilkFileRelation.scala:96)
at org.cert.netsa.mothra.datasources.silk.flow.SilkFileRelation$$anonfun$buildScan$$anonfun$apply.apply(SilkFileRelation.scala:88)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
res8: Long = 52052