SparkStreaming概述Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。
数据处理延迟方式 实时:数据处理在毫秒级别,秒 离线:数据处理延迟以小时,天为单位 数据处理的方式 流式:一个一个数据进行处理 批处理:一批一批数据进行处理 SparkCore: 离线数据分析框架|批处理 SparkStreaming: 基于SparkCore来完成实时数据处理分析(执行场景在离线批处理和实时流式之间,可以称为准实 时,微批次数据处理框架)SparkStreaming架构 背压机制
调整数据采集能力与数据消费能力
Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。 为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。 通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。Dstream入门 WordCount案例实 ***
需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
添加依赖关系
org.apache.spark spark-streaming_2.123.0.0
idea代码
package com.pihao.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object WordCount02 { def main(args: Array[String]): Unit = { //TODO SparkStreaming流式数据处理 //TODO 建立环境 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming") //SparkStreaming环境对象的第二个参数表示数据采集周期 val ssc = new StreamingContext(sparkConf,Seconds(3)) //TODO 执行 *** 作 //创建采集器(一行一行) val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) //监听本机的9999端口 //拿到一个一个的单词 val wordDS: DStream[String] = socketDS.flatMap(_.split(" ")) val wordToCountDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_+_) wordToCountDS.print() //启动采集器 ssc.start() //等待结束 ssc.awaitTermination() } }
在hadoop102启动nc服务,然后启动idea
# nc -lk 9999 # nc -lp 9999 [atguigu@hadoop102 ~]$ nc -lk 9999 pihao hello worldDstream创建 RDD队列
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。
案例实 ***
循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount
object RDDStream { def main(args: Array[String]) { //1.初始化Spark配置信息 val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream") //2.初始化SparkStreamingContext val ssc = new StreamingContext(conf, Seconds(4)) //3.创建RDD队列 val rddQueue = new mutable.Queue[RDD[Int]]() //4.创建QueueInputDStream val inputStream = ssc.queueStream(rddQueue,oneAtATime = false) //5.处理队列中的RDD数据 val mappedStream = inputStream.map((_,1)) val reducedStream = mappedStream.reduceByKey(_ + _) //6.打印结果 reducedStream.print() //7.启动任务 ssc.start() //8.循环创建并向RDD队列中放入RDD for (i <- 1 to 5) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) } ssc.awaitTermination() } }自定义数据源
自定义一个数据源,获取其中的数据
package com.pihao.streaming import java.util.concurrent.TimeUnit import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.receiver.Receiver import scala.util.Random object SparkStreaming_DIY { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) //执行 *** 作 val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver()) //使用自己定义的接收器 ds.print() ssc.start() ssc.awaitTermination() } class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){ private var flag = true //开启接受 override def onStart(): Unit = { new Thread(new Runnable { override def run(): Unit = { while(flag){ val i: Int = new Random().nextInt(100) store(i+"") TimeUnit.NANOSECONDS.sleep(500) } } }).start() } //关闭接收 override def onStop(): Unit = { flag = false } } }Kafka数据源 版本选型
1.ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用 2.DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。(采集和计算都在一个节点,容易控制)Kafka 0-10 Direct 模式
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做计算,最终打印到控制台。
提前创建kafka主题
启动zookeeper集群 启动kafka集群 # 创建sparkstreaming的topic [atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic sparkstreaming --partitions 3 --replication-factor 2 [atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list __consumer_offsets sparkstreaming [atguigu@hadoop102 ~]$
添加依赖
org.apache.spark spark-streaming-kafka-0-10_2.123.0.0 com.fasterxml.jackson.core jackson-core2.10.1
编写代码
package com.pihao.streaming import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} object KafkaStream { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) //TODO 执行 *** 作 //定义kafka的连接配置 val kafkaPara: Map[String, Object] = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_ConFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", ConsumerConfig.GROUP_ID_ConFIG -> "spark", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer" ) //Kafka专门用于实时数据生成,提供了很多封装类 //ConsumerRecord是一个KV val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个broker ConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara)) //获取ConsumerRecord中的数据 val kafkaData: DStream[String] = kafkaDStream.map(_.value()) kafkaData.print() ssc.start() ssc.awaitTermination() } }
Dstream转换接收成功
无状态化 *** 作DStream上的 *** 作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换 *** 作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
Transform无状态转化 *** 作就是把简单的RDD转化 *** 作应用到每个批次上,也就是转化DStream中的每一个RDD。
Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次(周期性执行)。其实也就是对DStream中的RDD应用转换。
object Transform { def main(args: Array[String]): Unit = { //创建SparkConf val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount") //创建StreamingContext val ssc = new StreamingContext(sparkConf, Seconds(3)) //创建DStream val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("host", 9999) //在某些情况,DS对象不能执行所有的 *** 作 //如果想要进行特殊 *** 作,那么可以直接通过底层的RDD进行 val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => { val words: RDD[String] = rdd.flatMap(_.split(" ")) val wordAndOne: RDD[(String, Int)] = words.map((_, 1)) val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _) value }) //打印 wordAndCountDStream.print //启动 ssc.start() ssc.awaitTermination() } }join
两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同。
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object JoinTest { def main(args: Array[String]): Unit = { //1.创建SparkConf val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinTest") //2.创建StreamingContext val ssc = new StreamingContext(sparkConf, Seconds(5)) //3.从端口获取数据创建流 val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999) val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888) //4.将两个流转换为KV类型 val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1)) val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a")) //5.流的JOIN val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream) //6.打印 joinDStream.print() //7.启动任务 ssc.start() ssc.awaitTermination() } }有状态转化 *** 作
UpdateStateByKey有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。需要保存之前每次数据的聚合 *** 作
package com.pihao.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkStreaming_State { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) //检查点应该设置在hdfs中比较稳妥 ssc.checkpoint("cp") //TODO 执行 *** 作 val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) val wordDS: DStream[String] = socketDS.flatMap(_.split(" ")) val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1)) //reduceByKey方法属于无状态 *** 作的方法 //updateStateByKey方法属于有状态 *** 作的方法 //方法参数中的第一个参数,表示相同key的value序列 //方法参数中的第二个参数,表示相同key缓冲区的数据值 val state: DStream[(String, Int)] = wordToOneDS.updateStateByKey( (seq: Seq[Int], buffer: Option[Int]) => { val newValue: Int = seq.sum + buffer.getOrElse(0) Option(newValue) } ) state.print() ssc.start() ssc.awaitTermination() } }WindowOperations
所谓的窗口 *** 作,其实就是将多个采集周期的数据一次性进行处理,而不是一个采集周期处理一次。
SparkStreaming总的窗口范围不能切断采集周期,可以是采集周期的整数倍
SparkStreaming总的窗口滑动幅度也应该是采集周期的整数倍
package com.pihao.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object SparkStreaming_window{ def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) //TODO 执行 *** 作 val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) val wordDS: DStream[String] = socketDS.flatMap(_.split(" ")) val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1)) //window方法可以传递一个参数,表示窗口的范围(时间周期,应该为采集周期的整数倍) //window方法可以传递两个参数, //第一个表示窗口的范围(时间周期,应该为采集周期的整数倍) //第二个表示窗口滑动的步长(时间周期,也是采集周期的整数倍,不传默认是一个一个采集周期) //窗口计算的时间以窗口步长做基础的 val windowDS: DStream[(String, Int)] = wordToOneDS.window(Seconds(6),Seconds(3)) val resultDS: DStream[(String, Int)] = windowDS.reduceByKey(_+_) resultDS.print() ssc.start() ssc.awaitTermination() } }增量计算
增量计算 = 当前窗口的值+ 新进入到窗口的数据-排除掉窗口的数据
使用场合:适合在窗口范围大,滑动浮动小的场合,这样就会有大量的重复数据
package com.pihao.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object Sparkstream_window2 { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) ssc.checkpoint("cp") //TODO 执行 *** 作 val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) val wordDS: DStream[String] = socketDS.flatMap(_.split(" ")) val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1)) //增量计算 //需要设定检查点路径 val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow( (x: Int, y: Int) => { x + y }, (x: Int, y: Int) => { x - y }, Seconds(6), Seconds(3) ) windowDS.print() ssc.start() ssc.awaitTermination() } }Dstream输出
输出 *** 作指定了对流数据经转化 *** 作得到的数据所要执行的 *** 作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出 *** 作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出 *** 作,整个context就都不会启动。
#1 print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的 *** 作叫print()。 #2 saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。 #3 saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。 #4 saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。 #5 foreachRDD(func):这是最通用的输出 *** 作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。 通用的输出 *** 作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动 *** 作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。 注意: 1)连接不能写在driver层面(序列化) 2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失; 增加foreachPartition,在分区创建(获取)。优雅的关闭
流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。
使用外部文件系统来控制内部程序关闭。
package com.pihao.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState} import org.apache.spark.streaming.dstream.{ ReceiverInputDStream} object SparkStreaming_Close { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) //TODO 执行 *** 作 val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) socketDS.print() //stop()方法主要用于停止数据采集和Driver的调度 //不可能启动采集后马上停止,所以一般在业务更新或者逻辑更新的场合停止 //在main线程中停止是不现实的。应该启动一个新的线程来执行停止 *** 作 new Thread(new Runnable { override def run(): Unit = { while(true){ Thread.sleep(10000) //时间 //这个时间应该是判断数据处理是否可以继续的标记 //这个标记应该多线程都可以访问 //这个标记一般情况放在第三方的系统中(mysql,redis) val state: StreamingContextState = ssc.getState() //装填 if (state == StreamingContextState.ACTIVE){ ssc.stop(true,true) //优雅的关闭 System.exit(0) //关闭线程 } } } }).start() ssc.start() ssc.awaitTermination() } }案例
模拟实时生成数据,然后统计分析
package com.pihao.main import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import scala.collection.mutable.ListBuffer import scala.util.Random object SparkStreaming_MockData { def main(args: Array[String]): Unit = { // 创建配置对象 val prop = new Properties() // 添加配置 prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092") prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String,String](prop) val topic = "sparkstreaming" val areas = ListBuffer("华北","华东","华南") val citys = ListBuffer("北京","上海","深圳") while(true){ Thread.sleep(2000) //生成数据 for (i <- 1 to new Random().nextInt(20)){ val area: String = areas(new Random().nextInt(3)) val city: String = citys(new Random().nextInt(3)) val userId = new Random().nextInt(10) val adId = new Random().nextInt(10) val message = s"${System.currentTimeMillis()} ${area} ${city} ${userId} ${adId}" val record = new ProducerRecord[String,String](topic,message) producer.send(record) } } } }
package com.pihao.main import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} object KafkaStream { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) //TODO 执行 *** 作 //定义kafka的连接配置 val kafkaPara: Map[String, Object] = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_ConFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", ConsumerConfig.GROUP_ID_ConFIG -> "spark", "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer" ) //Kafka专门用于实时数据生成,提供了很多封装类 val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个broker ConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara)) //获取广告点击的数据 val kafkaData: DStream[AdvClickData] = kafkaDStream.map( data => { val kafkaVal: String = data.value() val datas: Array[String] = kafkaVal.split(" ") AdvClickData(datas(0),datas(1),datas(2),datas(3),datas(4)) } ) // todo 1.周期性获取数据库中最新的黑名单数据 val reduceDS: DStream[((String, String, String), Int)] = kafkaData.transform( rdd => { // todo 2.判断当前周期内的数据是否已经在黑名单中 // 查询数据库 *** 作select val blackList = List[String]() val filterRDD: RDD[AdvClickData] = rdd.filter( data => !blackList.contains(data.userId) ) //todo 3.将一个周期内的数据进行统计 filterRDD.map( data => { //将数据组装返回 timestamp这里要换成日期格式 ((data.timestamp, data.userId, data.adId), 1) } ).reduceByKey(_ + _) } ) //todo 4.判断统计的结果是否超过阈值 reduceDS.foreachRDD( rdd => { rdd.foreach { case ((timestamp, userId, adId), sum) => { //如果当前采集周期内的sum都大于100,那么将userId就直接拉入黑名单 if (sum>=100){ //insert into }else{ val oldStateval = 0 //select 根据userId查询mysql中旧的值 val newStateval = 0 + sum if (newStateval >= 100){ // insert into }else{ // update 根据数据库newStateval } } } } } ) ssc.start() ssc.awaitTermination() } case class AdvClickData(timestamp: String, area: String, city: String, userId: String, adId: String) }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)