Spark Streaming 是什么
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、
Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语
如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。
Spark Streaming 的特点
易于使用。Spark Streaming提供了很多高级 *** 作算子,允许以编写批处理作业的方式编写流式作业。它支持Java、Scala和Python语言。
易于与Spark体系整合。通过在Spark Core上运行Spark Streaming,可以在Spark Streaming中使用与Spark RDD相同的代码进行批处理,构建强大的交互应用程序,而不仅仅是数据分析。
接下来进行一个简单的streaming实 *** :
1. 准备netcat-win3.2-1.12压缩包,将其解压到D盘(自己选择位置,在解压前新建文件夹netcat)
2. 添加环境变量
3. 测试netcat是否成功运行,打开两个命令窗口,第一个执行nc -L –p 9000 -v 第二个执行nc localhost 9000,如图所示:
能够接受和发送消息则说明netcat安装成功
4. 打开Idea,创建streaming包,新建StreamingWordCount.scala
5. 在pom.xml中添加依赖
org.apache.spark spark-streaming_2.123.0.0
6. 接下来编写代码
package com.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamWordCount { def main(args: Array[String]): Unit = { //TODO 创建环境对象 val conf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount") val ssc = new StreamingContext(conf,Seconds(3)) //TODO 逻辑处理 //获取端口数据 val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val wordToOne = words.map((_,1)) val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_+_) wordToCount.print() // 由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭 // 如果main方法执行完毕,应用程序也会自动结束。所以不能让main执行完毕 //ssc.stop() // 1. 启动采集器 ssc.start() // 2. 等待采集器的关闭 ssc.awaitTermination() } }
7. 先不运行IDEA中代码,打开cmd窗口,输入命令nc - lp 9999连接到9999端口
8. 此时运行IDEA中代码,在连接到9999端口后输入所需要的单词数,这里以yanghao yanghao为例
在IDEA运行窗口会显示
最后,因为streaming是一个流处理 *** 作,所以会随着时间来切分数据,每隔一段时间处理一次
(如有错误,还请各位大佬指点指点)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)