1 简介2 实例3 架构与抽象4 转化 *** 作5 输出 *** 作6 24/7不间断运行
1 简介许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用、训练机器学习模型的应用,还有自动检测异常的应用。Spark Streaming是Spark为这些应用而设计的模型。Spark Streaming使用离散化流作为抽象表示,叫做DStream。DStream可以从各种输入源创建,比如Flume、Kafka或者HDFS。创建出来的DStream支持两种 *** 作,一种是转化 *** 作(transformation),会生成一个新的DStream,另外一种是输出 *** 作(output operation),可以把数据写入外部系统中。
2 实例让我们先来看一个简单的例子。我们会从一台服务器的7777端口上收到一个以换行符分隔的多行文本,要从中筛选出包含单词 error 的 行,并打印出来。Spark Streaming 程序最好以使用 Maven 或者 sbt 编译出来的独立应用的形式运行。Spark Streaming 虽然是 Spark 的一部分,它在 Maven 中也以独立工件的形式提供,你也需要在 工程中添加一些额外的 import 声明。
我们从创建 StreamingContext开始,它是流计算功能的主要入口。StreamingContext 会在底层创建出SparkContext,用来处理数据。其构造函数还接收用来指定多长时间处理一次新数据的批次间隔(batch interval)作为输入,这里我们把它设为1秒。接着,调 用socketTextStream() 来创建出基于本地 7777 端口上收到的文本数据的 DStream。然后把 DStream通过filter() 进行转化,只得到包含“error”的行。最后,使用输出 *** 作 print() 把一些筛选出来的行打印出来。如下是基于Scala进行流式筛选 *** 作:
val ssc = new StreamingContext(conf, Second(1)) val lines = ssc.socketTextStream("localhost", 7777) val errorLines = lines.filter(_.contains("error")) errorLines.print()
。要开始接收数据,必须显式调用 StreamingContext的start()方法。这样,Spark Streaming 就会开始把Spark作业不断交给下面的 SparkContext 去调度执行。执行会在另一个线程中进行,所以需要调用awaitTermination 来等待流计算完成,来防止应用退出。
ssc.start() ssc.awaitTermination()3 架构与抽象
Spark Streaming 使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming 从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。每个输入批次都形成一个RDD,以 Spark 作业的方式处理并生成其他的 RDD。
高层次的架构图如下:
我们已经讲到过,Spark Streaming的编程抽象是离散化流,也就是 DStream。它是一个 RDD序列,每个 RDD 代表数据流中一个时间片内的数据。如下图所示:
你可以从外部输入源创建 DStream,也可以对其他 DStream 应用进行转化 *** 作得到新的DStream。DStream 支持许多RDD的转化 *** 作。另外,DStream还有 “有状态”的转化 *** 作,可以用来聚合不同时间片内的数据。上述例子中的DStream及其转化关系如下图所示:
除了转化 *** 作以外,DStream还支持输出 *** 作,比如在示例中使用的 print()。输出 *** 作 和 RDD 的行动 *** 作的概念类似。Spark 在行动 *** 作中将数据写入外部系统中,而 Spark Streaming 的输出 *** 作在每个时间区间中周期性执行,每个批次都生成输出。
Spark Streaming 对DStream 提供的容错性与 Spark为RDD 所提供的容错性一致:只要输入数据还在,它就可以使用 RDD 谱系重算出任意状态 (比如重新执行处理输入数据的 *** 作)。默认情况下,收到的数据分别存在于两个节点上,这样 Spark 可以容忍一个工作节点的故障。不过,如果只用谱系图来恢复的话,重算有可能会花很长时间,因为需要处理从 程序启动以来的所有数据。因此,Spark Streaming 也提供了检查点机制,可以把状态阶段 性地存储到可靠文件系统中(例如 HDFS或者S3)。一般来说,你需要每处理 5-10 个批次 的数据就保存一次。在恢复数据时,Spark Streaming 只需要回溯到上一个检查点即可。
DStream的转化 *** 作可以分为==无状态(stateless)和有状态(stateful)==两种:
在无状态转化 *** 作中,每个批次的处理不依赖于之前批次的数据。第3章和第4章中所 讲的常见的RDD转化 *** 作,例如map()、filter()、reduceByKey() 等,都是无状态转化 *** 作。相对地,有状态转化 *** 作需要使用之前批次的数据或者是中间结果来计算当前批次的数 据。有状态转化 *** 作包括基于滑动窗口的转化 *** 作和追踪状态变化的转化 *** 作。
无状态转化 *** 作就是把简单的 RDD 转化 *** 作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。
DStream 的有状态转化 *** 作是跨时间区间跟踪数据的 *** 作;也就是说,一些先前批次的数据也被用来在新的批次中计算结果。主要的两种类型是滑动窗口和updateStateByKey(),前者以一个时间阶段为滑动窗口进行 *** 作,后者则用来跟踪每个键的状态变化(例如构建 一个代表用户会话的对象)。
5 输出 *** 作
输出 *** 作指定了对流数据经转化 *** 作得到的数据所要执行的 *** 作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出 *** 作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出 *** 作,整个context 就都不会启动。常用的一种调试性输出 *** 作是print(),它会在每个批次中抓取 DStream 的前十个元素打印出来。Spark Streaming 对于 DStream 有 与 Spark 类似的 save() *** 作,它们接受一个目录作为参数来存储文件,还支持通过可选参数来设置文件的后缀名。Spark Streaming 对于 DStream 有 与 Spark 类似的 save() *** 作,它们接受一个目录作为参数来存储文件,还支持通过可选参数来设置文件的后缀名。在Scala中将DStream保存为文本文件如下:
requestCount.saveAsTextFiles("outputDir","txt")
还有一个更为通用的saveAsHadoopFiles() 函数,接收一个 Hadoop 输出格式作为参数。最后,还有一个通用的输出 *** 作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意 RDD。在foreachRDD() 中,可以重用我们在Spark 中实现的所有行动 *** 作。foreachRDD() 也可以提供给我 们当前批次的时间,允许我们把不同时间的输出结果存到不同的位置。
6 24/7不间断运行Spark Streaming 的一大优势在于它提供了强大的容错性保障。只要输入数据存储在可靠的系统中,Spark Streaming 就可以根据输入计算出正确的结果,提供“精确一次”执行的语义。检查点机制是我们在Spark Streaming 中用来保障容错性的主要机制。它可以使 Spark Streaming 阶段性地把应用数据存储到诸如 HDFS 或 Amazon S3 这样的可靠存储系统中, 以供恢复时使用。检查点机制主要为以下两个目的服务:
控制发生失败时需要重算的状态数。Spark Streaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样Spark Streaming 就可以读取之前运行的程序处理数据的进度,并从那里继续。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)