Spark 案例实 *** (Spark Streaming之WordCount)

Spark 案例实 *** (Spark Streaming之WordCount),第1张

Spark 案例实 *** (Spark Streaming之WordCount)

Spark Streaming 是什么

Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、

Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语

如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。

Spark Streaming 的特点

易于使用。Spark Streaming提供了很多高级 *** 作算子,允许以编写批处理作业的方式编写流式作业。它支持Java、Scala和Python语言。

易于与Spark体系整合。通过在Spark Core上运行Spark Streaming,可以在Spark Streaming中使用与Spark RDD相同的代码进行批处理,构建强大的交互应用程序,而不仅仅是数据分析。

接下来进行一个简单的streaming实 *** :

1. 准备netcat-win3.2-1.12压缩包,将其解压到D盘(自己选择位置,在解压前新建文件夹netcat)

2. 添加环境变量

3. 测试netcat是否成功运行,打开两个命令窗口,第一个执行nc  -L –p 9000  -v  第二个执行nc localhost 9000,如图所示:

能够接受和发送消息则说明netcat安装成功

4. 打开Idea,创建streaming包,新建StreamingWordCount.scala

5. 在pom.xml中添加依赖


     org.apache.spark
     spark-streaming_2.12
     3.0.0

 6. 接下来编写代码

package com.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    //TODO  创建环境对象
    val conf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    val ssc = new StreamingContext(conf,Seconds(3))
    //TODO 逻辑处理
    //获取端口数据
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordToOne = words.map((_,1))
    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_+_)
    wordToCount.print()

    // 由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
    // 如果main方法执行完毕,应用程序也会自动结束。所以不能让main执行完毕
    //ssc.stop()
    // 1. 启动采集器
    ssc.start()
    // 2. 等待采集器的关闭
    ssc.awaitTermination()
  }
}

7. 先不运行IDEA中代码,打开cmd窗口,输入命令nc - lp 9999连接到9999端口

8. 此时运行IDEA中代码,在连接到9999端口后输入所需要的单词数,这里以yanghao yanghao为例

在IDEA运行窗口会显示

 

最后,因为streaming是一个流处理 *** 作,所以会随着时间来切分数据,每隔一段时间处理一次

(如有错误,还请各位大佬指点指点)

 

 

 

 

 

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5677073.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存