UpdateStateBykey会统计全局的key的状态,不管有没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateBykey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数 *** 作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会删除(state可以是任意类型的数据结构)。
适用场景:
UpdataStateBykey可以用来统计历史数据,每次输出所有的key值。列如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的返回量等指标。
适用实例条件:
- 首先会以DStream中的数据进行按key做reduce *** 作,然后再对各个批次的数据进行累加。
- updataStateByKey要求必须设置checkpoint点(设置中间结果文件夹)
- updataStateByKey方法中updataFunc就要传入的参数,Seq[V]表示当前key对应的所有值,Option[S]是当前key的历史状态,返回的是新的封装的数据。
def main(args: Array[String]): Unit = { // Logger.getLogger("org").setLevel(Level.WARN) val conf = new SparkConf() .setAppName("Test") .setMaster("local[*]") .set("spark.streaming.receiver.writeAheadLog.enable", "true") val ssc = new StreamingContext(conf, Seconds(5)) ssc.checkpoint("D:\mydata\checkpoint\test") //必须转换成可迭代类型 val topics = Array(Test_topic) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> bootstrapServers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) // val pattern = new Regex("[0-9]+") val pattern = "\d+".r // val str = "44: this is a test_topic 44 kafka data" // println((pattern findAllIn str).mkString(",")) val wordCountDS: DStream[(String, Int)] = kafkaDStream .map(record => (pattern findAllIn record.value).next()) .map(w => (w, 1)) .reduceByKey((a, b) => a + b) .updateStateByKey(update) .transform(rdd => { rdd.sortBy(t => t._2, ascending = false) }) wordCountDS.print() ssc.start() ssc.awaitTermination() } //new_state=("hello",1),old_state=("hello",3) //newValue=(1),oldstate=(3) def update(newValue: Seq[Int], oldstate: Option[Int]): Option[Int] = { val oldCount = oldstate.getOrElse(0) //oldCount(3) println(s"updateStateByKey: ${System.currentTimeMillis()}") Some(oldCount + newValue.sum) //newstage(4) }mapWithState(基于磁盘存储+缓存)
mapWithState也是用于对于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,类型于增量的感觉。
使用场景
mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里余额信息。
适用实例条件:
- 如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值
- 还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在fun中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。3. checkpoint不会必须的
def main(args: Array[String]): Unit = { //1、创建sparkConf val sparkConf = new SparkConf().setAppName("scala wordcount").setMaster("local[*]") //2、创建SparkContext val sc = new SparkContext(sparkConf) //3、创建StreamingContext val ssc = new StreamingContext(sc, Seconds(5)) //4、设置checkpoint地址 ssc.checkpoint("D:\mydata\checkpoint\test") //5、必须转换成可迭代类型 val topics = Array(Test_topic) //6、消费者相关参数配置 val kafkaParams = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_ConFIG -> bootstrapServers, ConsumerConfig.GROUP_ID_ConFIG -> groupId, ConsumerConfig.AUTO_OFFSET_RESET_ConFIG -> "earliest", ConsumerConfig.ENABLE_AUTO_COMMIT_ConFIG -> (false: java.lang.Boolean), ConsumerConfig.KEY_DESERIALIZER_CLASS_ConFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_ConFIG -> classOf[StringDeserializer] ) //7、通过KafkaUtils.createDirectStream对接kafka val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) //8、获取topic中的数据 val topicData: DStream[String] = kafkaDStream.map(_.value()) //9、切分每一行,每个单词计为1 val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_, 1)) //10、统计相同单词 val wordCount: DStream[(String, Int)] = wordAndOne.reduceByKey((a, b) => a + b) //11、统计变更数据 val mapWithStateDStream: MapWithStateDStream[String, Int, Int, (String, Int)] = wordCount.mapWithState(StateSpec.function(mappingFunc)) //12、打印预览(MappedType)结果 //mapWithStateDStream.print() // mapWithStateDStream.foreachRDD(rdd => { rdd.foreachPartition(p => { // p.foreach(line => { println(s"结果: ${line}") }) // }) }) // //当前状态的键值快照 // val stateSnapshots: DStream[(String, Int)] = wordCount.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots() // // stateSnapshots.print() //13、开启计算 ssc.start() ssc.awaitTermination() } // word="hello",value=1,state=("hello",3) val mappingFunc = (word: String, value: Option[Int], state: State[Int]) => { val sum = value.getOrElse(0) + state.getOption.getOrElse(0) //sum = 4 // val output: String = s"${word} -> ${sum}" //("hello-> 4") val output: (String, Int) = (word, sum) //("hello-> 4") state.update(sum) //更新key="hello"的状态为4 println(s"mapWithState: ${System.currentTimeMillis()}") output //返回("hello",4) }区别:
updataeStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。
mapWithState只返回变化后的key的值,这样做的好处是,我们可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。
详细使用:https://www.jianshu.com/p/a54b142067e5
map和mapPartition的区别:map是对RDD的每一个元素使用一个方法 *** 作,mapPartitions是对每个partition的迭代器使用一个方法 *** 作。
MapPartitions的优点:使用MapPartitions *** 作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。通常体现在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。SparkSql或Dataframe默认会对程序进行mapPartition的优化。
MapPartitions的缺点:如果是普通的map *** 作,一次function的执行就处理一条数据,可以将已经处理完的数据从内存里面释放掉。
所以说普通的map *** 作通常不会导致内存的OOM异常。 但是MapPartitions *** 作,对于大量数据来说,如果直接将迭代器中数据取出来放内存,可能就OOM(内存溢出)。
foreachPartition是spark-core的action算子,foreachPartition是对每个partition中的iterator分别处理,通过将iterator传入function对进行数据的处理,也就是说在foreachPartition中函数处理的是分区迭代器,而非具体的数据,源码中的注释是:Applies a function func to each parition of this RDD.(将函数func应用于此RDD的每个分区)
foreach:foreach也是spark-core的action算子,与foreachPartition类似的是,foreach也是对每个partition中的iterator分别处理,通过对每个iterator迭代获取数据传给function进行数据的处理,也就是说在foreach中函数处理的是具体的数据,源码中的注释是:Applies a function fun to all elements of this RDD.(将函数func用于此RDD的所有元素).
foreachRDD与上面两个的区别:foreachRDD是sparkStreaming的OutputOperation算子。但是foreachRDD并不会触发立即处理,必须在碰到sparkcore的foreach或者foreachPartition算子后,才会触发action动作。同时要注意,function的应用在的driver端进行,而不是Executor端进行。
def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[*]") .getOrCreate() //隐式转换(spark自适应) import spark.implicits._ val sc = spark.sparkContext val sourceRDD: RDD[String] = sc.parallelize(List[String]( "hello", "hello", "hello", "spark", "spark"), 2) //groupBy指定键来标示一个数据,便于之后的聚合 val groupRDD: RDD[(String, Iterable[String])] = sourceRDD.groupBy(x => x) val mapPartRDD = groupRDD.mapPartitions(iterable1 => { iterable1.map { case (word, words) => { (word, words.size) } } }) mapPartRDD.foreach(println) val mapRDD = groupRDD.map { case (word, iterable) => { (word, iterable.size) } } mapRDD.foreachPartition(iterable => { iterable.foreach(println) }) spark.stop() }GroupByKey和ReduceByKey的区别: GroupByKey(numPartitions=None)
官方解释:将 RDD 中每个键的值分组为一个序列。 使用 numPartitions 分区对生成的 RDD 进行哈希分区。 注意:如果您分组是为了对每个键执行聚合(例如求和或平均值),则使用 reduceByKey 或 aggregateByKey 将提供更好的性能。
– GroupByKey 只是将相同键的给归纳到一个序列,没有其它函数 *** 作 ReduceByKey(func, numPartitions=None)官方解释:使用关联归约函数合并每个键的值。 这也将在结果发送到reduce端之前在每个映射器上本地执行合并, 输出将使numPartitions 分区进行哈希分区,如果未指定 numPartitions,则使用默认并行级别。
– ReduceByKey shuffle前将键相同的值局部函数聚合,减轻shuffle时的io压力,shuffle后在各自分区继续函数聚合。 aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])官方解释:zeroValue是给每个分区的初始值(相当于对应键的一个初始元素),map端每个分区数据都会使用第一个函数seqOp进行聚合 *** 作,
shuffle后分区使用第二个函数combOp进行 *** 作,并且最终返回值类型与初始值类型一样(和原有数据类型可以不一致)。
def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[*]") .getOrCreate() //隐式转换(spark自适应) import spark.implicits._ val sc = spark.sparkContext val sourceRDD: RDD[String] = sc.parallelize(List[String]( "hello", "hello", "hello", "spark", "spark"), 2) val wordPairRDD: RDD[(String, Int)] = sourceRDD.map((_, 1)) println("=" * 50) //相同键放在同一分区,值合并为一个序列 val groupByRDD: RDD[(String, Iterable[Int])] = wordPairRDD.groupByKey() groupByRDD.foreach(println) println("=" * 50) //对各键的序列进行聚合 *** 作 groupByRDD.map(pair =>{ val iterablevalue: Iterable[Int] = pair._2 var sum = 0 iterablevalue.foreach(num => sum += num) (pair._1,sum) }).foreach(println) println("=" * 50) //直接使用函数在shuffle前聚合一遍,减轻IO压力,shuffle后各分区在聚合一遍 val reduceByKeyRDD = wordPairRDD.reduceByKey(_ + _) reduceByKeyRDD.foreach(println) println("=" * 50) //查看数据所在分区 wordPairRDD.mapPartitionsWithIndex((index, ite) => { ite.map((index, _)) }).foreach(println) println("=" * 50) //分区数量 println("partitions: " + wordPairRDD.partitions.length) println("=" * 50) //shuffle前分区的聚合函数 val sepOP = (a: String, b: Int) => { a + "|" + b } //shuffle后分区的聚合函数 val combOP = (a: String, b: String) => { a + "-Shufix-" + b } //柯里化func(x)(y) //zeroValue给每个分区不同键的初始值 //可以根据初始值类型来返回最终聚合类型 val aggregateByKeyRDD: RDD[(String, String)] = wordPairRDD.aggregateByKey("Prefix")(sepOP, combOP) aggregateByKeyRDD.foreach(println) spark.stop() }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)