Requirement: read the Herry.txt file on HDFS and perform a word count.
package com.zch.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkStreaming01_FileWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("StreamWordCount")
    // 2. Initialize the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // 3. Monitor the path and create a DStream
    val dirStream = ssc.textFileStream("hdfs://zhaohui01:8020/Herry.txt")
    // 4. Split each line into words
    val wordStreams = dirStream.flatMap(_.split(" "))
    // 5. Map each word to a tuple (word, 1)
    val wordAndOneStreams = wordStreams.map((_, 1))
    // 6. Count occurrences of each word
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
    // 7. Print the result
    wordAndCountStreams.print()
    // 8. Start the StreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}
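Note that textFileStream monitors a directory and only picks up files created in it after the stream starts, so pointing it at a single existing file such as Herry.txt will usually produce no output. A minimal sketch, assuming the data is instead dropped into an HDFS directory (the /streaming-input path is made up):

    // Hypothetical input directory: new files moved into it are read as they appear
    val dirStream = ssc.textFileStream("hdfs://zhaohui01:8020/streaming-input")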
2. Create a DStream from an RDD Queue

package com.zch.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object sparkStreaming02_Queue {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("queue")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Create the RDD queue
    val rddQueue = new mutable.Queue[RDD[Int]]()
    // Create the QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
    val mappedStream = inputStream.map((_, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()
    // Keep pushing RDDs into the queue after the context has started
    for (i <- 1 to 10) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }
}
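In the call above, oneAtATime = false means every RDD currently sitting in the queue is consumed within a single batch; with the default behavior, each batch interval dequeues only one RDD. A minimal sketch of the one-at-a-time variant (only the flag changes):

    // Hypothetical variant: dequeue a single RDD per 3-second batch
    val inputStream = ssc.queueStream(rddQueue, oneAtATime = true)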
3. Custom Data Source

To define a custom data source, extend Receiver and implement the onStart and onStop methods.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.util.Random

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  private var flag = true

  override def onStart(): Unit = {
    // Collect data on a separate thread until the receiver is stopped
    new Thread(() =>
      while (flag) {
        val string = "Collected data: " + new Random().nextInt(10).toString
        store(string)
        Thread.sleep(500)
      }
    ).start()
  }

  override def onStop(): Unit = {
    flag = false
  }
}
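The class above only defines the receiver; it still has to be registered with a StreamingContext via receiverStream. A minimal driver sketch, assuming the setup conventions of the earlier examples (the object name and batch interval are made up):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object sparkStreaming03_CustomReceiver {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("customReceiver")
        val ssc = new StreamingContext(sparkConf, Seconds(3))
        // Plug the custom receiver in and print whatever it collects
        val lineStream = ssc.receiverStream(new MyReceiver)
        lineStream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }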
4. Kafka Data Source (Key Topic)

Requirement: read data from Kafka with Spark Streaming.
1. Add the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
2. Write the code
package com.zch.spark.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkStreaming04_Kafka {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Kafka consumer configuration
    val kafkaParam: Map[String, String] = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "zhaohui01:9092,zhaohui02:9092,zhaohui03:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "__consumer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    // Create a direct stream that subscribes to the first01 topic
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("first01"), kafkaParam)
    )

    kafkaDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
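To tie this back to the word-count theme, the value stream can be processed exactly like the file stream in the first example. A minimal follow-on sketch (it assumes the kafkaDS stream from the code above and space-separated messages):

    // Hypothetical follow-on: word count over the Kafka message values
    kafkaDS.map(_.value())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()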