1.sparkStreaming
流式处理框架,是Spark API的扩展,RDD最终封装到DStream中
2.第一个wordcount
pom依赖
org.apache.spark spark-streaming_2.123.0.0 provided
import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object SparkStreaming01 { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("streaming01") conf.setMaster("local[2]") //一个线程接收数据,一个线程处理数据 val sc: SparkContext = new SparkContext(conf) val ssc = new StreamingContext(sc, Durations.seconds(10)) //设置每10秒执行一次处理数据 val lines = ssc.socketTextStream("hadoop102", 999) //设置了虚拟机和端口号 //统计所有单词出现次数 val words=lines.flatMap(line=>line.split(" ")) val pairWords=words.map(word=>new Tuple2(word,1)) val result= pairWords.reduceByKey((v1,v2)=>{v1 + v2}) //output operation类算子 result.print() ssc.start() //启动sparkStreaming ssc.awaitTermination() } }
数据来源
3.foreachRDD算子
1.foreachRDD可以获取DStream中的RDD,可以对RDD使用RDD的算子 *** 作,但是一定要使用RDD的action算子触发执行
result.foreachRDD((rdd: RDD[(String, Int)]) => { val rdd1: RDD[String] = rdd.map(tp => { println("======="+tp) tp._1 + "=" + tp._2 }) rdd1.count() })
4.transform
transformation类算子,对Dstream做RDD到RDD的任意 *** 作
5.updateStateByKey
transformation类算子,对每一个key的状态进行更新
6.Driver HA
第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。
第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)